diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index a02b414537c3c972f931941018257878d2e675e5..023e32798e713d8f78fbb72baf59d8f35c6ed6ec 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -3,81 +3,18 @@ import logging from flask import Blueprint, Response, current_app from inventory_provider.tasks import worker -from inventory_provider.constants import TASK_LOGGER_NAME routes = Blueprint("inventory-data-job-routes", __name__) @routes.route("/update", methods=['GET', 'POST']) def update(): - task_logger = logging.getLogger(TASK_LOGGER_NAME) - worker.start_refresh_cache_all( current_app.config["INVENTORY_PROVIDER_CONFIG"]) - - # - # - # app.send_task( - # 'inventory_provider.tasks.worker.refresh_cache_all') - # db_subtasks = [ - # update_junosspace_device_list.s(), - # update_inventory_system_cache.s() - # ] - # - # ch = ( - # group(db_subtasks), - # _chain_separator_task.s(), - # - # ) - # - # - # task_logger.debug( - # 'launching task: ' - # 'inventory_provider.tasks.worker.refresh_cache_all') - # app.send_task( - # 'inventory_provider.tasks.worker.refresh_cache_all') - return Response("OK") - - -# @routes.route("/update-startup", methods=['GET', 'POST']) -# def startup_update(): -# task_logger = logging.getLogger(TASK_LOGGER_NAME) -# task_logger.debug( -# 'launching task: ' -# 'inventory_provider.tasks.worker.update_alarmsdb_cache') -# app.send_task( -# 'inventory_provider.tasks.worker.update_alarmsdb_cache') - return Response("OK") - - -# @routes.route("update-interfaces-to-services", methods=['GET']) -# def update_interfaces_to_services(): -# app.send_task( -# 'inventory_provider.tasks.worker.update_interfaces_to_services') -# return Response("OK") -# -# -# @routes.route("update-service-hierarchy", methods=['GET']) -# def update_service_hierarchy(): -# app.send_task('inventory_provider.tasks.worker.update_circuit_hierarchy') -# return Response("OK") -# -# -# @routes.route("update-equipment-locations", methods=['GET']) -# def update_equipment_locations(): -# app.send_task('inventory_provider.tasks.worker.update_equipment_locations') -# return Response("OK") - - -@routes.route("update-from-inventory-system", methods=['GET']) -def update_from_inventory_system(): - app.send_task( - 'inventory_provider.tasks.worker.update_inventory_system_cache') return Response("OK") @routes.route("update-interface-statuses") def update_interface_statuses(): - app.send_task( - 'inventory_provider.tasks.worker.update_interface_statuses') + worker.update_interface_statuses().async_start() return Response("OK") diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index be868b5d891a90542f0839efd3b8dd5777b6bb48..90a1f99d5ca68286b2fc04174ea4c8fd385cd18f 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -63,40 +63,11 @@ class InventoryTask(Task): key, etree.tostring(xml_doc, encoding='unicode')) - # @staticmethod - # def save_key_json(hostname, key, data_obj): - # InventoryTask.save_key( - # hostname, - # key, - # json.dumps(data_obj)) - # - # @staticmethod - # def save_key_etree(hostname, key, xml_doc): - # InventoryTask.save_key( - # hostname, - # key, - # etree.tostring(xml_doc, encoding='unicode')) - - -# def _wait_for_result(async_result): -# import time -# logger = logging.getLogger(constants.TASK_LOGGER_NAME) -# while not async_result.ready(): -# logger.debug("async_result not ready ... wait") -# time.sleep(5.0) -# return async_result.get() class WorkerArgs(bootsteps.Step): def __init__(self, worker, config_filename, **options): with open(config_filename) as f: InventoryTask.config = config.load(f) - # InventoryTask.logger = logging.getLogger(constants.TASK_LOGGER_NAME) - -# interfaces_key = "interface_services" -# equipment_locations_key = "equipment_locations" -# service_child_to_parents_key = "child_to_parent_circuit_relations" -# service_parent_to_children_key = "parent_to_children_circuit_relations" -# interface_status_key = "interface_statuses" def worker_args(parser): @@ -141,18 +112,6 @@ def netconf_refresh_config(self, hostname): task_logger.debug('<<< netconf_refresh_config(%r)' % hostname) -# @app.task(bind=InventoryTask) -# def update_alarmsdb_cache(self): -# logger = logging.getLogger(constants.TASK_LOGGER_NAME) -# logger.debug('STARTING: update_alarmsdb_cache') -# -# with db.connection(InventoryTask.config["alarms-db"]) as cx: -# for table_name, data in alarmsdb.load_cache(cx): -# InventoryTask.save_value_json('alarmsdb:%s' % table_name, data) -# -# logger.debug('FINISHED: update_alarmsdb_cache') - - @app.task() def update_interfaces_to_services(): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) @@ -241,23 +200,6 @@ def update_interface_statuses(): task_logger.debug('<<< update_interface_statuses') -# @app.task() -# def update_inventory_system_cache(): -# task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) -# task_logger.debug('>>> update_inventory_system_cache') -# -# subtasks = [ -# update_interfaces_to_services.s(), -# update_circuit_hierarchy.s(), -# update_equipment_locations.s(), -# # update_interface_statuses.s() -# ] -# -# group(subtasks).apply() -# -# task_logger.debug('<<< update_inventory_system_cache') - - @app.task() def update_junosspace_device_list(): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) @@ -291,63 +233,6 @@ def _derive_router_hostnames(config): return junosspace_equipment & opsdb_equipment -# @app.task() -# def refresh_cache_for_router(hostname): -# task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) -# task_logger.debug('>>> refresh_cache_for_router(%r)' % hostname) -# -# # TODO: !!!! extract community string from netconf data -# task_logger.error( -# 'TODO: !!!! extract community string from netconf data') -# subtasks = [ -# netconf_refresh_config.s(hostname), -# snmp_refresh_interfaces.s(hostname, '0pBiFbD') -# ] -# -# group(subtasks).apply() -# -# # TODO: clear classifier cache -# -# task_logger.debug('<<< refresh_cache_for_router(%r)' % hostname) - - -# @app.task() -# def _chain_separator_task(): -# """ -# boilerplate in order to support groups as chord elements -# cf. https://stackoverflow.com/questions/15123772/celery-chaining-groups-and-subtasks-out-of-order-execution -# cf. http://docs.celeryproject.org/en/latest/userguide/canvas.html -# ('Chaining a group together with another task will automatically upgrade it to be a chord') -# :return: -# """ # noqa E501 -# task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) -# task_logger.debug('>>>_chain_separator_task<<<') -# pass - - -# @app.task() -# def refresh_cache_all(): -# task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) -# task_logger.debug('>>> refresh_cache_all') -# -# subtasks = [ -# update_junosspace_device_list.s(), -# update_inventory_system_cache.s() -# ] -# -# group(subtasks).apply() -# -# subtasks = [] -# for hostname in _derive_router_hostnames(InventoryTask.config): -# task_logger.debug( -# 'queueing refresh_cache_for_router for %r' % hostname) -# subtasks.append(refresh_cache_for_router.s(hostname)) -# -# group(subtasks).apply() -# -# task_logger.debug('<<< refresh_cache_all') - - def start_refresh_cache_all(config): """ utility function intended to be called outside of the worker process