diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 4a3a18fc9ff6498d827963f61cd7ec8ff4193803..bb09fffd66b1fd78b843a962a9aa7f11b2048f0c 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -78,22 +78,45 @@ class InventoryTask(Task): logger.exception(exc) super().on_failure(exc, task_id, args, kwargs, einfo) + def _task_return_value(self, warning, message): + """ + common method for constructing a standard task return value + :param warning: boolean (False for normal, warning-free response) + :param message: text message to include in return value + :return: a serializable dict + """ + return { + 'task': self.name, + 'warning': warning, + 'message': message + } + + def success(self, message='OK'): + return self._task_return_value(warning=False, message=message) + + def warning(self, message='WARNING'): + return self._task_return_value(warning=True, message=message) + @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') @log_task_entry_and_exit def snmp_refresh_interfaces(self, hostname, community): + # TODO: [DBOARD3-242] copy from current redis in case of error value = list(snmp.get_router_snmp_indexes(hostname, community)) r = get_next_redis(InventoryTask.config) r.set('snmp-interfaces:' + hostname, json.dumps(value)) + return self.success(message=f'snmp info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') @log_task_entry_and_exit def netconf_refresh_config(self, hostname): + # TODO: [DBOARD3-242] copy from current redis in case of error netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"]) netconf_str = etree.tostring(netconf_doc, encoding='unicode') r = get_next_redis(InventoryTask.config) r.set('netconf:' + hostname, netconf_str) + return self.success(message=f'netconf info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services') @@ -119,6 +142,8 @@ def update_interfaces_to_services(self): json.dumps(services)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces') @log_task_entry_and_exit @@ -151,6 +176,8 @@ def import_unmanaged_interfaces(self): json.dumps([ifc])) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_access_services') @log_task_entry_and_exit @@ -180,6 +207,8 @@ def update_access_services(self): json.dumps(service)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_lg_routers') @log_task_entry_and_exit @@ -197,6 +226,8 @@ def update_lg_routers(self): rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_equipment_locations') @log_task_entry_and_exit @@ -216,6 +247,8 @@ def update_equipment_locations(self): rp.set('opsdb:location:%s' % h, json.dumps(locations)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy') @log_task_entry_and_exit @@ -246,6 +279,8 @@ def update_circuit_hierarchy(self): rp.set('opsdb:services:children:%d' % cid, json.dumps(children)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_geant_lambdas') @log_task_entry_and_exit @@ -265,6 +300,8 @@ def update_geant_lambdas(self): json.dumps(ld)) rp.execute() + return self.success() + @app.task(base=InventoryTask, bind=True, name='update_neteng_managed_device_list') @@ -288,10 +325,7 @@ def update_neteng_managed_device_list(self): r = get_next_redis(InventoryTask.config) r.set('netdash', json.dumps(routers).encode('utf-8')) - return { - 'task': 'update_neteng_managed_device_list', - 'message': 'saved %d managed routers' % len(routers) - } + return self.success(f'saved {len(routers)} managed routers') def load_netconf_data(hostname): @@ -440,7 +474,7 @@ def reload_router_config(self, hostname): except InventoryTaskError: pass # ok at this point if not found - # load new netconf data + # load new netconf data, in this thread netconf_refresh_config.apply(args=[hostname]) netconf_doc = load_netconf_data(hostname) @@ -451,11 +485,7 @@ def reload_router_config(self, hostname): 'no timestamp available for new netconf data' if new_netconf_timestamp == current_netconf_timestamp: logger.debug('no netconf change timestamp change, aborting') - return { - 'task': 'reload_router_config', - 'hostname': hostname, - 'message': 'OK (no change)' - } + return self.success(f'no change (timestamp not updated)') # clear cached classifier responses for this router, and # refresh peering data @@ -487,11 +517,7 @@ def reload_router_config(self, hostname): clear_cached_classifier_responses(None) - return { - 'task': 'reload_router_config', - 'hostname': hostname, - 'message': 'OK' - } + return self.success(f'updated config for {hostname}') def _erase_next_db(config): @@ -532,7 +558,8 @@ def internal_refresh_phase_2(self): t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) pending_task_ids.append(t.id) - return pending_task_ids + + return self.success() def launch_refresh_cache_all(config): @@ -628,6 +655,8 @@ def refresh_finalizer(self, pending_task_ids_json): latch_db(InventoryTask.config) _update('latched current/next dbs') + return self.success() + def _build_service_category_interface_list(update_callback=lambda s: None):