diff --git a/inventory_provider/config.py b/inventory_provider/config.py index f2cac43591ebb490e19e5d45da858de1fba3f4e0..aa6ef5ffe623a9037c39fef4bdba1156e22b5ba1 100644 --- a/inventory_provider/config.py +++ b/inventory_provider/config.py @@ -8,7 +8,7 @@ CONFIG_SCHEMA = { "definitions": { "timeout": { "type": "number", - "maximum": 10, # sanity + "maximum": 60, # sanity "exclusiveMinimum": 0 }, "database-credentials": { diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 4a3a18fc9ff6498d827963f61cd7ec8ff4193803..132dc96fb4efcea521188710209eec7cbc54be46 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -35,7 +35,7 @@ def log_task_entry_and_exit(f): def _w(*args, **kwargs): logger.debug(f'>>> {f.__name__}{args}') try: - return f(*args, *kwargs) + return f(*args, **kwargs) finally: logger.debug(f'<<< {f.__name__}{args}') return _w @@ -51,6 +51,8 @@ class InventoryTask(Task): def __init__(self): + self.pid = os.getpid() + if InventoryTask.config: return @@ -69,6 +71,8 @@ class InventoryTask(Task): def update_state(self, **kwargs): meta = kwargs.setdefault('meta', dict()) meta['task'] = self.name + meta['worker'] = self.request.hostname + meta['pid'] = self.pid logger.debug(json.dumps( {'state': kwargs['state'], 'meta': str(meta)} )) @@ -78,22 +82,48 @@ class InventoryTask(Task): logger.exception(exc) super().on_failure(exc, task_id, args, kwargs, einfo) + def _task_return_value(self, warning, message): + """ + common method for constructing a standard task return value + :param warning: boolean (False for normal, warning-free response) + :param message: text message to include in return value + :return: a serializable dict + """ + return { + 'task': self.name, + 'id': self.request.id, + 'worker': self.request.hostname, + 'pid': self.pid, + 'warning': warning, + 'message': message + } + + def success(self, message='OK'): + return self._task_return_value(warning=False, message=message) + + def warning(self, message='WARNING'): + return self._task_return_value(warning=True, message=message) + @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') @log_task_entry_and_exit def snmp_refresh_interfaces(self, hostname, community): + # TODO: [DBOARD3-242] copy from current redis in case of error value = list(snmp.get_router_snmp_indexes(hostname, community)) r = get_next_redis(InventoryTask.config) r.set('snmp-interfaces:' + hostname, json.dumps(value)) + return self.success(message=f'snmp info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') @log_task_entry_and_exit def netconf_refresh_config(self, hostname): + # TODO: [DBOARD3-242] copy from current redis in case of error netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"]) netconf_str = etree.tostring(netconf_doc, encoding='unicode') r = get_next_redis(InventoryTask.config) r.set('netconf:' + hostname, netconf_str) + return self.success(message=f'netconf info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services') @@ -119,6 +149,8 @@ def update_interfaces_to_services(self): json.dumps(services)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces') @log_task_entry_and_exit @@ -151,6 +183,8 @@ def import_unmanaged_interfaces(self): json.dumps([ifc])) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_access_services') @log_task_entry_and_exit @@ -180,6 +214,8 @@ def update_access_services(self): json.dumps(service)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_lg_routers') @log_task_entry_and_exit @@ -197,6 +233,8 @@ def update_lg_routers(self): rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_equipment_locations') @log_task_entry_and_exit @@ -216,6 +254,8 @@ def update_equipment_locations(self): rp.set('opsdb:location:%s' % h, json.dumps(locations)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy') @log_task_entry_and_exit @@ -246,6 +286,8 @@ def update_circuit_hierarchy(self): rp.set('opsdb:services:children:%d' % cid, json.dumps(children)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_geant_lambdas') @log_task_entry_and_exit @@ -265,6 +307,8 @@ def update_geant_lambdas(self): json.dumps(ld)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_neteng_managed_device_list') @@ -288,10 +332,7 @@ def update_neteng_managed_device_list(self): r = get_next_redis(InventoryTask.config) r.set('netdash', json.dumps(routers).encode('utf-8')) - return { - 'task': 'update_neteng_managed_device_list', - 'message': 'saved %d managed routers' % len(routers) - } + return self.success(f'saved {len(routers)} managed routers') def load_netconf_data(hostname): @@ -368,6 +409,7 @@ def _refresh_peers(hostname, key_base, peers): rp.execute() +@log_task_entry_and_exit def refresh_ix_public_peers(hostname, netconf): _refresh_peers( hostname, @@ -375,6 +417,7 @@ def refresh_ix_public_peers(hostname, netconf): juniper.ix_public_peers(netconf)) +@log_task_entry_and_exit def refresh_vpn_rr_peers(hostname, netconf): _refresh_peers( hostname, @@ -382,6 +425,7 @@ def refresh_vpn_rr_peers(hostname, netconf): juniper.vpn_rr_peers(netconf)) +@log_task_entry_and_exit def refresh_interface_address_lookups(hostname, netconf): _refresh_peers( hostname, @@ -389,6 +433,7 @@ def refresh_interface_address_lookups(hostname, netconf): juniper.interface_addresses(netconf)) +@log_task_entry_and_exit def refresh_juniper_interface_list(hostname, netconf): logger.debug( 'removing cached netconf-interfaces for %r' % hostname) @@ -440,7 +485,7 @@ def reload_router_config(self, hostname): except InventoryTaskError: pass # ok at this point if not found - # load new netconf data + # load new netconf data, in this thread netconf_refresh_config.apply(args=[hostname]) netconf_doc = load_netconf_data(hostname) @@ -451,11 +496,7 @@ def reload_router_config(self, hostname): 'no timestamp available for new netconf data' if new_netconf_timestamp == current_netconf_timestamp: logger.debug('no netconf change timestamp change, aborting') - return { - 'task': 'reload_router_config', - 'hostname': hostname, - 'message': 'OK (no change)' - } + return self.success(f'no change (timestamp not updated)') # clear cached classifier responses for this router, and # refresh peering data @@ -487,11 +528,7 @@ def reload_router_config(self, hostname): clear_cached_classifier_responses(None) - return { - 'task': 'reload_router_config', - 'hostname': hostname, - 'message': 'OK' - } + return self.success(f'updated config for {hostname}') def _erase_next_db(config): @@ -532,9 +569,11 @@ def internal_refresh_phase_2(self): t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) pending_task_ids.append(t.id) - return pending_task_ids + + return self.success() +@log_task_entry_and_exit def launch_refresh_cache_all(config): """ utility function intended to be called outside of the worker process @@ -628,7 +667,10 @@ def refresh_finalizer(self, pending_task_ids_json): latch_db(InventoryTask.config) _update('latched current/next dbs') + return self.success() + +@log_task_entry_and_exit def _build_service_category_interface_list(update_callback=lambda s: None): def _classify(ifc):