From b077e8a16e8099d1f5c76c1f80e04e43737bc32f Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Fri, 8 Feb 2019 21:08:16 +0100 Subject: [PATCH] added task status updated --- inventory_provider/tasks/worker.py | 88 ++++++++++++++++++++++++------ 1 file changed, 72 insertions(+), 16 deletions(-) diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 86d6b81b..8ee10b87 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -3,8 +3,9 @@ import logging import re from celery import bootsteps, Task, group, states +from celery.utils.log import get_task_logger from celery.result import AsyncResult - +from celery.signals import setup_logging from collections import defaultdict from lxml import etree @@ -22,11 +23,12 @@ from inventory_provider import juniper environment.setup_logging() +class InventoryTaskError(Exception): + pass class InventoryTask(Task): config = None - # logger = None def __init__(self): pass @@ -55,7 +57,7 @@ class WorkerArgs(bootsteps.Step): with open(config_filename) as f: task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger.info( - "Initializing worker with config from: %r % config_filename") + "Initializing worker with config from: %r" % config_filename) InventoryTask.config = config.load(f) @@ -72,9 +74,13 @@ app.user_options['worker'].add(worker_args) app.steps['worker'].add(WorkerArgs) +@setup_logging.connect +def _setup_logging(*args, **kwargs): + return logging.getLogger(constants.TASK_LOGGER_NAME) + @app.task def snmp_refresh_interfaces(hostname, community): - task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) + task_logger = logging.getlogger(constants.TASK_LOGGER_NAME) task_logger.debug( '>>> snmp_refresh_interfaces(%r, %r)' % (hostname, community)) @@ -189,20 +195,44 @@ def update_interface_statuses(): task_logger.debug('<<< update_interface_statuses') -@app.task -def update_junosspace_device_list(): +@app.task(base=InventoryTask, bind=True) +def update_junosspace_device_list(self): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger.debug('>>> update_junosspace_device_list') + self.update_state( + state=states.STARTED, + meta={ + 'task': 'update_junosspace_device_list', + 'message': 'querying junosspace for managed routers' + }) + r = get_redis(InventoryTask.config) + + routers = {} for d in juniper.load_routers_from_junosspace( - InventoryTask.config["junosspace"]): - r.set( - 'junosspace:' + d['hostname'], - json.dumps(d).encode('utf-8')) + InventoryTask.config['junosspace']): + routers['junosspace:' + d['hostname']] = json.dumps(d).endode('utf-8') + + self.update_state( + state=states.STARTED, + meta={ + 'task': 'update_junosspace_device_list', + 'message': 'found %d routers, saving details' % len(routers) + }) + + for k in r.keys('junosspace:*'): + r.delete(k) + for k, v in routers.items(): + r.set(k, v) task_logger.debug('<<< update_junosspace_device_list') + return { + 'task': 'update_junosspace_device_list', + 'message': 'saved %d managed routers' % len(routers) + } + def load_netconf_data(hostname): """ @@ -263,33 +293,59 @@ def refresh_vpn_rr_peers(hostname, netconf): juniper.vpn_rr_peers(netconf)) -@app.task(base=ReleaseTask, bind=True)) -def reload_router_config(hostname): +@app.task(base=InventoryTask, bind=True) +def reload_router_config(self, hostname): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger.debug('>>> update_router_config') + self.update_state( + state=states.STARTED, + meta={ + 'task': 'reload_router_config', + 'hostname': hostname, + 'message': 'loading router netconf data' + }) + netconf_refresh_config.apply(args=[hostname]) netconf_doc = load_netconf_data(hostname) if netconf_doc is None: - task_logger.error('no netconf data available for %r' % hostname) + raise InventoryTaskError( + 'no netconf data available for %r' % hostname) else: - + self.update_state( + state=states.STARTED, + meta={ + 'task': 'reload_router_config', + 'hostname': hostname, + 'message': 'refreshing peers' + }) refresh_ix_public_peers(hostname, netconf_doc) refresh_vpn_rr_peers(hostname, netconf_doc) community = juniper.snmp_community_string(netconf_doc) if not community: - task_logger.error( + raise InventoryTaskError( 'error extracting community string for %r' % hostname) else: + self.update_state( + state=states.STARTED, + meta={ + 'task': 'reload_router_config', + 'hostname': hostname, + 'message': 'refreshing snmp interface indexes' + }) snmp_refresh_interfaces.apply(args=[hostname, community]) # TODO: move this out of else? (i.e. clear even if netconf fails?) clear_cached_classifier_responses(hostname) task_logger.debug('<<< update_router_config') - + return { + 'task': 'reload_router_config', + 'hostname': hostname, + 'message': 'OK' + } def _derive_router_hostnames(config): r = get_redis(config) -- GitLab