diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py
index fe01ef8678b35125a49c4fb4e323706fce968352..a02b414537c3c972f931941018257878d2e675e5 100644
--- a/inventory_provider/routes/jobs.py
+++ b/inventory_provider/routes/jobs.py
@@ -1,8 +1,8 @@
 import logging
 
-from flask import Blueprint, Response
+from flask import Blueprint, Response, current_app
 
-from inventory_provider.tasks.app import app
+from inventory_provider.tasks import worker
 from inventory_provider.constants import TASK_LOGGER_NAME
 
 routes = Blueprint("inventory-data-job-routes", __name__)
@@ -11,11 +11,31 @@ routes = Blueprint("inventory-data-job-routes", __name__)
 @routes.route("/update", methods=['GET', 'POST'])
 def update():
     task_logger = logging.getLogger(TASK_LOGGER_NAME)
-    task_logger.debug(
-        'launching task: '
-        'inventory_provider.tasks.worker.refresh_cache_all')
-    app.send_task(
-        'inventory_provider.tasks.worker.refresh_cache_all')
+
+    worker.start_refresh_cache_all(
+        current_app.config["INVENTORY_PROVIDER_CONFIG"])
+
+    #
+    #
+    # app.send_task(
+    #     'inventory_provider.tasks.worker.refresh_cache_all')
+    # db_subtasks = [
+    #     update_junosspace_device_list.s(),
+    #     update_inventory_system_cache.s()
+    # ]
+    #
+    # ch = (
+    #     group(db_subtasks),
+    #     _chain_separator_task.s(),
+    #
+    # )
+    #
+    #
+    # task_logger.debug(
+    #     'launching task: '
+    #     'inventory_provider.tasks.worker.refresh_cache_all')
+    # app.send_task(
+    #     'inventory_provider.tasks.worker.refresh_cache_all')
 
     return Response("OK")
 
diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index 9992365b70eec6975a492327e6208949c38daf8f..1eb87d32a8811823d73f5d7d5829738e08d9cdc9 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -3,7 +3,6 @@ import logging
 import re
 
 from celery import bootsteps, Task, group
-from celery.result import allow_join_result
 from collections import defaultdict
 
 from lxml import etree
@@ -25,7 +24,7 @@ environment.setup_logging()
 class InventoryTask(Task):
 
     config = None
-    logger = None
+    # logger = None
 
     def __init__(self):
        pass
@@ -49,7 +48,7 @@ class InventoryTask(Task):
             "sanity failure: expected string data as value"
         r = get_redis(InventoryTask.config)
         r.set(name=key, value=value)
-        InventoryTask.logger.debug("saved %s" % key)
+        # InventoryTask.logger.debug("saved %s" % key)
         return "OK"
 
     @staticmethod
@@ -79,12 +78,19 @@ class InventoryTask(Task):
 #             etree.tostring(xml_doc, encoding='unicode'))
 
 
+# def _wait_for_result(async_result):
+#     import time
+#     logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+#     while not async_result.ready():
+#         logger.debug("async_result not ready ... wait")
+#         time.sleep(5.0)
+#     return async_result.get()
+
 class WorkerArgs(bootsteps.Step):
     def __init__(self, worker, config_filename, **options):
         with open(config_filename) as f:
             InventoryTask.config = config.load(f)
-        InventoryTask.logger = logging.getLogger(constants.TASK_LOGGER_NAME)
-
+        # InventoryTask.logger = logging.getLogger(constants.TASK_LOGGER_NAME)
 
 # interfaces_key = "interface_services"
 # equipment_locations_key = "equipment_locations"
@@ -108,8 +114,8 @@ app.steps['worker'].add(WorkerArgs)
 
 @app.task(bind=InventoryTask)
 def snmp_refresh_interfaces(self, hostname, community):
-    logger = logging.getLogger(constants.TASK_LOGGER_NAME)
-    logger.debug('STARTING: snmp_refresh_interfaces(%r, %r)'
+    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+    task_logger.debug('>>> snmp_refresh_interfaces(%r, %r)'
                  % (hostname, community))
 
     InventoryTask.save_value_json(
@@ -119,20 +125,20 @@ def snmp_refresh_interfaces(self, hostname, community):
             community,
             InventoryTask.config)))
 
