diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index c26ab6a588041272a855065d38d13e1d028b4474..29f75a948b07f44d6e49f9cb6d5c59143a9b97fc 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -1,4 +1,4 @@ -from flask import Blueprint, Response, current_app +from flask import Blueprint, Response, current_app, jsonify from inventory_provider.tasks import worker routes = Blueprint("inventory-data-job-routes", __name__) @@ -6,10 +6,9 @@ routes = Blueprint("inventory-data-job-routes", __name__) @routes.route("/update", methods=['GET', 'POST']) def update(): - worker.start_refresh_cache_all( + job_ids = worker.launch_refresh_cache_all( current_app.config["INVENTORY_PROVIDER_CONFIG"]) - return Response("OK") - + return jsonify(job_ids) @routes.route("update-interface-statuses") def update_interface_statuses(): @@ -21,3 +20,7 @@ def update_interface_statuses(): def reload_router_config(equipment_name): worker.reload_router_config.delay(equipment_name) return Response("OK") + +@routes.route("check-task-status/<task_id>") +def check_task_status(task_id): + return jsonify(worker.check_task_status(task_id)) diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index b3e05ef0b6993cccccfb0e67f3882d92c0fa67bb..f05ba9a36d104995af8f03626c3d4859e9c614f5 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -2,7 +2,9 @@ import json import logging import re -from celery import bootsteps, Task, group +from celery import bootsteps, Task, group, states +from celery.result import AsyncResult + from collections import defaultdict from lxml import etree @@ -323,7 +325,7 @@ def _derive_router_hostnames(config): return junosspace_equipment & opsdb_equipment -def start_refresh_cache_all(config): +def launch_refresh_cache_all(config): """ utility function intended to be called outside of the worker process :param config: config structure as defined in config.py @@ -353,4 +355,17 @@ def start_refresh_cache_all(config): 'queueing router refresh jobs for %r' % hostname) subtasks.append(reload_router_config.s(hostname)) - return group(subtasks).apply_async() + return [r.id for r in group(subtasks).apply_async()] + + +def check_task_status(task_id): + r = AsyncResult(task_id, app=app) + return { + 'id': task_id, + 'status': r.status, + 'exception': r.status in states.EXCEPTION_STATES, + 'ready': r.status in states.READY_STATES, + 'success': r.status == states.SUCCESS, + 'result': r.result + } +