Skip to content
Snippets Groups Projects
common.py 7.52 KiB
import json
import logging
import re
import time

import jsonschema
import redis
import redis.sentinel

logger = logging.getLogger(__name__)

DEFAULT_REDIS_SENTINEL_TIMEOUT = 0.1
DEFAULT_SENTINEL_SOCKET_TIMEOUT = 0.1

DB_LATCH_SCHEMA = {
    '$schema': 'https://json-schema.org/draft-07/schema#',
    'type': 'object',
    'properties': {
        'current': {'type': 'integer'},
        'next': {'type': 'integer'},
        'this': {'type': 'integer'},
        'pending': {'type': 'boolean'},
        'failure': {'type': 'boolean'},
        'timestamp': {'type': 'number'},
        'update-started': {'type': 'number'}
    },
    'required': ['current', 'next', 'this'],
    'additionalProperties': False
}

TASK_LOG_SCHEMA = {
    '$schema': 'https://json-schema.org/draft-07/schema#',

    'definitions': {
        'meta': {
            'type': 'object',
            'properties': {
                'task': {'type': 'string'},
                'id': {'type': 'string'},
                'worker': {'type': 'string'},
                'pid': {'type': 'integer'},
                'warning': {'type': 'boolean'},
                'error': {'type': 'boolean'},
                'args': {'type': 'array'}
            },
            'required': ['task', 'id', 'worker', 'pid', 'warning', 'error'],
            'additionalProperties': False
        },
        'message': {
            'type': 'object',
            'properties': {
                'message': {'type': 'string'},
                'level': {
                    'type': 'string',
                    'enum': ['INFO', 'WARNING', 'ERROR']
                }
            }
        }
    },

    'type': 'object',
    'properties': {
        'meta': {'$ref': '#/definitions/meta'},
        'messages': {
            'type': 'array',
            'items': {'$ref': '#/definitions/message'}
        }
    },
    'required': ['meta', 'messages'],
    'additionalProperties': False
}


def get_latch(r):
    latch = r.get('db:latch')
    if latch is None:
        logger.error('no latch key found in db')
        return None
    try:
        latch = json.loads(latch.decode('utf-8'))
        jsonschema.validate(latch, DB_LATCH_SCHEMA)
    except (jsonschema.ValidationError, json.JSONDecodeError):
        logging.exception('error validating latch value')
        return None

    return latch


def update_latch_status(config, pending=False, failure=False):
    logger.debug('updating latch status: pending={}, failure={}'.format(
        pending, failure))

    now = time.time()
    for db in config['redis-databases']:
        r = _get_redis(config, dbid=db)
        latch = get_latch(r)
        if not latch:
            continue
        if not pending and not failure:
            if not latch['pending'] and not latch['failure']:
                logger.error(
                    f'updating latch for db {db} with pending=True, '
                    f'but latch is already {latch}')
            latch['timestamp'] = now
        if not latch['pending'] and pending:
            # we are starting a new update
            latch['update-started'] = now
        latch['pending'] = pending
        latch['failure'] = failure
        r.set('db:latch', json.dumps(latch))


def set_latch(config, new_current, new_next, timestamp, update_timestamp):

    logger.debug('setting latch: new current={}, new next={}'.format(
        new_current, new_next))

    for db in config['redis-databases']:
        r = _get_redis(config, dbid=db)
        set_single_latch(
            r, db, new_current, new_next, timestamp, update_timestamp)


def set_single_latch(
        r, db, new_current, new_next, timestamp, update_timestamp):

    latch = {
        'current': new_current,
        'next': new_next,
        'this': db,
        'pending': False,
        'failure': False,
        'timestamp': timestamp,
        'update-started': update_timestamp
    }
    r.set('db:latch', json.dumps(latch))


def latch_db(config):
    db_ids = config['redis-databases']
    db_ids = sorted(set(db_ids))

    r = get_next_redis(config)
    latch = get_latch(r)
    if not latch:
        latch = {
            'current': db_ids[0],
            'next': db_ids[0]
        }

    next_idx = db_ids.index(latch['next'])
    next_idx = (next_idx + 1) % len(db_ids)

    set_latch(
        config,
        new_current=latch['next'],
        new_next=db_ids[next_idx],
        timestamp=time.time(),
        update_timestamp=latch.get('update-started', 0))


def _get_redis(config, dbid=None):

    if dbid is None:
        dbid = min(config['redis-databases'])
        logger.debug(
            '_get_redis[1], no db specified, '
            'using minimum as first guess: {}'.format(dbid))

    if dbid not in config['redis-databases']:
        logger.error(
            '_get_redis[2], tried to connect to unknown db id: {}, '
            'using minimum: {}'.format(dbid, min(config['redis-databases'])))
        dbid = min(config['redis-databases'])

    kwargs = {
        'db': dbid
    }

    if 'sentinel' in config:
        kwargs['socket_timeout'] = config['sentinel'].get(
            'sentinel_socket_timeout', DEFAULT_SENTINEL_SOCKET_TIMEOUT)
        sentinel = redis.sentinel.Sentinel([(
            config['sentinel']['hostname'],
            config['sentinel']['port'])],
            **kwargs)
        return sentinel.master_for(
            config['sentinel']['name'],
            socket_timeout=config['sentinel'].get(
                'redis_socket_timeout', DEFAULT_REDIS_SENTINEL_TIMEOUT))
    else:
        kwargs['socket_timeout'] = config['redis'].get(
            'socket_timeout', DEFAULT_REDIS_SENTINEL_TIMEOUT)
        return redis.StrictRedis(
            host=config['redis']['hostname'],
            port=config['redis']['port'],
            **kwargs)


def get_current_redis(config):
    r = _get_redis(config)
    latch = get_latch(r)
    if not latch:
        logger.warning("can't determine current db")
        return r
    if latch['this'] == latch['current']:
        logger.debug(
            'get_current_redis[1], using first latch: {}'.format(latch))
        return r
    else:
        logger.debug(
            'get_current_redis[2], using current latch: {}'.format(latch))
        return _get_redis(config, latch['current'])


def get_next_redis(config):
    r = _get_redis(config)
    latch = get_latch(r)
    if latch and latch['this'] == latch['next']:
        logger.debug('get_next_redis[1], using first latch: {}'.format(latch))
        return r

    if latch and latch['next'] in config['redis-databases']:
        logger.debug('get_next_redis[2], using next latch: {}'.format(latch))
        next_id = latch['next']
    else:
        db_ids = sorted(set(config['redis-databases']))
        next_id = db_ids[1 % len(db_ids)]
        logger.debug(
            'get_next_redis[3], next db not configured, '
            'derived next id: {}'.format(next_id))

    return _get_redis(config, next_id)


def ims_sorted_service_type_key(service_type):
    """
    special-purpose function used for mapping IMS service type strings
    to the key used in redis ('ims:services:*')

    this method is only used by
    :meth:`inventory_provider:tasks:worker.update_circuit_hierarchy_and_port_id_services
    and :meth:`inventory_provider:routes.poller.get_service_types

    :param service_type: IMS-formatted service_type string
    :return: ims sorted service type redis key
    """  # noqa: E501

    service_type = re.sub(
        r'[^a-zA-Z0-9]+', '_', service_type.lower())
    # prettification ... e.g. no trailing _ for 'MD-VPN (NATIVE)'
    service_type = re.sub('_+$', '', service_type)
    return re.sub('^_+', '', service_type)