From 97bf661ffb875b11ba2f1abe69a071eebe025b6c Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Sat, 2 Feb 2019 09:27:13 +0100 Subject: [PATCH] combine all inventory cache tasks to one --- inventory_provider/routes/jobs.py | 38 ++++++++++++++---------------- inventory_provider/tasks/worker.py | 23 +++++++++++++++++- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index 12f3616b..eddba59b 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -48,31 +48,29 @@ def update(): # return Response("OK") -@routes.route("update-interfaces-to-services", methods=['GET']) -def update_interfaces_to_services(): - app.send_task( - 'inventory_provider.tasks.worker.update_interfaces_to_services') - return Response("OK") - - -@routes.route("update-service-hierarchy", methods=['GET']) -def update_service_hierarchy(): - app.send_task('inventory_provider.tasks.worker.update_circuit_hierarchy') - return Response("OK") - - -@routes.route("update-equipment-locations", methods=['GET']) -def update_equipment_locations(): - app.send_task('inventory_provider.tasks.worker.update_equipment_locations') - return Response("OK") +# @routes.route("update-interfaces-to-services", methods=['GET']) +# def update_interfaces_to_services(): +# app.send_task( +# 'inventory_provider.tasks.worker.update_interfaces_to_services') +# return Response("OK") +# +# +# @routes.route("update-service-hierarchy", methods=['GET']) +# def update_service_hierarchy(): +# app.send_task('inventory_provider.tasks.worker.update_circuit_hierarchy') +# return Response("OK") +# +# +# @routes.route("update-equipment-locations", methods=['GET']) +# def update_equipment_locations(): +# app.send_task('inventory_provider.tasks.worker.update_equipment_locations') +# return Response("OK") @routes.route("update-from-inventory-system", methods=['GET']) def update_from_inventory_system(): app.send_task( - 'inventory_provider.tasks.worker.update_interfaces_to_services') - app.send_task('inventory_provider.tasks.worker.update_circuit_hierarchy') - app.send_task('inventory_provider.tasks.worker.update_equipment_locations') + 'inventory_provider.tasks.worker.update_inventory_system_cache') return Response("OK") diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 87b41bd4..a069c08c 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -1,7 +1,9 @@ import json import logging -from celery import bootsteps, Task +from celery import bootsteps, Task, group + + from collections import defaultdict from lxml import etree @@ -14,8 +16,10 @@ from inventory_provider.db import db, opsdb, alarmsdb from inventory_provider import snmp from inventory_provider import juniper + environment.setup_logging() +# TODO: error callback (cf. http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks) class InventoryTask(Task): @@ -142,6 +146,9 @@ def netconf_refresh_config(self, hostname): @app.task() def update_interfaces_to_services(): + logger = logging.getLogger(constants.TASK_LOGGER_NAME) + logger.error('HERE: update_interfaces_to_services') + r = get_redis(InventoryTask.config) with db.connection(InventoryTask.config["ops-db"]) as cx: services = opsdb.get_circuits(cx) @@ -221,3 +228,17 @@ def update_interface_statuses(): service["equipment"], service["interface_name"] )) + + +@app.task() +def update_inventory_system_cache(): + logger = logging.getLogger(constants.TASK_LOGGER_NAME) + logger.error('HERE: update_inventory_system_cache') + + g = group([ + update_interfaces_to_services.s(), + update_circuit_hierarchy.s(), + update_equipment_locations.s(), + update_interface_statuses.s() + ]) + g.apply_async() -- GitLab