diff --git a/inventory_provider/juniper.py b/inventory_provider/juniper.py index c00ab985f1b55ea52aab83939c303e69286f0ec1..f30cb222866f856eb354ae285692727dd79bba10 100644 --- a/inventory_provider/juniper.py +++ b/inventory_provider/juniper.py @@ -115,7 +115,7 @@ def _rpc(hostname, ssh): ssh_private_key_file=ssh['private-key']) try: dev.open() - except EzErrors.ConnectError as e: + except (EzErrors.ConnectError, EzErrors.RpcError) as e: raise ConnectionError(str(e)) return dev.rpc diff --git a/inventory_provider/snmp.py b/inventory_provider/snmp.py index ded726e74bbe26357a6a3187e1b8f0b60ce1cd06..7947cf35382cff01595b33f5d42f5f54b6536e2f 100644 --- a/inventory_provider/snmp.py +++ b/inventory_provider/snmp.py @@ -10,6 +10,10 @@ from pysnmp.smi import builder, compiler RFC1213_MIB_IFDESC = '1.3.6.1.2.1.2.2.1.2' +class SNMPWalkError(ConnectionError): + pass + + def _v6address_oid2str(dotted_decimal): hex_params = [] for dec in re.split(r'\.', dotted_decimal): @@ -61,15 +65,19 @@ def walk(agent_hostname, community, base_oid): # pragma: no cover # cf. 
http://snmplabs.com/ # pysnmp/examples/hlapi/asyncore/sync/contents.html - assert not engineErrorIndication, ( - f'snmp response engine error indication: ' - f'{str(engineErrorIndication)} - {agent_hostname}') - assert not pduErrorIndication, 'snmp response pdu error %r at %r' % ( - pduErrorIndication, - errorIndex and varBinds[int(errorIndex) - 1][0] or '?') - assert errorIndex == 0, ( - 'sanity failure: errorIndex != 0, ' - 'but no error indication') + if engineErrorIndication: + raise SNMPWalkError( + f'snmp response engine error indication: ' + f'{str(engineErrorIndication)} - {agent_hostname}') + if pduErrorIndication: + raise SNMPWalkError( + 'snmp response pdu error %r at %r' % ( + pduErrorIndication, + errorIndex and varBinds[int(errorIndex) - 1][0] or '?')) + if errorIndex != 0: + raise SNMPWalkError( + 'sanity failure: errorIndex != 0, ' + 'but no error indication') # varBinds = [ # rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]),x[1]) # @@ -84,7 +92,7 @@ def walk(agent_hostname, community, base_oid): # pragma: no cover def get_router_snmp_indexes(hostname, community): for ifc in walk(hostname, community, RFC1213_MIB_IFDESC): m = re.match(r'.*\.(\d+)$', ifc['oid']) - assert m, 'sanity failure parsing oid: %r' % ifc['oid'] + assert m, f'sanity failure parsing oid: {ifc["oid"]}' yield { 'name': ifc['value'], 'index': int(m.group(1)) diff --git a/inventory_provider/tasks/common.py b/inventory_provider/tasks/common.py index 3a474c8e13d03042c3aeaaff94e1eeeb07cc5797..ac85afb7de7094517c95cf2ed5f2929b211f6afc 100644 --- a/inventory_provider/tasks/common.py +++ b/inventory_provider/tasks/common.py @@ -24,6 +24,48 @@ DB_LATCH_SCHEMA = { "additionalProperties": False } +TASK_LOG_SCHEMA = { + '$schema': 'http://json-schema.org/draft-07/schema#', + + 'definitions': { + 'meta': { + 'type': 'object', + 'properties': { + 'task': {'type': 'string'}, + 'id': {'type': 'string'}, + 'worker': {'type': 'string'}, + 'pid': {'type': 'integer'}, + 'warning': {'type': 'boolean'}, + 
'error': {'type': 'boolean'}, + 'args': {'type': 'array'} + }, + 'required': ['task', 'id', 'worker', 'pid', 'warning', 'error'], + 'additionalProperties': False + }, + 'message': { + 'type': 'object', + 'properties': { + 'message': {'type': 'string'}, + 'level': { + 'type': 'string', + 'enum': ['INFO', 'WARNING', 'ERROR'] + } + } + } + }, + + 'type': 'object', + 'properties': { + 'meta': {'$ref': '#/definitions/meta'}, + 'messages': { + 'type': 'array', + 'items': {'$ref': '#/definitions/message'} + } + }, + 'required': ['meta', 'messages'], + 'additionalProperties': False +} + def get_latch(r): latch = r.get('db:latch') diff --git a/inventory_provider/tasks/config.py b/inventory_provider/tasks/config.py index bf535b2ee4d741ae82a2ec1145f508d75ccbbbca..911aae9f547304f3d85a7884892733434063a7a5 100644 --- a/inventory_provider/tasks/config.py +++ b/inventory_provider/tasks/config.py @@ -47,3 +47,4 @@ logger.debug('broker_url: %r' % broker_url) task_eager_propagates = True task_track_started = True +worker_send_task_events = True diff --git a/inventory_provider/tasks/monitor.py b/inventory_provider/tasks/monitor.py new file mode 100644 index 0000000000000000000000000000000000000000..5481c9b6ade1acac493a04a53665c4cca5bd93e5 --- /dev/null +++ b/inventory_provider/tasks/monitor.py @@ -0,0 +1,68 @@ +""" +standalone process that monitors celery task events and +writes them to redis for reporting + +as with the other processes, INVENTORY_PROVIDER_CONFIG_FILENAME +must be defined in the environment +""" +import json +import logging +import os +from inventory_provider import config, environment +from inventory_provider.tasks.worker import app +from inventory_provider.tasks.common import get_current_redis + +logger = logging.getLogger(__name__) +INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error') + + +def run(): + """ + save 'task-*' events to redis, never returns + """ + environment.setup_logging() + + with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as 
f: + logging.info( + 'loading config from: %r' + % os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) + config_params = config.load(f) + + state = app.events.State() + + def _log_event(event): + state.event(event) + + if not event['type'].startswith('task-'): + return + + key = f'joblog:{event["uuid"]}:{event["type"]}' + if event['type'] in INFO_EVENT_TYPES: + key += f':{event["clock"]}' + + r = get_current_redis(config_params) + r.set(key, json.dumps(event)) + + logger.debug(f'{key}: {json.dumps(event)}') + + with app.connection() as connection: + recv = app.events.Receiver(connection, handlers={ + '*': _log_event + }) + recv.capture(limit=None, timeout=None, wakeup=True) + + +def clear_joblog(r): + """ + :param r: + :return: + """ + rp = r.pipeline() + for key in r.scan_iter('joblog:*'): + rp.delete(key) + rp.execute() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + run() diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 6c965ae95e535624b5d838495e9df8df00780989..48058c7cb3d2ad2db081e0a5b5c6d68649eae73f 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -12,7 +12,8 @@ import jsonschema from inventory_provider.tasks.app import app from inventory_provider.tasks.common \ - import get_next_redis, latch_db, get_latch, set_latch, update_latch_status + import get_next_redis, get_current_redis, \ + latch_db, get_latch, set_latch, update_latch_status from inventory_provider.tasks import data from inventory_provider import config from inventory_provider import environment @@ -51,7 +52,7 @@ class InventoryTask(Task): def __init__(self): - self.pid = os.getpid() + self.args = [] if InventoryTask.config: return @@ -68,62 +69,66 @@ class InventoryTask(Task): InventoryTask.config = config.load(f) logging.debug("loaded config: %r" % InventoryTask.config) - def update_state(self, **kwargs): - meta = kwargs.setdefault('meta', dict()) - meta['task'] = self.name - 
meta['worker'] = self.request.hostname - meta['pid'] = self.pid - logger.debug(json.dumps( - {'state': kwargs['state'], 'meta': str(meta)} - )) - super().update_state(**kwargs) - - def on_failure(self, exc, task_id, args, kwargs, einfo): - logger.exception(exc) - super().on_failure(exc, task_id, args, kwargs, einfo) - - def _task_return_value(self, warning, message): - """ - common method for constructing a standard task return value - :param warning: boolean (False for normal, warning-free response) - :param message: text message to include in return value - :return: a serializable dict - """ - return { - 'task': self.name, - 'id': self.request.id, - 'worker': self.request.hostname, - 'pid': self.pid, - 'warning': warning, - 'message': message - } + def log_info(self, message): + logger.debug(message) + self.send_event('task-info', message=message) - def success(self, message='OK'): - return self._task_return_value(warning=False, message=message) + def log_warning(self, message): + logger.warning(message) + self.send_event('task-warning', message=message) - def warning(self, message='WARNING'): - return self._task_return_value(warning=True, message=message) + def log_error(self, message): + logger.error(message) + self.send_event('task-error', message=message) @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') @log_task_entry_and_exit def snmp_refresh_interfaces(self, hostname, community): - # TODO: [DBOARD3-242] copy from current redis in case of error - value = list(snmp.get_router_snmp_indexes(hostname, community)) + try: + value = list(snmp.get_router_snmp_indexes(hostname, community)) + except ConnectionError: + msg = f'error loading snmp data from {hostname}' + logger.exception(msg) + self.log_warning(msg) + r = get_current_redis(InventoryTask.config) + value = r.get(f'snmp-interfaces:{hostname}') + if not value: + raise InventoryTaskError( + f'snmp error with {hostname}' + f' and no cached netconf data found') + # unnecessary json 
encode/decode here ... could be optimized + value = json.loads(value.decode('utf-8')) + self.log_warning(f'using cached snmp data for {hostname}') + r = get_next_redis(InventoryTask.config) - r.set('snmp-interfaces:' + hostname, json.dumps(value)) - return self.success(message=f'snmp info loaded from {hostname}') + r.set(f'snmp-interfaces:{hostname}', json.dumps(value)) + self.log_info(f'snmp info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') @log_task_entry_and_exit def netconf_refresh_config(self, hostname): - # TODO: [DBOARD3-242] copy from current redis in case of error - netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"]) - netconf_str = etree.tostring(netconf_doc, encoding='unicode') + + try: + netconf_doc = juniper.load_config( + hostname, InventoryTask.config["ssh"]) + netconf_str = etree.tostring(netconf_doc, encoding='unicode') + except ConnectionError: + msg = f'error loading netconf data from {hostname}' + logger.exception(msg) + self.log_warning(msg) + r = get_current_redis(InventoryTask.config) + netconf_str = r.get(f'netconf:{hostname}') + if not netconf_str: + raise InventoryTaskError( + f'netconf error with {hostname}' + f' and no cached netconf data found') + self.log_warning(f'using cached netconf data for {hostname}') + r = get_next_redis(InventoryTask.config) - r.set('netconf:' + hostname, netconf_str) - return self.success(message=f'netconf info loaded from {hostname}') + r.set(f'netconf:{hostname}', netconf_str) + self.log_info(f'netconf info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services') @@ -149,8 +154,6 @@ def update_interfaces_to_services(self): json.dumps(services)) rp.execute() - return self.success() - @app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces') @log_task_entry_and_exit @@ -183,8 +186,6 @@ def import_unmanaged_interfaces(self): json.dumps([ifc])) rp.execute() - return 
self.success() - @app.task(base=InventoryTask, bind=True, name='update_access_services') @log_task_entry_and_exit @@ -214,8 +215,6 @@ def update_access_services(self): json.dumps(service)) rp.execute() - return self.success() - @app.task(base=InventoryTask, bind=True, name='update_lg_routers') @log_task_entry_and_exit @@ -233,8 +232,6 @@ def update_lg_routers(self): rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router)) rp.execute() - return self.success() - @app.task(base=InventoryTask, bind=True, name='update_equipment_locations') @log_task_entry_and_exit @@ -254,8 +251,6 @@ def update_equipment_locations(self): rp.set('opsdb:location:%s' % h, json.dumps(locations)) rp.execute() - return self.success() - @app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy') @log_task_entry_and_exit @@ -286,8 +281,6 @@ def update_circuit_hierarchy(self): rp.set('opsdb:services:children:%d' % cid, json.dumps(children)) rp.execute() - return self.success() - @app.task(base=InventoryTask, bind=True, name='update_geant_lambdas') @log_task_entry_and_exit @@ -307,32 +300,21 @@ def update_geant_lambdas(self): json.dumps(ld)) rp.execute() - return self.success() - @app.task(base=InventoryTask, bind=True, name='update_neteng_managed_device_list') @log_task_entry_and_exit def update_neteng_managed_device_list(self): - self.update_state( - state=states.STARTED, - meta={ - 'message': 'querying netdash for managed routers' - }) + self.log_info('querying netdash for managed routers') routers = list(juniper.load_routers_from_netdash( InventoryTask.config['managed-routers'])) - self.update_state( - state=states.STARTED, - meta={ - 'message': f'found {len(routers)} routers, saving details' - }) + self.log_info(f'found {len(routers)} routers, saving details') r = get_next_redis(InventoryTask.config) r.set('netdash', json.dumps(routers).encode('utf-8')) - - return self.success(f'saved {len(routers)} managed routers') + self.log_info(f'saved {len(routers)} managed 
routers') def load_netconf_data(hostname): @@ -467,12 +449,7 @@ def refresh_juniper_interface_list(hostname, netconf): @app.task(base=InventoryTask, bind=True, name='reload_router_config') @log_task_entry_and_exit def reload_router_config(self, hostname): - self.update_state( - state=states.STARTED, - meta={ - 'hostname': hostname, - 'message': f'loading netconf data for {hostname}' - }) + self.log_info(f'loading netconf data for {hostname}') # get the timestamp for the current netconf data current_netconf_timestamp = None @@ -497,16 +474,12 @@ def reload_router_config(self, hostname): if new_netconf_timestamp == current_netconf_timestamp: msg = f'no timestamp change for {hostname} netconf data' logger.debug(msg) - return self.success(msg) + self.log_info(msg) + return # clear cached classifier responses for this router, and # refresh peering data - self.update_state( - state=states.STARTED, - meta={ - 'hostname': hostname, - 'message': f'refreshing peers & clearing cache for {hostname}' - }) + self.log_info(f'refreshing peers & clearing cache for {hostname}') refresh_ix_public_peers(hostname, netconf_doc) refresh_vpn_rr_peers(hostname, netconf_doc) refresh_interface_address_lookups(hostname, netconf_doc) @@ -519,18 +492,12 @@ def reload_router_config(self, hostname): raise InventoryTaskError( f'error extracting community string for {hostname}') else: - self.update_state( - state=states.STARTED, - meta={ - 'hostname': hostname, - 'message': f'refreshing snmp interface indexes for {hostname}' - }) + self.log_info(f'refreshing snmp interface indexes for {hostname}') # load snmp data, in this thread snmp_refresh_interfaces.apply(args=[hostname, community]) clear_cached_classifier_responses(None) - - return self.success(f'updated configuration for {hostname}') + self.log_info(f'updated configuration for {hostname}') def _erase_next_db(config): @@ -572,8 +539,6 @@ def internal_refresh_phase_2(self): t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) 
pending_task_ids.append(t.id) - return self.success() - @log_task_entry_and_exit def launch_refresh_cache_all(config): @@ -651,22 +616,14 @@ def refresh_finalizer(self, pending_task_ids_json): "items": {"type": "string"} } - def _update(s): - logger.debug(s) - self.update_state( - state=states.STARTED, - meta={ - 'message': s - }) - try: task_ids = json.loads(pending_task_ids_json) logger.debug('task_ids: %r' % task_ids) jsonschema.validate(task_ids, input_schema) - _wait_for_tasks(task_ids, update_callback=_update) - _build_subnet_db(update_callback=_update) - _build_service_category_interface_list(update_callback=_update) + _wait_for_tasks(task_ids, update_callback=self.log_info) + _build_subnet_db(update_callback=self.log_info) + _build_service_category_interface_list(update_callback=self.log_info) except (jsonschema.ValidationError, json.JSONDecodeError, @@ -675,9 +632,7 @@ def refresh_finalizer(self, pending_task_ids_json): raise e latch_db(InventoryTask.config) - _update('latched current/next dbs') - - return self.success() + self.log_info('latched current/next dbs') @log_task_entry_and_exit diff --git a/test/conftest.py b/test/conftest.py index c783073e0d97144f5d1dc8293125b71be4410553..a4f05322340f3610f70da983beb5cc4857d2e08c 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -209,6 +209,12 @@ def mocked_worker_module( with open(data_config_filename) as f: worker.InventoryTask.config = config.load(f) + def _mocked_send_event(*kargs, **kwargs): + pass + mocker.patch( + 'inventory_provider.tasks.worker.InventoryTask.send_event', + _mocked_send_event) + def _mocked_snmp_interfaces(hostname, community): return json.loads(cached_test_data['snmp-interfaces:' + hostname]) mocker.patch( diff --git a/test/test_monitoring.py b/test/test_monitoring.py new file mode 100644 index 0000000000000000000000000000000000000000..66fa82851b4a4ccbeba942185f20a9567699e0e8 --- /dev/null +++ b/test/test_monitoring.py @@ -0,0 +1,77 @@ +import contextlib +import json +import os 
+from unittest import mock +from inventory_provider.tasks import monitor +from inventory_provider.tasks.common import _get_redis + +CONNECTION_FINGERPRINT = "SDF@#$@#" + +TEST_MANAGEMENT_EVENTS = [ + {'type': 'task-aaaa', 'uuid': 'AAAA', 'clock': 999}, + {'type': 'task-infox', 'uuid': 'AAAA', 'clock': 999} +] + +TEST_LOG_EVENTS = [ + {'type': 'task-info', 'uuid': 'AAAA', 'clock': 99}, + {'type': 'task-info', 'uuid': 'AAAA', 'clock': 999}, + {'type': 'task-warning', 'uuid': 'AAAA', 'clock': 88}, + {'type': 'task-warning', 'uuid': 'AAAA', 'clock': 888}, + {'type': 'task-error', 'uuid': 'AAAA', 'clock': 77}, + {'type': 'task-error', 'uuid': 'AAAA', 'clock': 777} +] + + +@contextlib.contextmanager +def mocked_connection(): + yield CONNECTION_FINGERPRINT + + +class MockedState(): + def __init__(self): + pass + + def event(self, e): + pass + + +class MockedReceiver(): + def __init__(self, connection, handlers): + assert connection == CONNECTION_FINGERPRINT + self.handlers = handlers + + def capture(self, **kwargs): + # write test events to log, check in the test case + for e in TEST_MANAGEMENT_EVENTS + TEST_LOG_EVENTS: + self.handlers['*'](e) + + +def backend_db(): + return _get_redis({ + 'redis': { + 'hostname': None, + 'port': None + }, + 'redis-databases': [0, 7] + }).db + + +@mock.patch('inventory_provider.tasks.monitor.app.events.State', MockedState) +@mock.patch( + 'inventory_provider.tasks.monitor.app.connection', mocked_connection) +@mock.patch( + 'inventory_provider.tasks.monitor.app.events.Receiver', MockedReceiver) +def test_latchdb(data_config_filename, mocked_redis): + + os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename + monitor.run() + + db = backend_db() + + for e in TEST_MANAGEMENT_EVENTS: + expected_key = f'joblog:{e["uuid"]}:{e["type"]}' + assert e == json.loads(db[expected_key]) + + for e in TEST_LOG_EVENTS: + expected_key = f'joblog:{e["uuid"]}:{e["type"]}:{e["clock"]}' + assert e == json.loads(db[expected_key])