From 47fab4cb97caf9acd1e882710b149c6532d75a38 Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Thu, 18 Jul 2019 16:38:59 +0200 Subject: [PATCH] latch dbs after finalizing --- inventory_provider/routes/jobs.py | 19 +------------ inventory_provider/tasks/common.py | 18 ++++++++++++ inventory_provider/tasks/worker.py | 44 ++++++++++++++++++------------ 3 files changed, 45 insertions(+), 36 deletions(-) diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index 45974117..1782218a 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -41,24 +41,7 @@ def check_task_status(task_id): @routes.route("latchdb", methods=['GET', 'POST']) def latch_db(): - config = current_app.config["INVENTORY_PROVIDER_CONFIG"] - db_ids = config['redis-databases'] - db_ids = sorted(set(db_ids)) - - r = worker_common.get_next_redis(config) - latch = worker_common.get_latch(r) - if not latch: - latch = { - 'current': db_ids[0], - 'next': db_ids[0] - } - - next_idx = db_ids.index(latch['next']) - next_idx = (next_idx + 1) % len(db_ids) - - worker_common.set_latch( - config, new_current=latch['next'], new_next=db_ids[next_idx]) - + worker_common.latch_db(config) r = worker_common.get_current_redis(config) return jsonify(worker_common.get_latch(r)) diff --git a/inventory_provider/tasks/common.py b/inventory_provider/tasks/common.py index 956811ac..2aedf8de 100644 --- a/inventory_provider/tasks/common.py +++ b/inventory_provider/tasks/common.py @@ -48,6 +48,24 @@ def set_latch(config, new_current, new_next): r.set('db:latch', json.dumps(latch)) +def latch_db(config): + db_ids = config['redis-databases'] + db_ids = sorted(set(db_ids)) + + r = get_next_redis(config) + latch = get_latch(r) + if not latch: + latch = { + 'current': db_ids[0], + 'next': db_ids[0] + } + + next_idx = db_ids.index(latch['next']) + next_idx = (next_idx + 1) % len(db_ids) + + set_latch(config, new_current=latch['next'], new_next=db_ids[next_idx]) + + def _get_redis(config, dbid=None): if dbid is None: diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 054bc5ab..23b725d7 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -12,7 +12,7 @@ from lxml import etree import jsonschema from inventory_provider.tasks.app import app -from inventory_provider.tasks.common import get_next_redis +from inventory_provider.tasks.common import get_next_redis, latch_db from inventory_provider import config from inventory_provider import environment from inventory_provider.db import db, opsdb @@ -464,13 +464,9 @@ def launch_refresh_cache_all(config): subtasks = [ update_equipment_locations.apply_async(), ] - # for hostname in _derive_router_hostnames(config): - hostnames = _derive_router_hostnames(config) - hostnames = list(hostnames)[:2] - logger.error("HOSTNAMES: %r" % hostnames) - # for hostname in hostnames: - # logger.debug('queueing router refresh jobs for %r' % hostname) - # subtasks.append(reload_router_config.apply_async(args=[hostname])) + for hostname in _derive_router_hostnames(config): + logger.debug('queueing router refresh jobs for %r' % hostname) + subtasks.append(reload_router_config.apply_async(args=[hostname])) pending_task_ids = [x.id for x in subtasks] @@ -479,12 +475,10 @@ def launch_refresh_cache_all(config): return pending_task_ids -def _wait_for_tasks(task_ids): - logger = logging.getLogger(__name__) - +def _wait_for_tasks(task_ids, update_callback=lambda s: None): start_time = time.time() while task_ids and time.time() - start_time < FINALIZER_TIMEOUT_S: - logger.debug('waiting for tasks to complete: %r', task_ids) + update_callback('waiting for tasks to complete: %r' % task_ids) time.sleep(FINALIZER_POLLING_FREQUENCY_S) task_ids = [ id for id in task_ids @@ -495,9 +489,8 @@ def _wait_for_tasks(task_ids): raise InventoryTaskError( 'timeout waiting for pending tasks to complete') - logger.debug( - 'previous tasks completed in {} seconds'.format( - time.time - start_time)) + update_callback('pending taskscompleted in {} seconds'.format( + time.time() - start_time)) @app.task(base=InventoryTask, bind=True) @@ -512,19 +505,32 @@ def refresh_finalizer(self, pending_task_ids_json): "items": {"type": "string"} } + def _update(s): + logger.debug(s) + self.update_state( + state=states.STARTED, + meta={ + 'task': 'refresh_finalizer', + 'message': s + }) + task_ids = json.loads(pending_task_ids_json) logger.debug('task_ids: %r' % task_ids) jsonschema.validate(task_ids, input_schema) - _wait_for_tasks(task_ids) - _build_subnet_db() + _wait_for_tasks(task_ids, update_callback=_update) + _build_subnet_db(update_callback=_update) + + _update('latching current/next dbs') + latch_db(InventoryTask.config) logger.debug('<<< refresh_finalizer') -def _build_subnet_db(): +def _build_subnet_db(update_callback=lambda s: None): r = get_next_redis(InventoryTask.config) + update_callback('loading all network addresses') subnets = {} for k in r.scan_iter('reverse_interface_addresses:*'): info = r.get(k.decode('utf-8')).decode('utf-8') @@ -535,6 +541,8 @@ def _build_subnet_db(): 'router': info['router'] }) + update_callback('saving {} subnets'.format(len(subnets))) + for k, v in subnets.items(): r.set('subnets:' + k, json.dumps(v)) -- GitLab