diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 591875038d6d2d35ebb4ddf6f319f6dc8afbd99c..cf861ded2d10158db2b9f72bc7c5cd2d61def8ba 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -12,7 +12,7 @@ import jsonschema from inventory_provider.tasks.app import app from inventory_provider.tasks.common \ - import get_next_redis, get_current_redis, save_task_log, \ + import get_next_redis, get_current_redis, \ latch_db, get_latch, set_latch, update_latch_status from inventory_provider.tasks import data from inventory_provider import config @@ -53,7 +53,6 @@ class InventoryTask(Task): def __init__(self): self.args = [] - self.messages = [] if InventoryTask.config: return @@ -70,75 +69,17 @@ class InventoryTask(Task): InventoryTask.config = config.load(f) logging.debug("loaded config: %r" % InventoryTask.config) - def _append_to_log(self, level, message): - info = { - 'message': message, - 'level': level - } - logger.debug(json.dumps(info)) - self.messages.append(info) - - def _log(self, level, state, message): - self._append_to_log(level, message) - super().update_state(state=state, meta={'message': message}) - - def log_info(self, message, state=states.STARTED): - self._log('INFO', state, message) - - def log_warning(self, message, state=states.STARTED): - self._log('WARNING', state, message) - - def log_error(self, message, state=states.STARTED): - self._log('ERROR', state, message) - - # def update_state(self, **kwargs): - # meta = {'message': 'none', 'level': 'INFO'} - # meta.update(kwargs.setdefault('meta', dict())) - # meta['task'] = self.name - # meta['worker'] = self.request.hostname - # meta['pid'] = self.pid - # logger.debug(json.dumps( - # {'state': kwargs['state'], 'meta': str(meta)} - # )) - # super().update_state(**kwargs) - - def on_failure(self, exc, task_id, args, kwargs, einfo): - self._append_to_log('ERROR', str(exc)) - save_task_log( - InventoryTask.config, - {'messages': self.messages, 'meta': self._meta(args=args)}) - super().on_failure(exc, task_id, args, kwargs, einfo) - - def on_success(self, retval, task_id, args, kwargs): - self._append_to_log('INFO', 'DONE') - save_task_log( - InventoryTask.config, - {'messages': self.messages, 'meta': self._meta(args=args)}) - super().on_success(retval, task_id, args, kwargs) - - def _meta(self, message=None, args=None): - meta_struct = { - 'task': self.name, - 'id': self.request.id, - 'worker': self.request.hostname, - 'pid': os.getpid(), - 'warning': 'WARNING' in [m['level'] for m in self.messages], - 'error': 'ERROR' in [m['level'] for m in self.messages], - } - if message: - meta_struct['message'] = message - if args is not None: - meta_struct['args'] = args - return meta_struct - - def return_value(self, message='OK'): - """ - common method for constructing a standard task return value - :param message: text message to include in return value - :return: a serializable dict - """ - self.log_info(message) - return self._meta(message=message) + def log_info(self, message): + logger.debug(message) + self.send_event('task-info', message=message) + + def log_warning(self, message): + logger.warning(message) + self.send_event('task-warning', message=message) + + def log_error(self, message): + logger.error(message) + self.send_event('task-error', message=message) @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') @@ -162,7 +103,7 @@ def snmp_refresh_interfaces(self, hostname, community): r = get_next_redis(InventoryTask.config) r.set(f'snmp-interfaces:{hostname}', json.dumps(value)) - return self.return_value(message=f'snmp info loaded from {hostname}') + self.log_info(f'snmp info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') @@ -186,7 +127,7 @@ def netconf_refresh_config(self, hostname): r = get_next_redis(InventoryTask.config) r.set(f'netconf:{hostname}', netconf_str) - return self.return_value(message=f'netconf info loaded from {hostname}') + self.log_info(f'netconf info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services') @@ -212,8 +153,6 @@ def update_interfaces_to_services(self): json.dumps(services)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces') @log_task_entry_and_exit @@ -246,8 +185,6 @@ def import_unmanaged_interfaces(self): json.dumps([ifc])) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_access_services') @log_task_entry_and_exit @@ -277,8 +214,6 @@ def update_access_services(self): json.dumps(service)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_lg_routers') @log_task_entry_and_exit @@ -296,8 +231,6 @@ def update_lg_routers(self): rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_equipment_locations') @log_task_entry_and_exit @@ -317,8 +250,6 @@ def update_equipment_locations(self): rp.set('opsdb:location:%s' % h, json.dumps(locations)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy') @log_task_entry_and_exit @@ -349,8 +280,6 @@ def update_circuit_hierarchy(self): rp.set('opsdb:services:children:%d' % cid, json.dumps(children)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_geant_lambdas') @log_task_entry_and_exit @@ -370,8 +299,6 @@ def update_geant_lambdas(self): json.dumps(ld)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_neteng_managed_device_list') @@ -386,8 +313,7 @@ def update_neteng_managed_device_list(self): r = get_next_redis(InventoryTask.config) r.set('netdash', json.dumps(routers).encode('utf-8')) - - return self.return_value(f'saved {len(routers)} managed routers') + self.log_info(f'saved {len(routers)} managed routers') def load_netconf_data(hostname): @@ -547,7 +473,8 @@ def reload_router_config(self, hostname): if new_netconf_timestamp == current_netconf_timestamp: msg = f'no timestamp change for {hostname} netconf data' logger.debug(msg) - return self.return_value(msg) + self.log_info(msg) + return # clear cached classifier responses for this router, and # refresh peering data @@ -569,8 +496,7 @@ def reload_router_config(self, hostname): snmp_refresh_interfaces.apply(args=[hostname, community]) clear_cached_classifier_responses(None) - - return self.return_value(f'updated configuration for {hostname}') + self.log_info(f'updated configuration for {hostname}') def _erase_next_db(config): @@ -612,8 +538,6 @@ def internal_refresh_phase_2(self): t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) pending_task_ids.append(t.id) - return self.return_value() - @log_task_entry_and_exit def launch_refresh_cache_all(config): @@ -709,8 +633,6 @@ def refresh_finalizer(self, pending_task_ids_json): latch_db(InventoryTask.config) self.log_info('latched current/next dbs') - return self.return_value() - @log_task_entry_and_exit def _build_service_category_interface_list(update_callback=lambda s: None):