From e674e20b2b70849e9546a47352d22a02bc786ee0 Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Sat, 12 Oct 2019 18:10:10 +0200 Subject: [PATCH] added pending & failure to latch struct --- inventory_provider/tasks/common.py | 22 ++++++++++++++++++++-- inventory_provider/tasks/worker.py | 24 +++++++++++++++++------- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/inventory_provider/tasks/common.py b/inventory_provider/tasks/common.py index 1ded6cfe..f4492f61 100644 --- a/inventory_provider/tasks/common.py +++ b/inventory_provider/tasks/common.py @@ -13,7 +13,9 @@ DB_LATCH_SCHEMA = { "properties": { "current": {"type": "integer"}, "next": {"type": "integer"}, - "this": {"type": "integer"} + "this": {"type": "integer"}, + "pending": {"type": "boolean"}, + "failure": {"type": "boolean"} }, "required": ["current", "next", "this"], "additionalProperties": False @@ -35,6 +37,20 @@ def get_latch(r): return latch +def update_latch_status(config, pending=False, failure=False): + logger.debug('updating latch status: pending={}, failure={}'.format( + pending, failure)) + + for db in config['redis-databases']: + r = _get_redis(config, dbid=db) + latch = get_latch(r) + if not latch: + continue + latch['pending'] = pending + latch['failure'] = failure + r.set('db:latch', json.dumps(latch)) + + def set_latch(config, new_current, new_next): logger.debug('setting latch: new current={}, new next={}'.format( @@ -44,7 +60,9 @@ def set_latch(config, new_current, new_next): latch = { 'current': new_current, 'next': new_next, - 'this': db + 'this': db, + 'pending': False, + 'failure': False } r = _get_redis(config, dbid=db) diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 39832cb6..59169ee9 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -13,7 +13,7 @@ import jsonschema from inventory_provider.tasks.app import app from inventory_provider.tasks.common \ - import get_next_redis, latch_db, get_latch, set_latch + import get_next_redis, latch_db, get_latch, set_latch, update_latch_status from inventory_provider import config from inventory_provider import environment from inventory_provider.db import db, opsdb @@ -473,6 +473,8 @@ def launch_refresh_cache_all(config): """ _erase_next_db(config) + update_latch_status(config, pending=True) + # first batch of subtasks: refresh cached opsdb data subtasks = [ update_junosspace_device_list.apply_async(), @@ -556,14 +558,22 @@ def refresh_finalizer(self, pending_task_ids_json): 'message': s }) - task_ids = json.loads(pending_task_ids_json) - logger.debug('task_ids: %r' % task_ids) - jsonschema.validate(task_ids, input_schema) - _wait_for_tasks(task_ids, update_callback=_update) - _build_subnet_db(update_callback=_update) + try: + task_ids = json.loads(pending_task_ids_json) + logger.debug('task_ids: %r' % task_ids) + jsonschema.validate(task_ids, input_schema) + + _wait_for_tasks(task_ids, update_callback=_update) + _build_subnet_db(update_callback=_update) + + except (jsonschema.ValidationError, + json.JSONDecodeError, + InventoryTaskError) as e: + update_latch_status(InventoryTask.config, failure=True) + raise e - _update('latching current/next dbs') latch_db(InventoryTask.config) + _update('latched current/next dbs') logger.debug('<<< refresh_finalizer') -- GitLab