Skip to content
Snippets Groups Projects
Commit fe1f8fb6 authored by Erik Reid's avatar Erik Reid
Browse files

only return the parent id of all pending tasks

parent 250a779f
No related branches found
No related tags found
No related merge requests found
......@@ -32,16 +32,16 @@ def update():
status=503,
mimetype="text/html")
job_ids = worker.launch_refresh_cache_all(
phase2_task_id = worker.launch_refresh_cache_all(
current_app.config["INVENTORY_PROVIDER_CONFIG"])
return jsonify(job_ids)
return jsonify({'task id': phase2_task_id})
@routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST'])
@common.require_accepts_json
def reload_router_config(equipment_name):
result = worker.reload_router_config.delay(equipment_name)
return jsonify([result.id])
return jsonify({'task id': result.id})
@routes.route("check-task-status/<task_id>", methods=['GET', 'POST'])
......
......@@ -512,6 +512,29 @@ def _erase_next_db(config):
new_next=saved_latch['next'])
@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2')
@log_task_entry_and_exit
def internal_refresh_phase_2(self):
# second batch of subtasks:
# alarms db status cache
# juniper netconf & snmp data
subtasks = [
update_equipment_locations.apply_async(),
update_lg_routers.apply_async(),
update_access_services.apply_async(),
import_unmanaged_interfaces.apply_async()
]
for hostname in data.derive_router_hostnames(config):
logger.debug('queueing router refresh jobs for %r' % hostname)
subtasks.append(reload_router_config.apply_async(args=[hostname]))
pending_task_ids = [x.id for x in subtasks]
t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
pending_task_ids.append(t.id)
return pending_task_ids
def launch_refresh_cache_all(config):
"""
utility function intended to be called outside of the worker process
......@@ -531,24 +554,10 @@ def launch_refresh_cache_all(config):
]
[x.get() for x in subtasks]
# second batch of subtasks:
# alarms db status cache
# juniper netconf & snmp data
subtasks = [
update_equipment_locations.apply_async(),
update_lg_routers.apply_async(),
update_access_services.apply_async(),
import_unmanaged_interfaces.apply_async()
]
for hostname in data.derive_router_hostnames(config):
logger.debug('queueing router refresh jobs for %r' % hostname)
subtasks.append(reload_router_config.apply_async(args=[hostname]))
pending_task_ids = [x.id for x in subtasks]
t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
pending_task_ids.append(t.id)
return pending_task_ids
# now launch the task whose only purpose is to
# act as a convenient parent for all of the remaining tasks
t = internal_refresh_phase_2.apply_async()
return t.id
def _wait_for_tasks(task_ids, update_callback=lambda s: None):
......@@ -679,14 +688,13 @@ def check_task_status(task_id):
'ready': r.status in states.READY_STATES,
'success': r.status == states.SUCCESS,
}
if r.result:
# TODO: only discovered this case by testing, is this the only one?
# ... otherwise need to pre-test json serialization
if isinstance(r.result, Exception):
result['result'] = {
'error type': type(r.result).__name__,
'message': str(r.result)
}
else:
result['result'] = r.result
# TODO: only discovered this case by testing, is this the only one?
# ... otherwise need to pre-test json serialization
if isinstance(r.result, Exception):
result['result'] = {
'error type': type(r.result).__name__,
'message': str(r.result)
}
else:
result['result'] = r.result
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment