diff --git a/inventory_provider/routes/common.py b/inventory_provider/routes/common.py index fa172830bf612590cbc86f0f1d10945428390b8b..1d00f9318a56d1c9ff21b99fc3f216cfe660357f 100644 --- a/inventory_provider/routes/common.py +++ b/inventory_provider/routes/common.py @@ -2,16 +2,33 @@ import functools import logging from flask import request, Response, current_app, g -from inventory_provider.tasks.common import get_redis as tasks_get_redis +from inventory_provider.tasks import common as tasks_common logger = logging.getLogger(__name__) -def get_redis(): - if 'redis_db' not in g: - config = current_app.config['INVENTORY_PROVIDER_CONFIG'] - g.redis_db = tasks_get_redis(config) - return g.redis_db +def get_current_redis(): + if 'current_redis_db' in g: + latch = tasks_common.get_latch(g.current_redis_db) + if latch and latch['current'] == latch['this']: + return g.current_redis_db + logger.warning('switching to current redis db') + + config = current_app.config['INVENTORY_PROVIDER_CONFIG'] + g.current_redis_db = tasks_common.get_current_redis(config) + return g.current_redis_db + + +def get_next_redis(): + if 'next_redis_db' in g: + latch = tasks_common.get_latch(g.next_redis_db) + if latch and latch['next'] == latch['this']: + return g.next_redis_db + logger.warning('switching to next redis db') + + config = current_app.config['INVENTORY_PROVIDER_CONFIG'] + g.next_redis_db = tasks_common.get_next_redis(config) + return g.next_redis_db def require_accepts_json(f): diff --git a/inventory_provider/tasks/common.py b/inventory_provider/tasks/common.py index a774e96a3c61ece38fc71bea8973b59ff468b3eb..4f32d0e2eecf3d52a4f3f8d32e7ed8576f73b07b 100644 --- a/inventory_provider/tasks/common.py +++ b/inventory_provider/tasks/common.py @@ -1,9 +1,39 @@ +import json import logging + +import jsonschema import redis import redis.sentinel logger = logging.getLogger(__name__) +DB_LATCH_SCHEMA = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "current": {"type": "integer"}, + "next": {"type": "integer"}, + "this": {"type": "integer"} + }, + "required": ["current", "next", "this"], + "additionalProperties": False +} + + +def get_latch(r): + latch = r.get('db:latch') + if latch is None: + logger.error('no latch key found in db') + return None + try: + latch = json.loads(latch.decode('utf-8')) + jsonschema.validate(latch, DB_LATCH_SCHEMA) + except (jsonschema.ValidationError, json.JSONDecodeError): + logging.exception('error validating latch value') + return None + + return latch + def _get_redis(config, dbid=None): @@ -27,35 +57,35 @@ def _get_redis(config, dbid=None): config['sentinel']['hostname'], config['sentinel']['port'])], **kwargs) - return dbid, sentinel.master_for( + return sentinel.master_for( config['sentinel']['name'], socket_timeout=0.1) else: - return dbid, redis.StrictRedis( + return redis.StrictRedis( host=config['redis']['hostname'], port=config['redis']['port'], **kwargs) -def _get_specific_redis(config, db_id_key): - db, r = _get_redis(config) - required_db = r.get(db_id_key) - if required_db is None: - logger.warning('can''t determine current db, using: {}'.format(db)) +def get_current_redis(config): + r = _get_redis(config) + latch = get_latch(r) + if not latch: + logger.warning("can't determine current db") return r - - required_db = int(required_db.decode('utf-8')) - if db == required_db: + if latch['this'] == latch['current']: return r - - db, r = _get_redis(config, required_db) - assert db == required_db - return r - - -def get_current_redis(config): - return _get_specific_redis(config, 'db:current') + else: + return _get_redis(config, latch['current']) def get_next_redis(config): - return _get_specific_redis(config, 'db:current') + r = _get_redis(config) + latch = get_latch(r) + if not latch: + logger.warning("can't determine next db") + return r + if latch['this'] == latch['next']: + return r + else: + return _get_redis(config, latch['current'])