Skip to content
Snippets Groups Projects
Commit 33270080 authored by Erik Reid's avatar Erik Reid
Browse files

store this/next/current in latch

parent bcd7d179
No related branches found
No related tags found
No related merge requests found
......@@ -2,16 +2,33 @@ import functools
import logging
from flask import request, Response, current_app, g
from inventory_provider.tasks.common import get_redis as tasks_get_redis
from inventory_provider.tasks import common as tasks_common
logger = logging.getLogger(__name__)
def get_redis():
if 'redis_db' not in g:
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
g.redis_db = tasks_get_redis(config)
return g.redis_db
def get_current_redis():
if 'current_redis_db' in g:
latch = tasks_common.get_latch(g.current_redis_db)
if latch and latch['current'] == latch['this']:
return g.current_redis_db
logger.warning('switching to current redis db')
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
g.current_redis_db = tasks_common.get_current_redis(config)
return g.current_redis_db
def get_next_redis():
if 'next_redis_db' in g:
latch = tasks_common.get_latch(g.next_redis_db)
if latch and latch['next'] == latch['this']:
return g.next_redis_db
logger.warning('switching to next redis db')
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
g.next_redis_db = tasks_common.get_next_redis(config)
return g.next_redis_db
def require_accepts_json(f):
......
import json
import logging
import jsonschema
import redis
import redis.sentinel
logger = logging.getLogger(__name__)
DB_LATCH_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"current": {"type": "integer"},
"next": {"type": "integer"},
"this": {"type": "integer"}
},
"required": ["current", "next", "this"],
"additionalProperties": False
}
def get_latch(r):
latch = r.get('db:latch')
if latch is None:
logger.error('no latch key found in db')
return None
try:
latch = json.loads(latch.decode('utf-8'))
jsonschema.validate(latch, DB_LATCH_SCHEMA)
except (jsonschema.ValidationError, json.JSONDecodeError):
logging.exception('error validating latch value')
return None
return latch
def _get_redis(config, dbid=None):
......@@ -27,35 +57,35 @@ def _get_redis(config, dbid=None):
config['sentinel']['hostname'],
config['sentinel']['port'])],
**kwargs)
return dbid, sentinel.master_for(
return sentinel.master_for(
config['sentinel']['name'],
socket_timeout=0.1)
else:
return dbid, redis.StrictRedis(
return redis.StrictRedis(
host=config['redis']['hostname'],
port=config['redis']['port'],
**kwargs)
def _get_specific_redis(config, db_id_key):
db, r = _get_redis(config)
required_db = r.get(db_id_key)
if required_db is None:
logger.warning('can''t determine current db, using: {}'.format(db))
def get_current_redis(config):
r = _get_redis(config)
latch = get_latch(r)
if not latch:
logger.warning("can't determine current db")
return r
required_db = int(required_db.decode('utf-8'))
if db == required_db:
if latch['this'] == latch['current']:
return r
db, r = _get_redis(config, required_db)
assert db == required_db
return r
def get_current_redis(config):
return _get_specific_redis(config, 'db:current')
else:
return _get_redis(config, latch['current'])
def get_next_redis(config):
return _get_specific_redis(config, 'db:current')
r = _get_redis(config)
latch = get_latch(r)
if not latch:
logger.warning("can't determine next db")
return r
if latch['this'] == latch['next']:
return r
else:
return _get_redis(config, latch['current'])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment