From ce2fdfdf7c1aedc9a4713b9363f015e36a9407fe Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Fri, 8 Feb 2019 15:43:21 +0100 Subject: [PATCH] api for checking task status --- inventory_provider/routes/jobs.py | 11 +++++++---- inventory_provider/tasks/worker.py | 21 ++++++++++++++++++--- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index c26ab6a5..29f75a94 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -1,4 +1,4 @@ -from flask import Blueprint, Response, current_app +from flask import Blueprint, Response, current_app, jsonify from inventory_provider.tasks import worker routes = Blueprint("inventory-data-job-routes", __name__) @@ -6,10 +6,9 @@ routes = Blueprint("inventory-data-job-routes", __name__) @routes.route("/update", methods=['GET', 'POST']) def update(): - worker.start_refresh_cache_all( + job_ids = worker.launch_refresh_cache_all( current_app.config["INVENTORY_PROVIDER_CONFIG"]) - return Response("OK") - + return jsonify(job_ids) @routes.route("update-interface-statuses") def update_interface_statuses(): @@ -21,3 +20,7 @@ def update_interface_statuses(): def reload_router_config(equipment_name): worker.reload_router_config.delay(equipment_name) return Response("OK") + +@routes.route("check-task-status/<task_id>") +def check_task_status(task_id): + return jsonify(worker.check_task_status(task_id)) diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index b3e05ef0..f05ba9a3 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -2,7 +2,9 @@ import json import logging import re -from celery import bootsteps, Task, group +from celery import bootsteps, Task, group, states +from celery.result import AsyncResult + from collections import defaultdict from lxml import etree @@ -323,7 +325,7 @@ def _derive_router_hostnames(config): return junosspace_equipment & opsdb_equipment -def start_refresh_cache_all(config): +def launch_refresh_cache_all(config): """ utility function intended to be called outside of the worker process :param config: config structure as defined in config.py @@ -353,4 +355,17 @@ def start_refresh_cache_all(config): 'queueing router refresh jobs for %r' % hostname) subtasks.append(reload_router_config.s(hostname)) - return group(subtasks).apply_async() + return [r.id for r in group(subtasks).apply_async()] + + +def check_task_status(task_id): + r = AsyncResult(task_id, app=app) + return { + 'id': task_id, + 'status': r.status, + 'exception': r.status in states.EXCEPTION_STATES, + 'ready': r.status in states.READY_STATES, + 'success': r.status == states.SUCCESS, + 'result': r.result + } + -- GitLab