diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index c46064366b53cbfc5a966d5c02b650490cb20c28..9a8080c87a48174a27fcc50935b585ed738961a7 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -32,8 +32,10 @@ def update(): status=503, mimetype="text/html") - phase2_task_id = worker.launch_refresh_cache_all( - current_app.config["INVENTORY_PROVIDER_CONFIG"]) + phase2_task_id = worker.launch_refresh_cache_all(config) + + r = common.get_current_redis() + r.set('classifier-cache:update-task-id', phase2_task_id.encode('utf-8')) return jsonify({'task id': phase2_task_id}) @@ -47,4 +49,19 @@ def reload_router_config(equipment_name): @routes.route("check-task-status/<task_id>", methods=['GET', 'POST']) @common.require_accepts_json def check_task_status(task_id): - return jsonify(worker.check_task_status(task_id)) + return jsonify(list(worker.check_task_status(task_id))) + + +@routes.route("check-update-status", methods=['GET', 'POST']) +@common.require_accepts_json +def check_update_status(): + r = common.get_current_redis() + task_id = r.get('classifier-cache:update-task-id') + if not task_id: + return Response( + response='no pending update task found', + status=404, + mimetype="text/html") + + task_id = task_id.decode('utf-8') + return jsonify(list(worker.check_task_status(task_id))) diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 132dc96fb4efcea521188710209eec7cbc54be46..bab1b31db43b5ff78728733d0c24568390c517bc 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -584,6 +584,8 @@ def launch_refresh_cache_all(config): update_latch_status(config, pending=True) + # TODO: [DBOARD3-242] catch exceptions & reset latch status + # first batch of subtasks: refresh cached opsdb data subtasks = [ update_neteng_managed_device_list.apply_async(), @@ -608,17 +610,23 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None): update_callback('waiting for tasks to complete: %r' % task_ids) time.sleep(FINALIZER_POLLING_FREQUENCY_S) - def _is_error(id): - status = check_task_status(id) - return status['ready'] and not status['success'] + def _task_and_children_result(id): + tasks = list(check_task_status(id)) + return { + 'error': any([t['ready'] and not t['success'] for t in tasks]), + 'ready': all([t['ready'] for t in tasks]) + } + + results = dict([ + (id, _task_and_children_result(id)) + for id in task_ids]) - if any([_is_error(id) for id in task_ids]): + if any([t['error'] for t in results.values()]): all_successful = False task_ids = [ - id for id in task_ids - if not check_task_status(id)['ready'] - ] + id for id, status in results.items() + if not status['ready']] if task_ids: raise InventoryTaskError( @@ -627,7 +635,7 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None): raise InventoryTaskError( 'some tasks finished with an error') - update_callback('pending taskscompleted in {} seconds'.format( + update_callback('pending tasks completed in {} seconds'.format( time.time() - start_time)) @@ -721,15 +729,19 @@ def _build_subnet_db(update_callback=lambda s: None): rp.execute() -def check_task_status(task_id): +def check_task_status(task_id, parent=None, forget=False): r = AsyncResult(task_id, app=app) + assert r.id == task_id # sanity + result = { - 'id': r.id, + 'id': task_id, 'status': r.status, 'exception': r.status in states.EXCEPTION_STATES, 'ready': r.status in states.READY_STATES, 'success': r.status == states.SUCCESS, + 'parent': parent } + # TODO: only discovered this case by testing, is this the only one? # ... otherwise need to pre-test json serialization if isinstance(r.result, Exception): @@ -739,4 +751,26 @@ def check_task_status(task_id): } else: result['result'] = r.result - return result + + def child_taskids(children): + # reverse-engineered, can't find documentation on this + for child in children: + if not child: + continue + if isinstance(child, list): + logger.debug(f'list: {child}') + yield from child_taskids(child) + continue + if isinstance(child, str): + yield child + continue + assert isinstance(child, AsyncResult) + yield child.id + + for child_id in child_taskids(getattr(r, 'children', []) or []): + yield from check_task_status(child_id, parent=task_id) + + if forget and result['ready']: + r.forget() + + yield result diff --git a/test/test_job_routes.py b/test/test_job_routes.py index 7f1a20f1ebd70703116abb8c4f7d9ed9ddeade37..4f2b473c3abe34cc5c0744e0b48cde14c96522eb 100644 --- a/test/test_job_routes.py +++ b/test/test_job_routes.py @@ -1,7 +1,8 @@ import json import jsonschema -from inventory_provider.tasks.common import DB_LATCH_SCHEMA +from inventory_provider.tasks.common import _get_redis, DB_LATCH_SCHEMA + DEFAULT_REQUEST_HEADERS = { "Content-type": "application/json", "Accept": ["application/json"] @@ -19,20 +20,39 @@ TASK_ID_RESPONSE_SCHEMA = { TASK_STATUS_SCHEMA = { "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": {"type": "string"}, - "status": {"type": "string"}, - "exception": {"type": "boolean"}, - "ready": {"type": "boolean"}, - "success": {"type": "boolean"}, - "result": {"type": ["object", "null"]} + "definitions": { + "task": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "status": {"type": "string"}, + "exception": {"type": "boolean"}, + "ready": {"type": "boolean"}, + "success": {"type": "boolean"}, + "result": {"type": ["object", "null"]}, + "parent": {"type": ["string", "null"]} + }, + "required": [ + "id", "status", "exception", "ready", "success", "parent"], + "additionalProperties": False + } }, - "required": ["id", "status", "exception", "ready", "success"], - "additionalProperties": False + + "type": "array", + "items": {"$ref": "#/definitions/task"} } +def backend_db(): + return _get_redis({ + 'redis': { + 'hostname': None, + 'port': None + }, + 'redis-databases': [0, 7] + }).db + + def test_job_update_all(client, mocker): expected_task_id = 'xyz@123#456' launch_refresh_cache_all = mocker.patch( @@ -47,6 +67,10 @@ def test_job_update_all(client, mocker): jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA) assert refresh_task_response['task id'] == expected_task_id + db = backend_db() + assert db['classifier-cache:update-task-id'] \ + == expected_task_id.encode('utf-8') + def test_job_update_force_pending(client, mocker): expected_task_id = 'asf#asdf%111' @@ -123,6 +147,42 @@ def test_reload_router_config(client, mocker): assert refresh_task_response['task id'] == 'bogus task id' +def test_check_update_status(client, mocker): + + db = backend_db() + db['classifier-cache:update-task-id'] = 'zz55' + + mocker.patch( + 'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult) + MockedAsyncResult.status = 'SUCCESS' # celery.states.SUCCESS + MockedAsyncResult.result = {'absab': 1, 'def': 'aaabbb'} + + rv = client.post( + 'jobs/check-update-status', + headers=DEFAULT_REQUEST_HEADERS) + assert rv.status_code == 200 + result = json.loads(rv.data.decode('utf-8')) + jsonschema.validate(result, TASK_STATUS_SCHEMA) + for status in result: + assert status['id'] == 'zz55' + assert status['status'] == 'SUCCESS' + assert not status['exception'] + assert status['ready'] + assert status['success'] + assert 'result' in status + + +def test_check_update_status_404(client): + + db = backend_db() + db.pop('classifier-cache:update-task-id', None) + + rv = client.post( + 'jobs/check-update-status', + headers=DEFAULT_REQUEST_HEADERS) + assert rv.status_code == 404 + + def test_check_task_status_success(client, mocker): mocker.patch( 'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult) @@ -133,14 +193,15 @@ def test_check_task_status_success(client, mocker): 'jobs/check-task-status/abc', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 - status = json.loads(rv.data.decode('utf-8')) - jsonschema.validate(status, TASK_STATUS_SCHEMA) - assert status['id'] == 'abc' - assert status['status'] == 'SUCCESS' - assert not status['exception'] - assert status['ready'] - assert status['success'] - assert 'result' in status + result = json.loads(rv.data.decode('utf-8')) + jsonschema.validate(result, TASK_STATUS_SCHEMA) + for status in result: + assert status['id'] == 'abc' + assert status['status'] == 'SUCCESS' + assert not status['exception'] + assert status['ready'] + assert status['success'] + assert 'result' in status def test_check_task_status_custom_status(client, mocker): @@ -153,13 +214,14 @@ def test_check_task_status_custom_status(client, mocker): 'jobs/check-task-status/xyz', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 - status = json.loads(rv.data.decode('utf-8')) - jsonschema.validate(status, TASK_STATUS_SCHEMA) - assert status['id'] == 'xyz' - assert status['status'] == 'custom' - assert not status['exception'] - assert not status['ready'] - assert not status['success'] + result = json.loads(rv.data.decode('utf-8')) + jsonschema.validate(result, TASK_STATUS_SCHEMA) + for status in result: + assert status['id'] == 'xyz' + assert status['status'] == 'custom' + assert not status['exception'] + assert not status['ready'] + assert not status['success'] def test_check_task_status_exception(client, mocker): @@ -172,15 +234,16 @@ def test_check_task_status_exception(client, mocker): 'jobs/check-task-status/123-xyz.ABC', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 - status = json.loads(rv.data.decode('utf-8')) - jsonschema.validate(status, TASK_STATUS_SCHEMA) - assert status['id'] == '123-xyz.ABC' - assert status['status'] == 'FAILURE' - assert status['exception'] - assert status['ready'] - assert not status['success'] - assert status['result']['error type'] == 'AssertionError' - assert status['result']['message'] == 'test error message' + result = json.loads(rv.data.decode('utf-8')) + jsonschema.validate(result, TASK_STATUS_SCHEMA) + for status in result: + assert status['id'] == '123-xyz.ABC' + assert status['status'] == 'FAILURE' + assert status['exception'] + assert status['ready'] + assert not status['success'] + assert status['result']['error type'] == 'AssertionError' + assert status['result']['message'] == 'test error message' def test_latchdb(client, mocked_redis):