Skip to content
Snippets Groups Projects
Select Git revision
  • aa67f630da5d044d5551a8133a14d2ad2e9b2002
  • develop default
  • master protected
  • feature/DBOARD3-1156-move-whois/nren-naming-to-inventory-update
  • inventoryProvider-functional
  • inventoryProvider-morework2
  • circuit-service-details-fix
  • lookup-SPECTRUM-SCHF-ports
  • inventoryProvider-1267-cleanup
  • inventoryProvider-moreWork
  • feature/DBOARD3-958
  • release/0.110
  • fix-uuid-validation-error
  • docker-poc
  • 0.154
  • 0.153
  • 0.152
  • 0.151
  • 0.150
  • 0.149
  • 0.148
  • 0.147
  • 0.146
  • 0.145
  • 0.144
  • 0.143
  • 0.142
  • 0.141
  • 0.140
  • 0.139
  • 0.138
  • 0.137
  • 0.136
  • 0.135
34 results

common.py

Blame
  • user avatar
    Neda Moeini authored
    4b7f3c8c
    History
    common.py 7.60 KiB
    import json
    import logging
    import re
    import time
    
    import jsonschema
    import redis
    import redis.sentinel
    
    logger = logging.getLogger(__name__)
    
    # Fallback value for the ``socket_timeout`` keyword handed to redis-py
    # by _get_redis when the config doesn't supply one (presumably seconds,
    # per redis-py convention - TODO confirm).
    DEFAULT_REDIS_SENTINEL_TIMEOUT = 0.1
    # NOTE(review): not referenced in the visible part of this module -
    # confirm whether anything else imports it before removing.
    DEFAULT_SENTINEL_SOCKET_TIMEOUT = 0.1
    
    # JSON schema for the value stored under the 'db:latch' redis key.
    #
    # 'current', 'next' and 'this' are db ids ('this' is the id of the db
    # holding the record, see set_single_latch).  The remaining fields are
    # optional, so readers must not assume they are present.
    DB_LATCH_SCHEMA = {
        # the draft-07 meta-schema is identified by the http:// URI; with
        # https:// jsonschema doesn't recognise the declared draft
        '$schema': 'http://json-schema.org/draft-07/schema#',
        'type': 'object',
        'properties': {
            'current': {'type': 'integer'},
            'next': {'type': 'integer'},
            'this': {'type': 'integer'},
            'pending': {'type': 'boolean'},
            'failure': {'type': 'boolean'},
            'timestamp': {'type': 'number'},
            'update-started': {'type': 'number'}
        },
        'required': ['current', 'next', 'this'],
        'additionalProperties': False
    }
    
    # JSON schema for a task log record: a 'meta' object describing the
    # task plus a list of 'messages', each with optional text and level.
    TASK_LOG_SCHEMA = {
        # the draft-07 meta-schema is identified by the http:// URI; with
        # https:// jsonschema doesn't recognise the declared draft
        '$schema': 'http://json-schema.org/draft-07/schema#',

        'definitions': {
            'meta': {
                'type': 'object',
                'properties': {
                    'task': {'type': 'string'},
                    'id': {'type': 'string'},
                    'worker': {'type': 'string'},
                    'pid': {'type': 'integer'},
                    'warning': {'type': 'boolean'},
                    'error': {'type': 'boolean'},
                    'args': {'type': 'array'}
                },
                # 'args' is the only optional meta field
                'required': ['task', 'id', 'worker', 'pid', 'warning', 'error'],
                'additionalProperties': False
            },
            'message': {
                'type': 'object',
                'properties': {
                    'message': {'type': 'string'},
                    'level': {
                        'type': 'string',
                        'enum': ['INFO', 'WARNING', 'ERROR']
                    }
                }
            }
        },

        'type': 'object',
        'properties': {
            'meta': {'$ref': '#/definitions/meta'},
            'messages': {
                'type': 'array',
                'items': {'$ref': '#/definitions/message'}
            }
        },
        'required': ['meta', 'messages'],
        'additionalProperties': False
    }
    
    
    def get_latch(r):
        """
        load and validate the latch record stored under 'db:latch'

        :param r: redis connection; its ``get`` result is expected to
            be bytes (it is utf-8 decoded here) or None
        :return: the parsed latch dict, or None if the key is missing,
            isn't valid json or doesn't conform to DB_LATCH_SCHEMA
        """
        latch = r.get('db:latch')
        if latch is None:
            logger.error('no latch key found in db')
            return None
        try:
            latch = json.loads(latch.decode('utf-8'))
            jsonschema.validate(latch, DB_LATCH_SCHEMA)
        except (jsonschema.ValidationError, json.JSONDecodeError):
            # log through the module logger (not the root logger) so the
            # record is named and filtered like every other message here
            logger.exception('error validating latch value')
            return None

        return latch
    
    
    def update_latch_status(config, pending=False, failure=False):
        """
        set the pending/failure flags on the latch of every configured db

        dbs without a readable latch (see :meth:`get_latch`) are skipped.
        'timestamp' is refreshed when both flags are being cleared, and
        'update-started' is set when a latch goes from not-pending
        to pending.

        :param config: app config, must contain 'redis-databases' and
            whatever :meth:`_get_redis` needs to connect
        :param pending: new value for the latch 'pending' flag
        :param failure: new value for the latch 'failure' flag
        """
        logger.debug('updating latch status: pending={}, failure={}'.format(
            pending, failure))

        now = time.time()
        for db in config['redis-databases']:
            r = _get_redis(config, dbid=db)
            latch = get_latch(r)
            if not latch:
                continue

            # 'pending' and 'failure' are optional in DB_LATCH_SCHEMA, so a
            # latch that passed validation may lack them: treat as False
            was_pending = latch.get('pending', False)
            was_failure = latch.get('failure', False)

            if not pending and not failure:
                if not was_pending and not was_failure:
                    # clearing flags that are already clear is unexpected;
                    # report the values actually being written
                    logger.error(
                        f'updating latch for db {db} with '
                        f'pending={pending}, failure={failure}, '
                        f'but latch is already {latch}')
                latch['timestamp'] = now
            if pending and not was_pending:
                # we are starting a new update
                latch['update-started'] = now
            latch['pending'] = pending
            latch['failure'] = failure
            r.set('db:latch', json.dumps(latch))
    
    
    def set_latch(config, new_current, new_next, timestamp, update_timestamp):
        """
        write a fresh latch record to every db in config['redis-databases']

        :param config: app config, must contain 'redis-databases'
        :param new_current: value for the latch 'current' field
        :param new_next: value for the latch 'next' field
        :param timestamp: value for the latch 'timestamp' field
        :param update_timestamp: value for the latch 'update-started' field
        """
        logger.debug('setting latch: new current={}, new next={}'.format(
            new_current, new_next))

        for db_id in config['redis-databases']:
            connection = _get_redis(config, dbid=db_id)
            set_single_latch(
                connection,
                db_id,
                new_current,
                new_next,
                timestamp,
                update_timestamp)
    
    
    def set_single_latch(
            r, db, new_current, new_next, timestamp, update_timestamp):
        """
        store a new latch record under 'db:latch' on one connection

        the record is always written with 'pending' and 'failure'
        both False.

        :param r: redis connection to write to
        :param db: stored as the latch 'this' field
        :param new_current: stored as the latch 'current' field
        :param new_next: stored as the latch 'next' field
        :param timestamp: stored as the latch 'timestamp' field
        :param update_timestamp: stored as the latch 'update-started' field
        """
        serialized = json.dumps({
            'current': new_current,
            'next': new_next,
            'this': db,
            'pending': False,
            'failure': False,
            'timestamp': timestamp,
            'update-started': update_timestamp
        })
        r.set('db:latch', serialized)
    
    
    def latch_db(config):
        """
        advance the latch: the 'next' db becomes 'current' and the db
        after it (in sorted id order, wrapping around) becomes 'next'

        the latch is read from the db returned by :meth:`get_next_redis`;
        if none can be read, the lowest configured db id is used as both
        'current' and 'next' before advancing.

        :param config: app config, must contain 'redis-databases'
        """
        ordered_ids = sorted(set(config['redis-databases']))

        latch = get_latch(get_next_redis(config))
        if not latch:
            lowest_id = ordered_ids[0]
            latch = {'current': lowest_id, 'next': lowest_id}

        promoted_id = latch['next']
        # step to the following id, wrapping past the end of the list
        following_pos = (ordered_ids.index(promoted_id) + 1) % len(ordered_ids)

        set_latch(
            config,
            new_current=promoted_id,
            new_next=ordered_ids[following_pos],
            timestamp=time.time(),
            update_timestamp=latch.get('update-started', 0))
    
    
    def _get_redis(config, dbid=None):
        """
        create a redis connection for one of the configured db ids

        connects via sentinel if config contains a 'sentinel' section,
        otherwise directly using the 'redis' section.

        :param config: app config, must contain 'redis-databases' and
            either 'sentinel' (hostname, port, name) or 'redis'
            (hostname, port)
        :param dbid: db id to select; if None or not one of
            config['redis-databases'], the minimum configured id is used
        :return: a redis client bound to the selected db
        """
        known_dbs = config['redis-databases']

        if dbid is None:
            dbid = min(known_dbs)
            logger.debug(
                '_get_redis[1], no db specified, '
                'using minimum as first guess: {}'.format(dbid))

        if dbid not in known_dbs:
            logger.error(
                '_get_redis[2], tried to connect to unknown db id: {}, '
                'using minimum: {}'.format(dbid, min(known_dbs)))
            dbid = min(known_dbs)

        if 'sentinel' in config:
            sentinel_config = config['sentinel']
            # db goes in as a Sentinel connection kwarg; redis-py carries
            # those over to the connections created by master_for
            sentinel = redis.sentinel.Sentinel(
                [(sentinel_config['hostname'], sentinel_config['port'])],
                db=dbid)
            return sentinel.master_for(
                sentinel_config['name'],
                password=sentinel_config.get('password'),
                socket_timeout=sentinel_config.get(
                    'redis_socket_timeout', DEFAULT_REDIS_SENTINEL_TIMEOUT))

        redis_config = config['redis']
        return redis.StrictRedis(
            host=redis_config['hostname'],
            port=redis_config['port'],
            # without this every direct connection silently uses the
            # client's default db, whatever dbid was requested
            db=dbid,
            password=redis_config.get('password'),
            socket_timeout=redis_config.get(
                'socket_timeout', DEFAULT_REDIS_SENTINEL_TIMEOUT))
    
    
    def get_current_redis(config):
        """
        return a connection to the db the latch marks as 'current'

        the latch is read from the default connection (see
        :meth:`_get_redis`); if it can't be read, that default
        connection is returned as-is.

        :param config: app config, passed through to :meth:`_get_redis`
        :return: redis connection
        """
        first = _get_redis(config)
        latch = get_latch(first)
        if not latch:
            logger.warning("can't determine current db")
            return first

        if latch['this'] != latch['current']:
            # the db we asked first isn't the current one: reconnect
            logger.debug(
                'get_current_redis[2], using current latch: {}'.format(latch))
            return _get_redis(config, latch['current'])

        logger.debug(
            'get_current_redis[1], using first latch: {}'.format(latch))
        return first
    
    
    def get_next_redis(config):
        """
        return a connection to the db the latch marks as 'next'

        the latch is read from the default connection (see
        :meth:`_get_redis`).  if there is no usable latch, or its 'next'
        id isn't one of config['redis-databases'], the second of the
        sorted configured ids is used (the only one, if there is just one).

        :param config: app config, must contain 'redis-databases'
        :return: redis connection
        """
        first = _get_redis(config)
        latch = get_latch(first)

        if latch:
            if latch['this'] == latch['next']:
                logger.debug(
                    'get_next_redis[1], using first latch: {}'.format(latch))
                return first

            if latch['next'] in config['redis-databases']:
                logger.debug(
                    'get_next_redis[2], using next latch: {}'.format(latch))
                return _get_redis(config, latch['next'])

        candidates = sorted(set(config['redis-databases']))
        # index 1, or 0 when only a single db is configured
        fallback_id = candidates[1 % len(candidates)]
        logger.debug(
            'get_next_redis[3], next db not configured, '
            'derived next id: {}'.format(fallback_id))
        return _get_redis(config, fallback_id)
    
    
    def ims_sorted_service_type_key(service_type):
        """
        map an IMS service type string to the key fragment used in
        redis ('ims:services:*')

        the string is lower-cased, every run of characters other than
        ascii letters/digits is replaced by a single '_', and leading
        and trailing '_' are removed, e.g. 'MD-VPN (NATIVE)'
        becomes 'md_vpn_native'.

        this method is only used by
        :meth:`inventory_provider:tasks:worker.update_circuit_hierarchy_and_port_id_services`
        and :meth:`inventory_provider:routes.poller.get_service_types`

        :param service_type: IMS-formatted service_type string
        :return: ims sorted service type redis key
        """  # noqa: E501

        substitutions = (
            (r'[^a-zA-Z0-9]+', '_'),
            # prettification ... e.g. no trailing _ for 'MD-VPN (NATIVE)'
            ('_+$', ''),
            ('^_+', ''),
        )
        key = service_type.lower()
        for pattern, replacement in substitutions:
            key = re.sub(pattern, replacement, key)
        return key