-    logger.debug('FINISHED: snmp_refresh_interfaces(%r, %r)'
+    task_logger.debug('<<< snmp_refresh_interfaces(%r, %r)'
                  % (hostname, community))
 
 
 @app.task(bind=InventoryTask)
 def netconf_refresh_config(self, hostname):
-    logger = logging.getLogger(constants.TASK_LOGGER_NAME)
-    logger.debug('STARTING: netconf_refresh_config(%r)' % hostname)
+    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+    task_logger.debug('>>> netconf_refresh_config(%r)' % hostname)
 
     InventoryTask.save_value_etree(
         'netconf:' + hostname,
         juniper.load_config(hostname, InventoryTask.config["ssh"]))
 
-    logger.debug('FINISHED: netconf_refresh_config(%r)' % hostname)
+    task_logger.debug('<<< netconf_refresh_config(%r)' % hostname)
 
 
 # @app.task(bind=InventoryTask)
@@ -149,6 +155,9 @@ def netconf_refresh_config(self, hostname):
 
 @app.task()
 def update_interfaces_to_services():
+    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+    task_logger.debug('>>> update_interfaces_to_services')
+
     interface_services = defaultdict(list)
     with db.connection(InventoryTask.config["ops-db"]) as cx:
         for service in opsdb.get_circuits(cx):
@@ -164,9 +173,14 @@ def update_interfaces_to_services():
             'opsdb:interface_services:' + equipment_interface,
             json.dumps(services))
 
+    task_logger.debug('<<< update_interfaces_to_services')
+
 
 @app.task()
 def update_equipment_locations():
+    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+    task_logger.debug('>>> update_equipment_locations')
+
     r = get_redis(InventoryTask.config)
     for key in r.scan_iter('opsdb:location:*'):
         r.delete(key)
@@ -174,9 +188,14 @@ def update_equipment_locations():
         for ld in opsdb.get_equipment_location_data(cx):
             r.set('opsdb:location:%s' % ld['equipment_name'], json.dumps(ld))
 
+    task_logger.debug('<<< update_equipment_locations')
+
 
 @app.task()
 def update_circuit_hierarchy():
+    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+    task_logger.debug('>>> update_circuit_hierarchy')
+
     # TODO: integers are not JSON keys
     with db.connection(InventoryTask.config["ops-db"]) as cx:
         child_to_parents = defaultdict(list)
@@ -198,9 +217,14 @@ def update_circuit_hierarchy():
         for cid, children in child_to_parents.items():
             r.set('opsdb:services:children:%d' % cid, json.dumps(children))
 
+    task_logger.debug('<<< update_circuit_hierarchy')
+
 
 @app.task()
 def update_interface_statuses():
+    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+    task_logger.debug('>>> update_interface_statuses')
+
     with db.connection(InventoryTask.config["ops-db"]) as cx:
         services = opsdb.get_circuits(cx)
     with db.connection(InventoryTask.config["alarms-db"]) as cx:
@@ -214,39 +238,42 @@ def update_interface_statuses():
                 service["interface_name"])
             InventoryTask.save_value(key, status)
 
+    task_logger.debug('<<< update_interface_statuses')
 
-@app.task()
-def update_inventory_system_cache():
-    logger = logging.getLogger(constants.TASK_LOGGER_NAME)
-    logger.error('HERE: update_inventory_system_cache')
-    g = group([
-        update_interfaces_to_services.s(),
-        update_circuit_hierarchy.s(),
-        update_equipment_locations.s(),
-        update_interface_statuses.s()
-    ])
-    g.apply_async()
+
+# @app.task()
+# def update_inventory_system_cache():
+#     task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+#     task_logger.debug('>>> update_inventory_system_cache')
+#
+#     subtasks = [
+#         update_interfaces_to_services.s(),
+#         update_circuit_hierarchy.s(),
+#         update_equipment_locations.s(),
+#         # update_interface_statuses.s()
+#     ]
+#
+#     group(subtasks).apply()
+#
+#     task_logger.debug('<<< update_inventory_system_cache')
 
 
 @app.task()
 def update_junosspace_device_list():
-    logger = logging.getLogger(constants.TASK_LOGGER_NAME)
-    logger.error('HERE: update_junosspace_device_list')
+    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+    task_logger.debug('>>> update_junosspace_device_list')
 
     r = get_redis(InventoryTask.config)
-
-    logger.error(InventoryTask.config["junosspace"])
     for d in juniper.load_routers_from_junosspace(
             InventoryTask.config["junosspace"]):
-        logger.error(d)
         r.set(
             'junosspace:' + d['hostname'],
             json.dumps(d).encode('utf-8'))
 
+    task_logger.debug('<<< update_junosspace_device_list')
 
