diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 132dc96fb4efcea521188710209eec7cbc54be46..91a7aeb2cff67eea44a7a655d6976b774f9f4110 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -1,3 +1,4 @@ +import itertools import json import logging import os @@ -608,17 +609,23 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None): update_callback('waiting for tasks to complete: %r' % task_ids) time.sleep(FINALIZER_POLLING_FREQUENCY_S) - def _is_error(id): - status = check_task_status(id) - return status['ready'] and not status['success'] + def _task_and_children_result(id): + tasks = list(check_task_status(id)) + return { + 'error': any([t['ready'] and not t['success'] for t in tasks]), + 'ready': all([t['ready'] for t in tasks]) + } - if any([_is_error(id) for id in task_ids]): + results = dict([ + (id, _task_and_children_result(id)) + for id in task_ids]) + + if any([t['error'] for t in results.values()]): all_successful = False task_ids = [ - id for id in task_ids - if not check_task_status(id)['ready'] - ] + id for id, status in results.items() + if not status['ready']] if task_ids: raise InventoryTaskError( @@ -627,7 +634,7 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None): raise InventoryTaskError( 'some tasks finished with an error') - update_callback('pending taskscompleted in {} seconds'.format( + update_callback('pending tasks completed in {} seconds'.format( time.time() - start_time)) @@ -721,15 +728,19 @@ def _build_subnet_db(update_callback=lambda s: None): rp.execute() -def check_task_status(task_id): +def check_task_status(task_id, parent=None, forget=False): r = AsyncResult(task_id, app=app) + assert r.id == task_id # sanity + result = { - 'id': r.id, + 'id': task_id, 'status': r.status, 'exception': r.status in states.EXCEPTION_STATES, 'ready': r.status in states.READY_STATES, 'success': r.status == states.SUCCESS, + 'parent': parent } + # TODO: only discovered this case by testing, is this the only one? # ... otherwise need to pre-test json serialization if isinstance(r.result, Exception): @@ -739,4 +750,26 @@ def check_task_status(task_id): } else: result['result'] = r.result - return result + + def child_taskids(children): + # reverse-engineered, can't find documentation on this + for child in children: + if not child: + continue + if isinstance(child, list): + logger.debug(f'list: {child}') + yield from child_taskids(child) + continue + if isinstance(child, str): + yield child + continue + assert isinstance(child, AsyncResult) + yield child.id + + for child_id in child_taskids(getattr(r, 'children', []) or []): + yield from check_task_status(child_id, parent=task_id) + + if forget and result['ready']: + r.forget() + + yield result