import json import logging import jsonschema import redis import redis.sentinel logger = logging.getLogger(__name__) DB_LATCH_SCHEMA = { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { "current": {"type": "integer"}, "next": {"type": "integer"}, "this": {"type": "integer"} }, "required": ["current", "next", "this"], "additionalProperties": False } def get_latch(r): latch = r.get('db:latch') if latch is None: logger.error('no latch key found in db') return None try: latch = json.loads(latch.decode('utf-8')) jsonschema.validate(latch, DB_LATCH_SCHEMA) except (jsonschema.ValidationError, json.JSONDecodeError): logging.exception('error validating latch value') return None return latch def set_latch(config, new_current, new_next): logger.debug('setting latch: new current={}, new next={}'.format( new_current, new_next)) for db in config['redis-databases']: latch = { 'current': new_current, 'next': new_next, 'this': db } r = _get_redis(config, dbid=db) r.set('db:latch', json.dumps(latch)) def latch_db(config): db_ids = config['redis-databases'] db_ids = sorted(set(db_ids)) r = get_next_redis(config) latch = get_latch(r) if not latch: latch = { 'current': db_ids[0], 'next': db_ids[0] } next_idx = db_ids.index(latch['next']) next_idx = (next_idx + 1) % len(db_ids) set_latch(config, new_current=latch['next'], new_next=db_ids[next_idx]) def _get_redis(config, dbid=None): if dbid is None: dbid = min(config['redis-databases']) logger.debug( '_get_redis[1], no db specified, ' 'using minimum as first guess: {}'.format(dbid)) if dbid not in config['redis-databases']: logger.error( '_get_redis[2], tried to connect to unknown db id: {}, ' 'using minimum: {}'.format(dbid, min(config['redis-databases']))) dbid = min(config['redis-databases']) kwargs = { 'db': dbid, 'socket_timeout': 0.1 } if 'sentinel' in config: sentinel = redis.sentinel.Sentinel([( config['sentinel']['hostname'], config['sentinel']['port'])], **kwargs) return sentinel.master_for( config['sentinel']['name'], socket_timeout=0.1) else: return redis.StrictRedis( host=config['redis']['hostname'], port=config['redis']['port'], **kwargs) def get_current_redis(config): r = _get_redis(config) latch = get_latch(r) if not latch: logger.warning("can't determine current db") return r if latch['this'] == latch['current']: logger.debug( 'get_current_redis[1], using first latch: {}'.format(latch)) return r else: logger.debug( 'get_current_redis[2], using current latch: {}'.format(latch)) return _get_redis(config, latch['current']) def get_next_redis(config): r = _get_redis(config) latch = get_latch(r) if latch and latch['this'] == latch['next']: logger.debug('get_next_redis[1], using first latch: {}'.format(latch)) return r if latch and latch['next'] in config['redis-databases']: logger.debug('get_next_redis[2], using next latch: {}'.format(latch)) next_id = latch['next'] else: db_ids = sorted(set(config['redis-databases'])) next_id = db_ids[0] if len(db_ids) == 1 else db_ids[1] logger.debug( 'get_next_redis[3], next db not configured, ' 'derived next id: {}'.format(next_id)) return _get_redis(config, next_id)