Skip to content
Snippets Groups Projects
Commit 5f30c24b authored by Erik Reid's avatar Erik Reid
Browse files

wait on task and children based on id

parent 364ebb9b
No related branches found
No related tags found
No related merge requests found
import itertools
import json
import logging
import os
......@@ -608,17 +609,23 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None):
update_callback('waiting for tasks to complete: %r' % task_ids)
time.sleep(FINALIZER_POLLING_FREQUENCY_S)
def _is_error(id):
status = check_task_status(id)
return status['ready'] and not status['success']
def _task_and_children_result(id):
tasks = list(check_task_status(id))
return {
'error': any([t['ready'] and not t['success'] for t in tasks]),
'ready': all([t['ready'] for t in tasks])
}
if any([_is_error(id) for id in task_ids]):
results = dict([
(id, _task_and_children_result(id))
for id in task_ids])
if any([t['error'] for t in results.values()]):
all_successful = False
task_ids = [
id for id in task_ids
if not check_task_status(id)['ready']
]
id for id, status in results.items()
if not status['ready']]
if task_ids:
raise InventoryTaskError(
......@@ -627,7 +634,7 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None):
raise InventoryTaskError(
'some tasks finished with an error')
update_callback('pending taskscompleted in {} seconds'.format(
update_callback('pending tasks completed in {} seconds'.format(
time.time() - start_time))
......@@ -721,15 +728,19 @@ def _build_subnet_db(update_callback=lambda s: None):
rp.execute()
def check_task_status(task_id):
def check_task_status(task_id, parent=None, forget=False):
r = AsyncResult(task_id, app=app)
assert r.id == task_id # sanity
result = {
'id': r.id,
'id': task_id,
'status': r.status,
'exception': r.status in states.EXCEPTION_STATES,
'ready': r.status in states.READY_STATES,
'success': r.status == states.SUCCESS,
'parent': parent
}
# TODO: only discovered this case by testing, is this the only one?
# ... otherwise need to pre-test json serialization
if isinstance(r.result, Exception):
......@@ -739,4 +750,26 @@ def check_task_status(task_id):
}
else:
result['result'] = r.result
return result
def child_taskids(children):
# reverse-engineered, can't find documentation on this
for child in children:
if not child:
continue
if isinstance(child, list):
logger.debug(f'list: {child}')
yield from child_taskids(child)
continue
if isinstance(child, str):
yield child
continue
assert isinstance(child, AsyncResult)
yield child.id
for child_id in child_taskids(getattr(r, 'children', []) or []):
yield from check_task_status(child_id, parent=task_id)
if forget and result['ready']:
r.forget()
yield result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment