diff --git a/inventory_provider/tasks/common.py b/inventory_provider/tasks/common.py index 1ded6cfeecd82d7a37eeab4230320a4f2f6b0c8e..f4492f61d3be7dfbd6a4cede91017697b3efc4d0 100644 --- a/inventory_provider/tasks/common.py +++ b/inventory_provider/tasks/common.py @@ -13,7 +13,9 @@ DB_LATCH_SCHEMA = { "properties": { "current": {"type": "integer"}, "next": {"type": "integer"}, - "this": {"type": "integer"} + "this": {"type": "integer"}, + "pending": {"type": "boolean"}, + "failure": {"type": "boolean"} }, "required": ["current", "next", "this"], "additionalProperties": False @@ -35,6 +37,20 @@ def get_latch(r): return latch +def update_latch_status(config, pending=False, failure=False): + logger.debug('updating latch status: pending={}, failure={}'.format( + pending, failure)) + + for db in config['redis-databases']: + r = _get_redis(config, dbid=db) + latch = get_latch(r) + if not latch: + continue + latch['pending'] = pending + latch['failure'] = failure + r.set('db:latch', json.dumps(latch)) + + def set_latch(config, new_current, new_next): logger.debug('setting latch: new current={}, new next={}'.format( @@ -44,7 +60,9 @@ def set_latch(config, new_current, new_next): latch = { 'current': new_current, 'next': new_next, - 'this': db + 'this': db, + 'pending': False, + 'failure': False } r = _get_redis(config, dbid=db) diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 39832cb6355612bee20572a9eeabb9671c4d8591..59169ee915e47fce9838cbbca19c9a1438231e8e 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -13,7 +13,7 @@ import jsonschema from inventory_provider.tasks.app import app from inventory_provider.tasks.common \ - import get_next_redis, latch_db, get_latch, set_latch + import get_next_redis, latch_db, get_latch, set_latch, update_latch_status from inventory_provider import config from inventory_provider import environment from inventory_provider.db import db, opsdb @@ -473,6 +473,8 @@ def launch_refresh_cache_all(config): """ _erase_next_db(config) + update_latch_status(config, pending=True) + # first batch of subtasks: refresh cached opsdb data subtasks = [ update_junosspace_device_list.apply_async(), @@ -556,14 +558,22 @@ def refresh_finalizer(self, pending_task_ids_json): 'message': s }) - task_ids = json.loads(pending_task_ids_json) - logger.debug('task_ids: %r' % task_ids) - jsonschema.validate(task_ids, input_schema) - _wait_for_tasks(task_ids, update_callback=_update) - _build_subnet_db(update_callback=_update) + try: + task_ids = json.loads(pending_task_ids_json) + logger.debug('task_ids: %r' % task_ids) + jsonschema.validate(task_ids, input_schema) + + _wait_for_tasks(task_ids, update_callback=_update) + _build_subnet_db(update_callback=_update) + + except (jsonschema.ValidationError, + json.JSONDecodeError, + InventoryTaskError) as e: + update_latch_status(InventoryTask.config, failure=True) + raise e - _update('latching current/next dbs') latch_db(InventoryTask.config) + _update('latched current/next dbs') logger.debug('<<< refresh_finalizer')