diff --git a/inventory_provider/juniper.py b/inventory_provider/juniper.py index c00ab985f1b55ea52aab83939c303e69286f0ec1..f30cb222866f856eb354ae285692727dd79bba10 100644 --- a/inventory_provider/juniper.py +++ b/inventory_provider/juniper.py @@ -115,7 +115,7 @@ def _rpc(hostname, ssh): ssh_private_key_file=ssh['private-key']) try: dev.open() - except EzErrors.ConnectError as e: + except (EzErrors.ConnectError, EzErrors.RpcError) as e: raise ConnectionError(str(e)) return dev.rpc diff --git a/inventory_provider/tasks/common.py b/inventory_provider/tasks/common.py index 292800572d841b3474714e68750faeeb3108dc63..2d7f699f250d08a5db6fa344896fdf20712cb159 100644 --- a/inventory_provider/tasks/common.py +++ b/inventory_provider/tasks/common.py @@ -203,9 +203,3 @@ def get_next_redis(config): 'derived next id: {}'.format(next_id)) return _get_redis(config, next_id) - - - def save_task_log(config, log): - jsonschema.validate(log, TASK_LOG_SCHEMA) - r = get_current_redis(config) - r.set(f'db:log:{log["meta"]["id"]}', json.dumps(log)) diff --git a/inventory_provider/tasks/config.py b/inventory_provider/tasks/config.py index bf535b2ee4d741ae82a2ec1145f508d75ccbbbca..911aae9f547304f3d85a7884892733434063a7a5 100644 @@ -47,3 +47,4 @@ logger.debug('broker_url: %r' % broker_url) task_eager_propagates = True task_track_started = True +worker_send_task_events = True diff --git a/inventory_provider/tasks/monitor.py b/inventory_provider/tasks/monitor.py new file mode 100644 index 0000000000000000000000000000000000000000..5481c9b6ade1acac493a04a53665c4cca5bd93e5 --- /dev/null +++ b/inventory_provider/tasks/monitor.py @@ -0,0 +1,68 @@ +""" +standalone process that monitors celery task events and +writes them to redis for reporting + +as with the other processes, INVENTORY_PROVIDER_CONFIG_FILENAME +must be defined in the environment +""" +import json +import logging +import os +from inventory_provider import 
config, environment +from inventory_provider.tasks.worker import app +from inventory_provider.tasks.common import get_current_redis + +logger = logging.getLogger(__name__) +INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error') + + +def run(): + """ + save 'task-*' events to redis, never returns + """ + environment.setup_logging() + + with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f: + logging.info( + 'loading config from: %r' + % os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) + config_params = config.load(f) + + state = app.events.State() + + def _log_event(event): + state.event(event) + + if not event['type'].startswith('task-'): + return + + key = f'joblog:{event["uuid"]}:{event["type"]}' + if event['type'] in INFO_EVENT_TYPES: + key += f':{event["clock"]}' + + r = get_current_redis(config_params) + r.set(key, json.dumps(event)) + + logger.debug(f'{key}: {json.dumps(event)}') + + with app.connection() as connection: + recv = app.events.Receiver(connection, handlers={ + '*': _log_event + }) + recv.capture(limit=None, timeout=None, wakeup=True) + + +def clear_joblog(r): + """ + :param r: + :return: + """ + rp = r.pipeline() + for key in r.scan_iter('joblog:*'): + rp.delete(key) + rp.execute() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + run() diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 591875038d6d2d35ebb4ddf6f319f6dc8afbd99c..cf861ded2d10158db2b9f72bc7c5cd2d61def8ba 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -12,7 +12,7 @@ import jsonschema from inventory_provider.tasks.app import app from inventory_provider.tasks.common \ - import get_next_redis, get_current_redis, save_task_log, \ + import get_next_redis, get_current_redis, \ latch_db, get_latch, set_latch, update_latch_status from inventory_provider.tasks import data from inventory_provider import config @@ -53,7 +53,6 @@ class InventoryTask(Task): def 
__init__(self): self.args = [] - self.messages = [] if InventoryTask.config: return @@ -70,75 +69,17 @@ class InventoryTask(Task): InventoryTask.config = config.load(f) logging.debug("loaded config: %r" % InventoryTask.config) - def _append_to_log(self, level, message): - info = { - 'message': message, - 'level': level - } - logger.debug(json.dumps(info)) - self.messages.append(info) - - def _log(self, level, state, message): - self._append_to_log(level, message) - super().update_state(state=state, meta={'message': message}) - - def log_info(self, message, state=states.STARTED): - self._log('INFO', state, message) - - def log_warning(self, message, state=states.STARTED): - self._log('WARNING', state, message) - - def log_error(self, message, state=states.STARTED): - self._log('ERROR', state, message) - - # def update_state(self, **kwargs): - # meta = {'message': 'none', 'level': 'INFO'} - # meta.update(kwargs.setdefault('meta', dict())) - # meta['task'] = self.name - # meta['worker'] = self.request.hostname - # meta['pid'] = self.pid - # logger.debug(json.dumps( - # {'state': kwargs['state'], 'meta': str(meta)} - # )) - # super().update_state(**kwargs) - - def on_failure(self, exc, task_id, args, kwargs, einfo): - self._append_to_log('ERROR', str(exc)) - save_task_log( - InventoryTask.config, - {'messages': self.messages, 'meta': self._meta(args=args)}) - super().on_failure(exc, task_id, args, kwargs, einfo) - - def on_success(self, retval, task_id, args, kwargs): - self._append_to_log('INFO', 'DONE') - save_task_log( - InventoryTask.config, - {'messages': self.messages, 'meta': self._meta(args=args)}) - super().on_success(retval, task_id, args, kwargs) - - def _meta(self, message=None, args=None): - meta_struct = { - 'task': self.name, - 'id': self.request.id, - 'worker': self.request.hostname, - 'pid': os.getpid(), - 'warning': 'WARNING' in [m['level'] for m in self.messages], - 'error': 'ERROR' in [m['level'] for m in self.messages], - } - if message: - 
meta_struct['message'] = message - if args is not None: - meta_struct['args'] = args - return meta_struct - - def return_value(self, message='OK'): - """ - common method for constructing a standard task return value - :param message: text message to include in return value - :return: a serializable dict - """ - self.log_info(message) - return self._meta(message=message) + def log_info(self, message): + logger.debug(message) + self.send_event('task-info', message=message) + + def log_warning(self, message): + logger.warning(message) + self.send_event('task-warning', message=message) + + def log_error(self, message): + logger.error(message) + self.send_event('task-error', message=message) @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') @@ -162,7 +103,7 @@ def snmp_refresh_interfaces(self, hostname, community): r = get_next_redis(InventoryTask.config) r.set(f'snmp-interfaces:{hostname}', json.dumps(value)) - return self.return_value(message=f'snmp info loaded from {hostname}') + self.log_info(f'snmp info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') @@ -186,7 +127,7 @@ def netconf_refresh_config(self, hostname): r = get_next_redis(InventoryTask.config) r.set(f'netconf:{hostname}', netconf_str) - return self.return_value(message=f'netconf info loaded from {hostname}') + self.log_info(f'netconf info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services') @@ -212,8 +153,6 @@ def update_interfaces_to_services(self): json.dumps(services)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces') @log_task_entry_and_exit @@ -246,8 +185,6 @@ def import_unmanaged_interfaces(self): json.dumps([ifc])) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_access_services') @log_task_entry_and_exit @@ -277,8 +214,6 @@ def update_access_services(self): 
json.dumps(service)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_lg_routers') @log_task_entry_and_exit @@ -296,8 +231,6 @@ def update_lg_routers(self): rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_equipment_locations') @log_task_entry_and_exit @@ -317,8 +250,6 @@ def update_equipment_locations(self): rp.set('opsdb:location:%s' % h, json.dumps(locations)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy') @log_task_entry_and_exit @@ -349,8 +280,6 @@ def update_circuit_hierarchy(self): rp.set('opsdb:services:children:%d' % cid, json.dumps(children)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_geant_lambdas') @log_task_entry_and_exit @@ -370,8 +299,6 @@ def update_geant_lambdas(self): json.dumps(ld)) rp.execute() - return self.return_value() - @app.task(base=InventoryTask, bind=True, name='update_neteng_managed_device_list') @@ -386,8 +313,7 @@ def update_neteng_managed_device_list(self): r = get_next_redis(InventoryTask.config) r.set('netdash', json.dumps(routers).encode('utf-8')) - - return self.return_value(f'saved {len(routers)} managed routers') + self.log_info(f'saved {len(routers)} managed routers') def load_netconf_data(hostname): @@ -547,7 +473,8 @@ def reload_router_config(self, hostname): if new_netconf_timestamp == current_netconf_timestamp: msg = f'no timestamp change for {hostname} netconf data' logger.debug(msg) - return self.return_value(msg) + self.log_info(msg) + return # clear cached classifier responses for this router, and # refresh peering data @@ -569,8 +496,7 @@ def reload_router_config(self, hostname): snmp_refresh_interfaces.apply(args=[hostname, community]) clear_cached_classifier_responses(None) - - return self.return_value(f'updated configuration for 
{hostname}') + self.log_info(f'updated configuration for {hostname}') def _erase_next_db(config): @@ -612,8 +538,6 @@ def internal_refresh_phase_2(self): t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) pending_task_ids.append(t.id) - return self.return_value() - @log_task_entry_and_exit def launch_refresh_cache_all(config): @@ -709,8 +633,6 @@ def refresh_finalizer(self, pending_task_ids_json): latch_db(InventoryTask.config) self.log_info('latched current/next dbs') - return self.return_value() - @log_task_entry_and_exit def _build_service_category_interface_list(update_callback=lambda s: None):