diff --git a/changelog b/changelog index b39b9219ce6ba4949f9781811f6058b420b7a2fc..c0517b2e1e91fb1b82ee5627d3a82c3328ee18b0 100644 --- a/changelog +++ b/changelog @@ -38,4 +38,6 @@ 0.21: added parsing of 'logical-systems' (DBOARD3-150) 0.22: return a skeleton response for unknown interfaces (DBOARD3-169) 0.23: use redis pipelines where possible -0.24: optimization, don't do aggressive checking when deleting during rebuilding +0.24: optimization, don't do aggressive pre-delete checking rebuilding +0.25: propagate errors when waiting for tasks to complete +0.26: NOT SUITABLE FOR PRODUCTION! filter qfx* routers until space is synced with opsdb diff --git a/inventory_provider/juniper.py b/inventory_provider/juniper.py index d7eca477fb5ecf567c1dc4121e928c9d878ebb1f..ef0b5ee02248d3320419c5ad0bbde85d110eae6f 100644 --- a/inventory_provider/juniper.py +++ b/inventory_provider/juniper.py @@ -3,6 +3,7 @@ import re import ipaddress from jnpr.junos import Device +from jnpr.junos import exception as EzErrors from lxml import etree import netifaces import requests @@ -156,7 +157,10 @@ def _rpc(hostname, ssh): host=hostname, user=ssh['username'], ssh_private_key_file=ssh['private-key']) - dev.open() + try: + dev.open() + except EzErrors.ConnectError as e: + raise ConnectionError(str(e)) return dev.rpc diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 85e658db855ca1fabf66c80cbad43c10215880cc..39832cb6355612bee20572a9eeabb9671c4d8591 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -27,6 +27,8 @@ FINALIZER_TIMEOUT_S = 300 environment.setup_logging() +logger = logging.getLogger(__name__) + class InventoryTaskError(Exception): pass @@ -54,16 +56,18 @@ class InventoryTask(Task): logging.debug("loaded config: %r" % InventoryTask.config) def update_state(self, **kwargs): - logger = logging.getLogger(__name__) logger.debug(json.dumps( - {'state': kwargs['state'], 'meta': kwargs['meta']} + {'state': kwargs['state'], 'meta': str(kwargs['meta'])} )) super().update_state(**kwargs) + def on_failure(self, exc, task_id, args, kwargs, einfo): + logger.exception(exc) + super().on_failure(exc, task_id, args, kwargs, einfo) + -@app.task -def snmp_refresh_interfaces(hostname, community): - logger = logging.getLogger(__name__) +@app.task(base=InventoryTask, bind=True) +def snmp_refresh_interfaces(self, hostname, community): logger.debug( '>>> snmp_refresh_interfaces(%r, %r)' % (hostname, community)) @@ -76,9 +80,8 @@ def snmp_refresh_interfaces(hostname, community): '<<< snmp_refresh_interfaces(%r, %r)' % (hostname, community)) -@app.task -def netconf_refresh_config(hostname): - logger = logging.getLogger(__name__) +@app.task(base=InventoryTask, bind=True) +def netconf_refresh_config(self, hostname): logger.debug('>>> netconf_refresh_config(%r)' % hostname) netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"]) @@ -90,9 +93,8 @@ def netconf_refresh_config(hostname): logger.debug('<<< netconf_refresh_config(%r)' % hostname) -@app.task -def update_interfaces_to_services(): - logger = logging.getLogger(__name__) +@app.task(base=InventoryTask, bind=True) +def update_interfaces_to_services(self): logger.debug('>>> update_interfaces_to_services') interface_services = defaultdict(list) @@ -115,9 +117,8 @@ def update_interfaces_to_services(): logger.debug('<<< update_interfaces_to_services') -@app.task -def update_equipment_locations(): - logger = logging.getLogger(__name__) +@app.task(base=InventoryTask, bind=True) +def update_equipment_locations(self): logger.debug('>>> update_equipment_locations') r = get_next_redis(InventoryTask.config) @@ -134,9 +135,8 @@ def update_equipment_locations(): logger.debug('<<< update_equipment_locations') -@app.task -def update_circuit_hierarchy(): - logger = logging.getLogger(__name__) +@app.task(base=InventoryTask, bind=True) +def update_circuit_hierarchy(self): logger.debug('>>> update_circuit_hierarchy') # TODO: integers are not JSON keys @@ -165,9 +165,8 @@ def update_circuit_hierarchy(): logger.debug('<<< update_circuit_hierarchy') -@app.task -def update_geant_lambdas(): - logger = logging.getLogger(__name__) +@app.task(base=InventoryTask, bind=True) +def update_geant_lambdas(self): logger.debug('>>> update_geant_lambdas') r = get_next_redis(InventoryTask.config) @@ -186,7 +185,6 @@ def update_geant_lambdas(): @app.task(base=InventoryTask, bind=True) def update_junosspace_device_list(self): - logger = logging.getLogger(__name__) logger.debug('>>> update_junosspace_device_list') self.update_state( @@ -240,7 +238,6 @@ def load_netconf_data(hostname): def clear_cached_classifier_responses(hostname=None): - logger = logging.getLogger(__name__) if hostname: logger.debug( 'removing cached classifier responses for %r' % hostname) @@ -273,7 +270,6 @@ def clear_cached_classifier_responses(hostname=None): def _refresh_peers(hostname, key_base, peers): - logger = logging.getLogger(__name__) logger.debug( 'removing cached %s for %r' % (key_base, hostname)) r = get_next_redis(InventoryTask.config) @@ -321,7 +317,6 @@ def refresh_interface_address_lookups(hostname, netconf): def refresh_juniper_interface_list(hostname, netconf): - logger = logging.getLogger(__name__) logger.debug( 'removing cached netconf-interfaces for %r' % hostname) @@ -351,7 +346,6 @@ def refresh_juniper_interface_list(hostname, netconf): @app.task(base=InventoryTask, bind=True) def reload_router_config(self, hostname): - logger = logging.getLogger(__name__) logger.debug('>>> reload_router_config') self.update_state( @@ -433,7 +427,6 @@ def reload_router_config(self, hostname): def _derive_router_hostnames(config): - logger = logging.getLogger(__name__) r = get_next_redis(config) junosspace_equipment = set() for k in r.keys('junosspace:*'): @@ -478,8 +471,6 @@ def launch_refresh_cache_all(config): :param config: config structure as defined in config.py :return: """ - logger = logging.getLogger(__name__) - _erase_next_db(config) # first batch of subtasks: refresh cached opsdb data @@ -498,6 +489,11 @@ def launch_refresh_cache_all(config): update_equipment_locations.apply_async(), ] for hostname in _derive_router_hostnames(config): + # TODO: remove this filter when the qfx* switches are configured + if not hostname.startswith('mx'): + logger.error( + 'TEMP TEMP!!! skipping loading of host: %r' % hostname) + continue logger.debug('queueing router refresh jobs for %r' % hostname) subtasks.append(reload_router_config.apply_async(args=[hostname])) @@ -542,7 +538,6 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None): @app.task(base=InventoryTask, bind=True) def refresh_finalizer(self, pending_task_ids_json): - logger = logging.getLogger(__name__) logger.debug('>>> refresh_finalizer') logger.debug('task_ids: %r' % pending_task_ids_json) diff --git a/setup.py b/setup.py index 8bc98c6dc9256e7dba35d923ed286760254f3ec2..a186f511eff699a98491a677f0bf0f81e1f23e86 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='inventory-provider', - version="0.25", + version="0.26", author='GEANT', author_email='swd@geant.org', description='Dashboard inventory provider',