From dbf3562bc879530a334c1d112a7d07b1f4b55004 Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Thu, 28 May 2020 19:50:58 +0200 Subject: [PATCH] added a layer for saving log messages to current db --- inventory_provider/tasks/common.py | 46 +++++++ inventory_provider/tasks/worker.py | 190 ++++++++++++++--------------- 2 files changed, 138 insertions(+), 98 deletions(-) diff --git a/inventory_provider/tasks/common.py b/inventory_provider/tasks/common.py index 3a474c8e..1e201938 100644 --- a/inventory_provider/tasks/common.py +++ b/inventory_provider/tasks/common.py @@ -24,6 +24,46 @@ DB_LATCH_SCHEMA = { "additionalProperties": False } +TASK_LOG_SCHEMA = { + '$schema': 'http://json-schema.org/draft-07/schema#', + + 'definitions': { + 'meta': { + 'type': 'object', + 'properties': { + 'task': {'type': ['string', 'null']}, + 'id': {'type': ['string', 'null']}, + 'worker': {'type': ['string', 'null']}, + 'pid': {'type': 'integer'}, + 'warning': {'type': 'boolean'}, + 'error': {'type': 'boolean'} + }, + 'required': ['task', 'id', 'worker', 'pid', 'warning', 'error'], + 'additionalProperties': False + }, + 'message': { + 'type': 'object', + 'properties': { + 'message': {'type': 'string'}, + 'level': { + 'type': 'string', + 'enum': ['INFO', 'WARNING', 'ERROR'] + } + } + } + }, + + 'type': 'object', + 'properties': { + 'meta': {'$ref': '#/definitions/meta'}, + 'messages': { + 'type': 'array', + 'items': {'$ref': '#/definitions/message'} + } + }, + 'required': ['meta', 'messages'], + 'additionalProperties': False +} def get_latch(r): latch = r.get('db:latch') @@ -162,3 +202,9 @@ def get_next_redis(config): 'derived next id: {}'.format(next_id)) return _get_redis(config, next_id) + + +def save_task_log(config, log): + jsonschema.validate(log, TASK_LOG_SCHEMA) + r = get_current_redis(config) + r.set(f'db:log:{log["meta"]["id"]}', json.dumps(log)) diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 6af7b324..56ec176c 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -12,7 +12,7 @@ import jsonschema from inventory_provider.tasks.app import app from inventory_provider.tasks.common \ - import get_next_redis, get_current_redis, \ + import get_next_redis, get_current_redis, save_task_log, \ latch_db, get_latch, set_latch, update_latch_status from inventory_provider.tasks import data from inventory_provider import config @@ -52,7 +52,8 @@ class InventoryTask(Task): def __init__(self): - self.pid = os.getpid() + self.args = [] + self.messages = [] if InventoryTask.config: return @@ -69,52 +70,84 @@ class InventoryTask(Task): InventoryTask.config = config.load(f) logging.debug("loaded config: %r" % InventoryTask.config) - def update_state(self, **kwargs): - meta = kwargs.setdefault('meta', dict()) - meta['task'] = self.name - meta['worker'] = self.request.hostname - meta['pid'] = self.pid - logger.debug(json.dumps( - {'state': kwargs['state'], 'meta': str(meta)} - )) - super().update_state(**kwargs) + def _append_to_log(self, level, message): + info = { + 'message': message, + 'level': level + } + logger.debug(json.dumps(info)) + self.messages.append(info) + + def _log(self, level, state, message): + self._append_to_log(level, message) + super().update_state(state=state, meta={'message': message}) + + def log_info(self, message, state=states.STARTED): + self._log('INFO', state, message) + + def log_warning(self, message, state=states.STARTED): + self._log('WARNING', state, message) + + def log_error(self, message, state=states.STARTED): + self._log('ERROR', state, message) + + # def update_state(self, **kwargs): + # meta = {'message': 'none', 'level': 'INFO'} + # meta.update(kwargs.setdefault('meta', dict())) + # meta['task'] = self.name + # meta['worker'] = self.request.hostname + # meta['pid'] = self.pid + # logger.debug(json.dumps( + # {'state': kwargs['state'], 'meta': str(meta)} + # )) + # super().update_state(**kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): - logger.exception(exc) + self._append_to_log('ERROR', str(exc)) + save_task_log( + InventoryTask.config, + {'messages': self.messages, 'meta': self._meta()}) super().on_failure(exc, task_id, args, kwargs, einfo) - def _task_return_value(self, warning, message): - """ - common method for constructing a standard task return value - :param warning: boolean (False for normal, warning-free response) - :param message: text message to include in return value - :return: a serializable dict - """ - return { + def on_success(self, retval, task_id, args, kwargs): + self._append_to_log('INFO', 'DONE') + save_task_log( + InventoryTask.config, + {'messages': self.messages, 'meta': self._meta()}) + super().on_success(retval, task_id, args, kwargs) + + def _meta(self, message = None): + meta_struct = { 'task': self.name, 'id': self.request.id, 'worker': self.request.hostname, - 'pid': self.pid, - 'warning': warning, - 'message': message + 'pid': os.getpid(), + 'warning': 'WARNING' in [m['level'] for m in self.messages], + 'error': 'ERROR' in [m['level'] for m in self.messages], } + if message: + meta_struct['message'] = message + return meta_struct - def success(self, message='OK'): - return self._task_return_value(warning=False, message=message) - - def warning(self, message='WARNING'): - return self._task_return_value(warning=True, message=message) + def return_value(self, message='OK'): + """ + common method for constructing a standard task return value + :param message: text message to include in return value + :return: a serializable dict + """ + self.log_info(message) + return self._meta(message=message) @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') @log_task_entry_and_exit def snmp_refresh_interfaces(self, hostname, community): - warning = False try: value = list(snmp.get_router_snmp_indexes(hostname, community)) except ConnectionError: - logger.exception(f'error loading snmp data from {hostname}') - warning = True + msg = f'error loading snmp data from {hostname}' + logger.exception(msg) + self.log_warning(msg) r = get_current_redis(InventoryTask.config) value = r.get(f'snmp-interfaces:{hostname}') if not value: @@ -123,43 +156,35 @@ def snmp_refresh_interfaces(self, hostname, community): f' and no cached netconf data found') # unnecessary json encode/decode here ... could be optimized value = json.loads(value.decode('utf-8')) + self.log_warning(f'using cached snmp data for {hostname}') r = get_next_redis(InventoryTask.config) r.set(f'snmp-interfaces:{hostname}', json.dumps(value)) - - if warning: - return self.warning( - message=f'i/o error with {hostname}, using cached snmp info') - else: - return self.success(message=f'snmp info loaded from {hostname}') + return self.return_value(message=f'snmp info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') @log_task_entry_and_exit def netconf_refresh_config(self, hostname): - warning = False try: netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"]) netconf_str = etree.tostring(netconf_doc, encoding='unicode') except ConnectionError: - logger.exception(f'error loading netconf data from {hostname}') - warning = True + msg = f'error loading netconf data from {hostname}' + logger.exception(msg) + self.log_warning(msg) r = get_current_redis(InventoryTask.config) netconf_str = r.get(f'netconf:{hostname}') if not netconf_str: raise InventoryTaskError( f'netconf error with {hostname}' f' and no cached netconf data found') + self.log_warning(f'using cached netconf data for {hostname}') r = get_next_redis(InventoryTask.config) r.set(f'netconf:{hostname}', netconf_str) - - if warning: - return self.warning( - message=f'i/o error with {hostname}, using cached netconf info') - else: - return self.success(message=f'netconf info loaded from {hostname}') + return self.return_value(message=f'netconf info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services') @@ -185,7 +210,7 @@ def update_interfaces_to_services(self): json.dumps(services)) rp.execute() - return self.success() + return self.return_value() @app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces') @@ -219,7 +244,7 @@ def import_unmanaged_interfaces(self): json.dumps([ifc])) rp.execute() - return self.success() + return self.return_value() @app.task(base=InventoryTask, bind=True, name='update_access_services') @@ -250,7 +275,7 @@ def update_access_services(self): json.dumps(service)) rp.execute() - return self.success() + return self.return_value() @app.task(base=InventoryTask, bind=True, name='update_lg_routers') @@ -269,7 +294,7 @@ def update_lg_routers(self): rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router)) rp.execute() - return self.success() + return self.return_value() @app.task(base=InventoryTask, bind=True, name='update_equipment_locations') @@ -290,7 +315,7 @@ def update_equipment_locations(self): rp.set('opsdb:location:%s' % h, json.dumps(locations)) rp.execute() - return self.success() + return self.return_value() @app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy') @@ -322,7 +347,7 @@ def update_circuit_hierarchy(self): rp.set('opsdb:services:children:%d' % cid, json.dumps(children)) rp.execute() - return self.success() + return self.return_value() @app.task(base=InventoryTask, bind=True, name='update_geant_lambdas') @@ -343,32 +368,24 @@ def update_geant_lambdas(self): json.dumps(ld)) rp.execute() - return self.success() + return self.return_value() @app.task(base=InventoryTask, bind=True, name='update_neteng_managed_device_list') @log_task_entry_and_exit def update_neteng_managed_device_list(self): - self.update_state( - state=states.STARTED, - meta={ - 'message': 'querying netdash for managed routers' - }) + self.log_info('querying netdash for managed routers') routers = list(juniper.load_routers_from_netdash( InventoryTask.config['managed-routers'])) - self.update_state( - state=states.STARTED, - meta={ - 'message': f'found {len(routers)} routers, saving details' - }) + self.log_info(f'found {len(routers)} routers, saving details') r = get_next_redis(InventoryTask.config) r.set('netdash', json.dumps(routers).encode('utf-8')) - return self.success(f'saved {len(routers)} managed routers') + return self.return_value(f'saved {len(routers)} managed routers') def load_netconf_data(hostname): @@ -503,12 +520,7 @@ def refresh_juniper_interface_list(hostname, netconf): @app.task(base=InventoryTask, bind=True, name='reload_router_config') @log_task_entry_and_exit def reload_router_config(self, hostname): - self.update_state( - state=states.STARTED, - meta={ - 'hostname': hostname, - 'message': f'loading netconf data for {hostname}' - }) + self.log_info(f'loading netconf data for {hostname}') # get the timestamp for the current netconf data current_netconf_timestamp = None @@ -533,16 +545,11 @@ def reload_router_config(self, hostname): if new_netconf_timestamp == current_netconf_timestamp: msg = f'no timestamp change for {hostname} netconf data' logger.debug(msg) - return self.success(msg) + return self.return_value(msg) # clear cached classifier responses for this router, and # refresh peering data - self.update_state( - state=states.STARTED, - meta={ - 'hostname': hostname, - 'message': f'refreshing peers & clearing cache for {hostname}' - }) + self.log_info(f'refreshing peers & clearing cache for {hostname}') refresh_ix_public_peers(hostname, netconf_doc) refresh_vpn_rr_peers(hostname, netconf_doc) refresh_interface_address_lookups(hostname, netconf_doc) @@ -555,18 +562,13 @@ def reload_router_config(self, hostname): raise InventoryTaskError( f'error extracting community string for {hostname}') else: - self.update_state( - state=states.STARTED, - meta={ - 'hostname': hostname, - 'message': f'refreshing snmp interface indexes for {hostname}' - }) + self.log_info(f'refreshing snmp interface indexes for {hostname}') # load snmp data, in this thread snmp_refresh_interfaces.apply(args=[hostname, community]) clear_cached_classifier_responses(None) - return self.success(f'updated configuration for {hostname}') + return self.return_value(f'updated configuration for {hostname}') def _erase_next_db(config): @@ -608,7 +610,7 @@ def internal_refresh_phase_2(self): t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) pending_task_ids.append(t.id) - return self.success() + return self.return_value() @log_task_entry_and_exit @@ -687,22 +689,14 @@ def refresh_finalizer(self, pending_task_ids_json): "items": {"type": "string"} } - def _update(s): - logger.debug(s) - self.update_state( - state=states.STARTED, - meta={ - 'message': s - }) - try: task_ids = json.loads(pending_task_ids_json) logger.debug('task_ids: %r' % task_ids) jsonschema.validate(task_ids, input_schema) - _wait_for_tasks(task_ids, update_callback=_update) - _build_subnet_db(update_callback=_update) - _build_service_category_interface_list(update_callback=_update) + _wait_for_tasks(task_ids, update_callback=self.log_info) + _build_subnet_db(update_callback=self.log_info) + _build_service_category_interface_list(update_callback=self.log_info) except (jsonschema.ValidationError, json.JSONDecodeError, @@ -711,9 +705,9 @@ def refresh_finalizer(self, pending_task_ids_json): raise e latch_db(InventoryTask.config) - _update('latched current/next dbs') + self.log_info('latched current/next dbs') - return self.success() + return self.return_value() @log_task_entry_and_exit -- GitLab