-def _derive_router_hostnames(config):
+
+def _derive_router_hostnames(config):
     r = get_redis(config)
     junosspace_equipment = set()
     for k in r.keys('junosspace:*'):
@@ -264,39 +291,96 @@ def _derive_router_hostnames(config):
     return junosspace_equipment & opsdb_equipment
 
 
-@app.task()
-def refresh_cache_all():
+# @app.task()
+# def refresh_cache_for_router(hostname):
+#     task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+#     task_logger.debug('>>> refresh_cache_for_router(%r)' % hostname)
+#
+#     # TODO: !!!! extract community string from netconf data
+#     task_logger.error(
+#         'TODO: !!!! extract community string from netconf data')
+#     subtasks = [
+#         netconf_refresh_config.s(hostname),
+#         snmp_refresh_interfaces.s(hostname, '0pBiFbD')
+#     ]
+#
+#     group(subtasks).apply()
+#
+#     # TODO: clear classifier cache
+#
+#     task_logger.debug('<<< refresh_cache_for_router(%r)' % hostname)
+
+
+# @app.task()
+# def _chain_separator_task():
+#     """
+#     boilerplate in order to support groups as chord elements
+#     cf. https://stackoverflow.com/questions/15123772/celery-chaining-groups-and-subtasks-out-of-order-execution
+#     cf. http://docs.celeryproject.org/en/latest/userguide/canvas.html
+#     ('Chaining a group together with another task will automatically upgrade it to be a chord')
+#     :return:
+#     """  # noqa E501
+#     task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+#     task_logger.debug('>>>_chain_separator_task<<<')
+#     pass
+
+
+# @app.task()
+# def refresh_cache_all():
+#     task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
+#     task_logger.debug('>>> refresh_cache_all')
+#
+#     subtasks = [
+#         update_junosspace_device_list.s(),
+#         update_inventory_system_cache.s()
+#     ]
+#
+#     group(subtasks).apply()
+#
+#     subtasks = []
+#     for hostname in _derive_router_hostnames(InventoryTask.config):
+#         task_logger.debug(
+#             'queueing refresh_cache_for_router for %r' % hostname)
+#         subtasks.append(refresh_cache_for_router.s(hostname))
+#
+#     group(subtasks).apply()
+#
+#     task_logger.debug('<<< refresh_cache_all')
+
+
+def start_refresh_cache_all(config):
+    """
+    utility function intended to be called outside of the worker process
+    :param config: config structure as defined in config.py
+    :return:
+    """
     task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
-    task_logger.debug('starting update_junosspace_device_list,'
-                      'update_inventory_system_cache')
-    g = group([
+
+    # first batch of subtasks: refresh cached opsdb data
+    subtasks = [
         update_junosspace_device_list.s(),
-        update_inventory_system_cache.s()
-    ])
-
-    results = g.apply_async()
-    with allow_join_result():
-        results.join()
+        update_interfaces_to_services.s(),
+        update_circuit_hierarchy.s()
+    ]
 
-    for hostname in _derive_router_hostnames(InventoryTask.config):
-        task_logger.info("fetching details for: %r" % hostname)
+    results = group(subtasks).apply_async()
+    results.join()
 
+    # second batch of subtasks:
+    #   alarms db status cache
+    #   juniper netconf & snmp data
+    subtasks = [
+        update_interface_statuses.s()
+    ]
+    for hostname in _derive_router_hostnames(config):
         task_logger.debug(
-            'launching task: '
-            'inventory_provider.tasks.worker.netconf_refresh_config'
-            '(%s)' % hostname)
-        app.send_task(
-            'inventory_provider.tasks.worker.netconf_refresh_config',
-            args=[hostname])
+            'queueing router refresh jobs for %r' % hostname)
 
-        task_logger.debug(
-            'launching task: '
-            'inventory_provider.tasks.worker.snmp_refresh_interfaces'
-            '(%s)' % hostname)
         # TODO: !!!! extract community string from netconf data
-        app.send_task(
-            'inventory_provider.tasks.worker.snmp_refresh_interfaces',
-            args=[hostname, '0pBiFbD'])
+        task_logger.error(
+            'TODO: !!!! extract community string from netconf data')
+        subtasks.append(netconf_refresh_config.s(hostname))
+        # TODO: these should be synchronous, and then cleanup classifier cache
+        subtasks.append(snmp_refresh_interfaces.s(hostname, '0pBiFbD'))
 
-    return "OK"
+    return group(subtasks).apply_async()