Skip to content
Snippets Groups Projects
Commit 4ecc9abc authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature DBOARD3-244-make-detailed-subtask-results-available.

parents 364ebb9b 5172e785
No related branches found
Tags 0.19
No related merge requests found
......@@ -32,8 +32,10 @@ def update():
status=503,
mimetype="text/html")
phase2_task_id = worker.launch_refresh_cache_all(
current_app.config["INVENTORY_PROVIDER_CONFIG"])
phase2_task_id = worker.launch_refresh_cache_all(config)
r = common.get_current_redis()
r.set('classifier-cache:update-task-id', phase2_task_id.encode('utf-8'))
return jsonify({'task id': phase2_task_id})
......@@ -47,4 +49,19 @@ def reload_router_config(equipment_name):
@routes.route("check-task-status/<task_id>", methods=['GET', 'POST'])
@common.require_accepts_json
def check_task_status(task_id):
return jsonify(worker.check_task_status(task_id))
return jsonify(list(worker.check_task_status(task_id)))
@routes.route("check-update-status", methods=['GET', 'POST'])
@common.require_accepts_json
def check_update_status():
r = common.get_current_redis()
task_id = r.get('classifier-cache:update-task-id')
if not task_id:
return Response(
response='no pending update task found',
status=404,
mimetype="text/html")
task_id = task_id.decode('utf-8')
return jsonify(list(worker.check_task_status(task_id)))
......@@ -584,6 +584,8 @@ def launch_refresh_cache_all(config):
update_latch_status(config, pending=True)
# TODO: [DBOARD3-242] catch exceptions & reset latch status
# first batch of subtasks: refresh cached opsdb data
subtasks = [
update_neteng_managed_device_list.apply_async(),
......@@ -608,17 +610,23 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None):
update_callback('waiting for tasks to complete: %r' % task_ids)
time.sleep(FINALIZER_POLLING_FREQUENCY_S)
def _is_error(id):
status = check_task_status(id)
return status['ready'] and not status['success']
def _task_and_children_result(id):
tasks = list(check_task_status(id))
return {
'error': any([t['ready'] and not t['success'] for t in tasks]),
'ready': all([t['ready'] for t in tasks])
}
results = dict([
(id, _task_and_children_result(id))
for id in task_ids])
if any([_is_error(id) for id in task_ids]):
if any([t['error'] for t in results.values()]):
all_successful = False
task_ids = [
id for id in task_ids
if not check_task_status(id)['ready']
]
id for id, status in results.items()
if not status['ready']]
if task_ids:
raise InventoryTaskError(
......@@ -627,7 +635,7 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None):
raise InventoryTaskError(
'some tasks finished with an error')
update_callback('pending taskscompleted in {} seconds'.format(
update_callback('pending tasks completed in {} seconds'.format(
time.time() - start_time))
......@@ -721,15 +729,19 @@ def _build_subnet_db(update_callback=lambda s: None):
rp.execute()
def check_task_status(task_id):
def check_task_status(task_id, parent=None, forget=False):
r = AsyncResult(task_id, app=app)
assert r.id == task_id # sanity
result = {
'id': r.id,
'id': task_id,
'status': r.status,
'exception': r.status in states.EXCEPTION_STATES,
'ready': r.status in states.READY_STATES,
'success': r.status == states.SUCCESS,
'parent': parent
}
# TODO: only discovered this case by testing, is this the only one?
# ... otherwise need to pre-test json serialization
if isinstance(r.result, Exception):
......@@ -739,4 +751,26 @@ def check_task_status(task_id):
}
else:
result['result'] = r.result
return result
def child_taskids(children):
# reverse-engineered, can't find documentation on this
for child in children:
if not child:
continue
if isinstance(child, list):
logger.debug(f'list: {child}')
yield from child_taskids(child)
continue
if isinstance(child, str):
yield child
continue
assert isinstance(child, AsyncResult)
yield child.id
for child_id in child_taskids(getattr(r, 'children', []) or []):
yield from check_task_status(child_id, parent=task_id)
if forget and result['ready']:
r.forget()
yield result
import json
import jsonschema
from inventory_provider.tasks.common import DB_LATCH_SCHEMA
from inventory_provider.tasks.common import _get_redis, DB_LATCH_SCHEMA
DEFAULT_REQUEST_HEADERS = {
"Content-type": "application/json",
"Accept": ["application/json"]
......@@ -19,20 +20,39 @@ TASK_ID_RESPONSE_SCHEMA = {
TASK_STATUS_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "string"},
"status": {"type": "string"},
"exception": {"type": "boolean"},
"ready": {"type": "boolean"},
"success": {"type": "boolean"},
"result": {"type": ["object", "null"]}
"definitions": {
"task": {
"type": "object",
"properties": {
"id": {"type": "string"},
"status": {"type": "string"},
"exception": {"type": "boolean"},
"ready": {"type": "boolean"},
"success": {"type": "boolean"},
"result": {"type": ["object", "null"]},
"parent": {"type": ["string", "null"]}
},
"required": [
"id", "status", "exception", "ready", "success", "parent"],
"additionalProperties": False
}
},
"required": ["id", "status", "exception", "ready", "success"],
"additionalProperties": False
"type": "array",
"items": {"$ref": "#/definitions/task"}
}
def backend_db():
return _get_redis({
'redis': {
'hostname': None,
'port': None
},
'redis-databases': [0, 7]
}).db
def test_job_update_all(client, mocker):
expected_task_id = 'xyz@123#456'
launch_refresh_cache_all = mocker.patch(
......@@ -47,6 +67,10 @@ def test_job_update_all(client, mocker):
jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA)
assert refresh_task_response['task id'] == expected_task_id
db = backend_db()
assert db['classifier-cache:update-task-id'] \
== expected_task_id.encode('utf-8')
def test_job_update_force_pending(client, mocker):
expected_task_id = 'asf#asdf%111'
......@@ -123,6 +147,42 @@ def test_reload_router_config(client, mocker):
assert refresh_task_response['task id'] == 'bogus task id'
def test_check_update_status(client, mocker):
db = backend_db()
db['classifier-cache:update-task-id'] = 'zz55'
mocker.patch(
'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)
MockedAsyncResult.status = 'SUCCESS' # celery.states.SUCCESS
MockedAsyncResult.result = {'absab': 1, 'def': 'aaabbb'}
rv = client.post(
'jobs/check-update-status',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
result = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(result, TASK_STATUS_SCHEMA)
for status in result:
assert status['id'] == 'zz55'
assert status['status'] == 'SUCCESS'
assert not status['exception']
assert status['ready']
assert status['success']
assert 'result' in status
def test_check_update_status_404(client):
db = backend_db()
db.pop('classifier-cache:update-task-id', None)
rv = client.post(
'jobs/check-update-status',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 404
def test_check_task_status_success(client, mocker):
mocker.patch(
'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)
......@@ -133,14 +193,15 @@ def test_check_task_status_success(client, mocker):
'jobs/check-task-status/abc',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
status = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(status, TASK_STATUS_SCHEMA)
assert status['id'] == 'abc'
assert status['status'] == 'SUCCESS'
assert not status['exception']
assert status['ready']
assert status['success']
assert 'result' in status
result = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(result, TASK_STATUS_SCHEMA)
for status in result:
assert status['id'] == 'abc'
assert status['status'] == 'SUCCESS'
assert not status['exception']
assert status['ready']
assert status['success']
assert 'result' in status
def test_check_task_status_custom_status(client, mocker):
......@@ -153,13 +214,14 @@ def test_check_task_status_custom_status(client, mocker):
'jobs/check-task-status/xyz',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
status = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(status, TASK_STATUS_SCHEMA)
assert status['id'] == 'xyz'
assert status['status'] == 'custom'
assert not status['exception']
assert not status['ready']
assert not status['success']
result = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(result, TASK_STATUS_SCHEMA)
for status in result:
assert status['id'] == 'xyz'
assert status['status'] == 'custom'
assert not status['exception']
assert not status['ready']
assert not status['success']
def test_check_task_status_exception(client, mocker):
......@@ -172,15 +234,16 @@ def test_check_task_status_exception(client, mocker):
'jobs/check-task-status/123-xyz.ABC',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
status = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(status, TASK_STATUS_SCHEMA)
assert status['id'] == '123-xyz.ABC'
assert status['status'] == 'FAILURE'
assert status['exception']
assert status['ready']
assert not status['success']
assert status['result']['error type'] == 'AssertionError'
assert status['result']['message'] == 'test error message'
result = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(result, TASK_STATUS_SCHEMA)
for status in result:
assert status['id'] == '123-xyz.ABC'
assert status['status'] == 'FAILURE'
assert status['exception']
assert status['ready']
assert not status['success']
assert status['result']['error type'] == 'AssertionError'
assert status['result']['message'] == 'test error message'
def test_latchdb(client, mocked_redis):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment