common.py 7.52 KiB
import json
import logging
import re
import time
import jsonschema
import redis
import redis.sentinel
logger = logging.getLogger(__name__)
DEFAULT_REDIS_SENTINEL_TIMEOUT = 0.1
DEFAULT_SENTINEL_SOCKET_TIMEOUT = 0.1
DB_LATCH_SCHEMA = {
'$schema': 'https://json-schema.org/draft-07/schema#',
'type': 'object',
'properties': {
'current': {'type': 'integer'},
'next': {'type': 'integer'},
'this': {'type': 'integer'},
'pending': {'type': 'boolean'},
'failure': {'type': 'boolean'},
'timestamp': {'type': 'number'},
'update-started': {'type': 'number'}
},
'required': ['current', 'next', 'this'],
'additionalProperties': False
}
TASK_LOG_SCHEMA = {
'$schema': 'https://json-schema.org/draft-07/schema#',
'definitions': {
'meta': {
'type': 'object',
'properties': {
'task': {'type': 'string'},
'id': {'type': 'string'},
'worker': {'type': 'string'},
'pid': {'type': 'integer'},
'warning': {'type': 'boolean'},
'error': {'type': 'boolean'},
'args': {'type': 'array'}
},
'required': ['task', 'id', 'worker', 'pid', 'warning', 'error'],
'additionalProperties': False
},
'message': {
'type': 'object',
'properties': {
'message': {'type': 'string'},
'level': {
'type': 'string',
'enum': ['INFO', 'WARNING', 'ERROR']
}
}
}
},
'type': 'object',
'properties': {
'meta': {'$ref': '#/definitions/meta'},
'messages': {
'type': 'array',
'items': {'$ref': '#/definitions/message'}
}
},
'required': ['meta', 'messages'],
'additionalProperties': False
}
def get_latch(r):
latch = r.get('db:latch')
if latch is None:
logger.error('no latch key found in db')
return None
try:
latch = json.loads(latch.decode('utf-8'))
jsonschema.validate(latch, DB_LATCH_SCHEMA)
except (jsonschema.ValidationError, json.JSONDecodeError):
logging.exception('error validating latch value')
return None
return latch
def update_latch_status(config, pending=False, failure=False):
logger.debug('updating latch status: pending={}, failure={}'.format(
pending, failure))
now = time.time()
for db in config['redis-databases']:
r = _get_redis(config, dbid=db)
latch = get_latch(r)
if not latch:
continue
if not pending and not failure:
if not latch['pending'] and not latch['failure']:
logger.error(
f'updating latch for db {db} with pending=True, '
f'but latch is already {latch}')
latch['timestamp'] = now
if not latch['pending'] and pending:
# we are starting a new update
latch['update-started'] = now
latch['pending'] = pending
latch['failure'] = failure
r.set('db:latch', json.dumps(latch))
def set_latch(config, new_current, new_next, timestamp, update_timestamp):
logger.debug('setting latch: new current={}, new next={}'.format(
new_current, new_next))
for db in config['redis-databases']:
r = _get_redis(config, dbid=db)
set_single_latch(
r, db, new_current, new_next, timestamp, update_timestamp)
def set_single_latch(
r, db, new_current, new_next, timestamp, update_timestamp):
latch = {
'current': new_current,
'next': new_next,
'this': db,
'pending': False,
'failure': False,
'timestamp': timestamp,
'update-started': update_timestamp
}
r.set('db:latch', json.dumps(latch))
def latch_db(config):
db_ids = config['redis-databases']
db_ids = sorted(set(db_ids))
r = get_next_redis(config)
latch = get_latch(r)
if not latch:
latch = {
'current': db_ids[0],
'next': db_ids[0]
}
next_idx = db_ids.index(latch['next'])
next_idx = (next_idx + 1) % len(db_ids)
set_latch(
config,
new_current=latch['next'],
new_next=db_ids[next_idx],
timestamp=time.time(),
update_timestamp=latch.get('update-started', 0))
def _get_redis(config, dbid=None):
if dbid is None:
dbid = min(config['redis-databases'])
logger.debug(
'_get_redis[1], no db specified, '
'using minimum as first guess: {}'.format(dbid))
if dbid not in config['redis-databases']:
logger.error(
'_get_redis[2], tried to connect to unknown db id: {}, '
'using minimum: {}'.format(dbid, min(config['redis-databases'])))
dbid = min(config['redis-databases'])
kwargs = {
'db': dbid
}
if 'sentinel' in config:
kwargs['socket_timeout'] = config['sentinel'].get(
'sentinel_socket_timeout', DEFAULT_SENTINEL_SOCKET_TIMEOUT)
sentinel = redis.sentinel.Sentinel([(
config['sentinel']['hostname'],
config['sentinel']['port'])],
**kwargs)
return sentinel.master_for(
config['sentinel']['name'],
socket_timeout=config['sentinel'].get(
'redis_socket_timeout', DEFAULT_REDIS_SENTINEL_TIMEOUT))
else:
kwargs['socket_timeout'] = config['redis'].get(
'socket_timeout', DEFAULT_REDIS_SENTINEL_TIMEOUT)
return redis.StrictRedis(
host=config['redis']['hostname'],
port=config['redis']['port'],
**kwargs)
def get_current_redis(config):
r = _get_redis(config)
latch = get_latch(r)
if not latch:
logger.warning("can't determine current db")
return r
if latch['this'] == latch['current']:
logger.debug(
'get_current_redis[1], using first latch: {}'.format(latch))
return r
else:
logger.debug(
'get_current_redis[2], using current latch: {}'.format(latch))
return _get_redis(config, latch['current'])
def get_next_redis(config):
r = _get_redis(config)
latch = get_latch(r)
if latch and latch['this'] == latch['next']:
logger.debug('get_next_redis[1], using first latch: {}'.format(latch))
return r
if latch and latch['next'] in config['redis-databases']:
logger.debug('get_next_redis[2], using next latch: {}'.format(latch))
next_id = latch['next']
else:
db_ids = sorted(set(config['redis-databases']))
next_id = db_ids[1 % len(db_ids)]
logger.debug(
'get_next_redis[3], next db not configured, '
'derived next id: {}'.format(next_id))
return _get_redis(config, next_id)
def ims_sorted_service_type_key(service_type):
"""
special-purpose function used for mapping IMS service type strings
to the key used in redis ('ims:services:*')
this method is only used by
:meth:`inventory_provider:tasks:worker.update_circuit_hierarchy_and_port_id_services
and :meth:`inventory_provider:routes.poller.get_service_types
:param service_type: IMS-formatted service_type string
:return: ims sorted service type redis key
""" # noqa: E501
service_type = re.sub(
r'[^a-zA-Z0-9]+', '_', service_type.lower())
# prettification ... e.g. no trailing _ for 'MD-VPN (NATIVE)'
service_type = re.sub('_+$', '', service_type)
return re.sub('^_+', '', service_type)