diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py
index 9d3328581ea109c35a5ce79afe783f3bd38459b1..c46064366b53cbfc5a966d5c02b650490cb20c28 100644
--- a/inventory_provider/routes/jobs.py
+++ b/inventory_provider/routes/jobs.py
@@ -32,16 +32,16 @@ def update():
             status=503,
             mimetype="text/html")
 
-    job_ids = worker.launch_refresh_cache_all(
+    phase2_task_id = worker.launch_refresh_cache_all(
         current_app.config["INVENTORY_PROVIDER_CONFIG"])
-    return jsonify(job_ids)
+    return jsonify({'task id': phase2_task_id})
 
 
 @routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST'])
 @common.require_accepts_json
 def reload_router_config(equipment_name):
     result = worker.reload_router_config.delay(equipment_name)
-    return jsonify([result.id])
+    return jsonify({'task id': result.id})
 
 
 @routes.route("check-task-status/<task_id>", methods=['GET', 'POST'])
diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index 8726aaf4885def62f661fdb36c542a566942a87e..c6d75ee608ab17dbe1c15f6fcd62a5378afb3711 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -512,6 +512,29 @@ def _erase_next_db(config):
         new_next=saved_latch['next'])
 
 
+@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2')
+@log_task_entry_and_exit
+def internal_refresh_phase_2(self):
+    # second batch of subtasks:
+    #   alarms db status cache
+    #   juniper netconf & snmp data
+    subtasks = [
+        update_equipment_locations.apply_async(),
+        update_lg_routers.apply_async(),
+        update_access_services.apply_async(),
+        import_unmanaged_interfaces.apply_async()
+    ]
+    for hostname in data.derive_router_hostnames(config):
+        logger.debug('queueing router refresh jobs for %r' % hostname)
+        subtasks.append(reload_router_config.apply_async(args=[hostname]))
+
+    pending_task_ids = [x.id for x in subtasks]
+
+    t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
+    pending_task_ids.append(t.id)
+    return pending_task_ids
+
+
 def launch_refresh_cache_all(config):
     """
     utility function intended to be called outside of the worker process
@@ -531,24 +554,10 @@ def launch_refresh_cache_all(config):
     ]
     [x.get() for x in subtasks]
 
-    # second batch of subtasks:
-    #   alarms db status cache
-    #   juniper netconf & snmp data
-    subtasks = [
-        update_equipment_locations.apply_async(),
-        update_lg_routers.apply_async(),
-        update_access_services.apply_async(),
-        import_unmanaged_interfaces.apply_async()
-    ]
-    for hostname in data.derive_router_hostnames(config):
-        logger.debug('queueing router refresh jobs for %r' % hostname)
-        subtasks.append(reload_router_config.apply_async(args=[hostname]))
-
-    pending_task_ids = [x.id for x in subtasks]
-
-    t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
-    pending_task_ids.append(t.id)
-    return pending_task_ids
+    # now launch the task whose only purpose is to
+    # act as a convenient parent for all of the remaining tasks
+    t = internal_refresh_phase_2.apply_async()
+    return t.id
 
 
 def _wait_for_tasks(task_ids, update_callback=lambda s: None):
@@ -679,14 +688,13 @@ def check_task_status(task_id):
         'ready': r.status in states.READY_STATES,
         'success': r.status == states.SUCCESS,
     }
-    if r.result:
-        # TODO: only discovered this case by testing, is this the only one?
-        # ... otherwise need to pre-test json serialization
-        if isinstance(r.result, Exception):
-            result['result'] = {
-                'error type': type(r.result).__name__,
-                'message': str(r.result)
-            }
-        else:
-            result['result'] = r.result
+    # TODO: only discovered this case by testing, is this the only one?
+    # ... otherwise need to pre-test json serialization
+    if isinstance(r.result, Exception):
+        result['result'] = {
+            'error type': type(r.result).__name__,
+            'message': str(r.result)
+        }
+    else:
+        result['result'] = r.result
     return result