diff --git a/Changelog.md b/Changelog.md
index 9fd92d47d7b60081c9d5c9b8e3b13f5ac1afc015..3942962741008f66708123693715f5b0a62a600a 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -2,6 +2,11 @@
 
 All notable changes to this project will be documented in this file.
 
+## [0.45] - 2020-06-05
+- DBOARD3-242: use cached netconf/snmp data when router is unavailable
+- use celery events rather than status for logging errors & warnings
+- added a monitoring process and api for exposing event messages
+
 ## [0.44] - 2020-06-03
 - DBOARD3-284: accomodate IMS API updates
 - DBOARD3-271: added customer-user weeding
diff --git a/README.md b/README.md
index dc2faf5ca4906236aefe16643c9361a7479baf18..96f09e2e5b5bb9d6bdb0df60506a499b6d2ccded 100644
--- a/README.md
+++ b/README.md
@@ -331,6 +331,27 @@ Any non-empty responses are JSON formatted messages.
     }
     ```
 
+* /jobs/log
+
+  This resource returns the state of the previous (or current)
+  tasks associated with a call to `/jobs/update`.  The response
+  contains error or warning messages, if any were generated.
+
+  ```json
+  {
+      "$schema": "http://json-schema.org/draft-07/schema#",
+      "type": "object",
+      "properties": {
+          "pending": {"type": "array", "items": {"type": "string"}},
+          "errors": {"type": "array", "items": {"type": "string"}},
+          "failed": {"type": "array", "items": {"type": "string"}},
+          "warnings": {"type": "array", "items": {"type": "string"}}
+      },
+      "required": ["pending", "errors", "failed", "warnings"],
+      "additionalProperties": false
+  }
+  ```
+
 * /jobs/reload-router-config/*`equipment-name`*
 
   This resource updates the inventory network data for
@@ -367,10 +388,10 @@ Any non-empty responses are JSON formatted messages.
 
   The source-equipment is the equipment that causes the trap,
   not the NMS that sends it.
-  
+
   The response will be an object containing the metadata,
   formatted according to the following schema:
-  
+
   ```json
   {
     "$schema": "http://json-schema.org/draft-07/schema#",
@@ -679,10 +700,10 @@ Any non-empty responses are JSON formatted messages.
 
   The source-equipment is the equipment that causes the trap,
   not the NMS that sends it.
-  
+
   The response will be an object containing the metadata,
   formatted according to the following schema:
-  
+
   ```json
   {
     "$schema": "http://json-schema.org/draft-07/schema#",
diff --git a/inventory_provider/juniper.py b/inventory_provider/juniper.py
index c00ab985f1b55ea52aab83939c303e69286f0ec1..f30cb222866f856eb354ae285692727dd79bba10 100644
--- a/inventory_provider/juniper.py
+++ b/inventory_provider/juniper.py
@@ -115,7 +115,7 @@ def _rpc(hostname, ssh):
         ssh_private_key_file=ssh['private-key'])
     try:
         dev.open()
-    except EzErrors.ConnectError as e:
+    except (EzErrors.ConnectError, EzErrors.RpcError) as e:
         raise ConnectionError(str(e))
     return dev.rpc
diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py
index fc07171113483ca7af59850b8af23ec98dcf1f1b..af879679526263d59d14093f99f149bb80a47169 100644
--- a/inventory_provider/routes/jobs.py
+++ b/inventory_provider/routes/jobs.py
@@ -1,10 +1,16 @@
+import json
+import logging
+
 from distutils.util import strtobool
 from flask import Blueprint, current_app, jsonify, Response, request
+
+from inventory_provider.tasks import monitor
 from inventory_provider.tasks import worker
 from inventory_provider.routes import common
 from inventory_provider.tasks.common import get_current_redis, get_latch
 
 routes = Blueprint("inventory-data-job-routes", __name__)
+logger = logging.getLogger(__name__)
 
 
 @routes.after_request
@@ -65,3 +71,63 @@ def check_update_status():
         task_id = task_id.decode('utf-8')
 
     return jsonify(list(worker.check_task_status(task_id)))
+
+
+@routes.route("log", methods=['GET', 'POST'])
+@common.require_accepts_json
+def load_task_log():
+
+    FINALIZATION_EVENTS = {'task-succeeded', 'task-failed', 'task-revoked'}
+
+    config = current_app.config['INVENTORY_PROVIDER_CONFIG']
+    r = get_current_redis(config)
+
+    cache_key = 'joblog:cached-response'
+    result = r.get(cache_key)
+    if result:
+        result = json.loads(result.decode('utf-8'))
+    else:
+        result = {
+            'pending': [],
+            'warnings': [],
+            'errors': [],
+            'failed': []
+        }
+
+        found_tasks = False
+        for task in monitor.load_task_log(
+                current_app.config['INVENTORY_PROVIDER_CONFIG']).values():
+
+            found_tasks = True
+
+            for event in task.get('task-warning', []):
+                result['warnings'].append(event['message'])
+            for event in task.get('task-error', []):
+                result['errors'].append(event['message'])
+
+            # build the description if task-received is available
+            description = None
+            if 'task-received' in task:
+                event = task['task-received'][0]
+                description = f'{event["name"]}{event["args"]}'
+
+            if 'task-failed' in task:
+                if not description:
+                    logger.error('found task-failed event without '
+                                 f'task-received: {task}')
+                    description = task['task-failed'][0]['uuid']
+                result['failed'].append(description)
+
+            if 'task-started' in task:
+                finished = set(task.keys()) & FINALIZATION_EVENTS
+                if not finished:
+                    if not description:
+                        logger.error('found task-started event without '
+                                     f'task-received: {task}')
+                        description = task['task-started'][0]['uuid']
+                    result['pending'].append(description)
+
+        if found_tasks and not result['pending']:
+            r.set(cache_key, json.dumps(result))
+
+    return jsonify(result)
diff --git a/inventory_provider/snmp.py b/inventory_provider/snmp.py
index ded726e74bbe26357a6a3187e1b8f0b60ce1cd06..92cc3a835a1c7e00a9abbe9b51ea11cb41e1e1ea 100644
--- a/inventory_provider/snmp.py
+++ b/inventory_provider/snmp.py
@@ -10,6 +10,10 @@ from pysnmp.smi import builder, compiler
 RFC1213_MIB_IFDESC = '1.3.6.1.2.1.2.2.1.2'
 
 
+class SNMPWalkError(ConnectionError):
+    pass
+
+
 def _v6address_oid2str(dotted_decimal):
     hex_params = []
     for dec in re.split(r'\.', dotted_decimal):
@@ -61,15 +65,19 @@ def walk(agent_hostname, community, base_oid):  # pragma: no cover
 
         # cf. http://snmplabs.com/
         # pysnmp/examples/hlapi/asyncore/sync/contents.html
-        assert not engineErrorIndication, (
-            f'snmp response engine error indication: '
-            f'{str(engineErrorIndication)} - {agent_hostname}')
-        assert not pduErrorIndication, 'snmp response pdu error %r at %r' % (
-            pduErrorIndication,
-            errorIndex and varBinds[int(errorIndex) - 1][0] or '?')
-        assert errorIndex == 0, (
-            'sanity failure: errorIndex != 0, '
-            'but no error indication')
+        if engineErrorIndication:
+            raise SNMPWalkError(
+                f'snmp response engine error indication: '
+                f'{str(engineErrorIndication)} - {agent_hostname}')
+        if pduErrorIndication:
+            raise SNMPWalkError(
+                'snmp response pdu error %r at %r' % (
+                    pduErrorIndication,
+                    errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
+        if errorIndex != 0:
+            raise SNMPWalkError(
+                'sanity failure: errorIndex != 0, '
+                'but no error indication')
 
         # varBinds = [
         #     rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]),x[1])
@@ -84,7 +92,7 @@ def walk(agent_hostname, community, base_oid):  # pragma: no cover
 def get_router_snmp_indexes(hostname, community):
     for ifc in walk(hostname, community, RFC1213_MIB_IFDESC):
         m = re.match(r'.*\.(\d+)$', ifc['oid'])
-        assert m, 'sanity failure parsing oid: %r' % ifc['oid']
+        assert m, f'sanity failure parsing oid: {ifc["oid"]}'
         yield {
             'name': ifc['value'],
             'index': int(m.group(1))
diff --git a/inventory_provider/static/update.html b/inventory_provider/static/update.html
index 18ba32d01b87676b3cef3f1f63696397f88a071f..1c44a59273205b46ce21323781489c0f1b53025e 100644
--- a/inventory_provider/static/update.html
+++ b/inventory_provider/static/update.html
@@ -13,43 +13,79 @@
 <body>
 
 <div ng-controller="update">
-    <div class="grid-container">
-        <div class="grid-item">
+    <div class="container p-3 my-3">
         <button type="button"
                 class="btn btn-primary btn-lg"
                 ng-click="launch_update()"
                 ng-disabled="latch_pending">Update Inventory Provider</button>
     </div>
-    <div class="grid-item" ng-class="latch_error||update_request_error ? 'error' : 'ok'">
+    <div class="container p-3 my-3 border" ng-class="latch_error||update_request_error ? 'error' : 'ok'">
         {{ latch_info }}
     </div>
-    <div class="grid-item" ng-class="update_request_error ? 'error' : 'ok'">
+    <div class="container p-3 my-3" ng-class="update_request_error ? 'error' : 'ok'">
         {{ update_request_status }}
     </div>
-    <span class="grid-item" ng-show="latch_error||latch_pending">
-        <table class="table table-striped" summary="update tasks">
+    <div class="container p-3 my-3 border" ng-show="errors.length">
+        <table class="table table-striped" summary="error messages">
             <tr>
-                <th colspan="4" scope="col">update tasks</th>
+                <th>
+                    <button ng-click="show_errors=!show_errors">{{show_errors ? "hide" : "show"}}</button>
+                </th>
+                <th scope="col">errors</th>
+            </tr>
+            <tr ng-if="show_errors" ng-repeat="description in errors">
+                <td colspan="2">{{ description }}</td>
+            </tr>
+        </table>
+    </div>
+
+    <div class="container p-3 my-3 border" ng-show="warnings.length">
+        <table class="table table-striped" summary="warning messages">
             <tr>
-                <th scope="col">name</th>
-                <th scope="col">status</th>
-                <th scope="col">success</th>
-                <th scope="col">message</th>
+                <th>
"hide" : "show"}}</button> + </th> + <th scope="col">warnings</th> </tr> - <tr ng-repeat="t in tasks"> - <td>{{ t.name }}</td> - <td>{{ t.status }}</td> - <td>{{ t.success }}</td> - <td>{{ t.message }}</td> + <tr ng-if="show_warnings" ng-repeat="description in warnings"> + <td colspan="2">{{ description }}</td> </tr> </table> - </span> + </div> + + <div class="container p-3 my-3 border" ng-show="failed.length"> + <table class="table table-striped" summary="failed tasks"> + <tr> + <th> + <button ng-click="show_failed=!show_failed">{{show_failed ? "hide" : "show"}}</button> + </th> + <th scope="col">failed tasks</th> + </tr> + <tr ng-if="show_failed" ng-repeat="description in failed"> + <td colspan="2">{{ description }}</td> + </tr> + </table> + </div> + + <div class="container p-3 my-3 border" ng-show="pending.length"> + <table class="table table-striped" summary="pending tasks"> + <tr> + <th> + <button ng-click="show_pending=!show_pending">{{show_pending ? "hide" : "show"}}</button> + </th> + <th scope="col">pending tasks</th> + </tr> + <tr ng-if="show_pending" ng-repeat="description in pending"> + <td colspan="2">{{ description }}</td> + </tr> + </table> + </div> </div> - </div> + </body> </html> \ No newline at end of file diff --git a/inventory_provider/static/update.js b/inventory_provider/static/update.js index 64e6807348e3d7fa065be42a01deb5a5c1334912..a4d87b4c74c2e783cd9200d979b9a5cc4f8795b5 100644 --- a/inventory_provider/static/update.js +++ b/inventory_provider/static/update.js @@ -9,7 +9,15 @@ myApp.controller('update', function($scope, $http, $timeout) { $scope.update_request_status = ""; $scope.update_request_error = false; - $scope.tasks = []; + $scope.pending = []; + $scope.failed = []; + $scope.errors = []; + $scope.warnings = []; + + $scope.show_warnings = true; + $scope.show_errors = true; + $scope.show_pending = true; + $scope.show_failed = true; $scope.check_status = function() { @@ -25,11 +33,10 @@ myApp.controller('update', function($scope, $http, $timeout) { $scope.latch_error = rsp.data.latch.failure; if (!$scope.latch_pending) { $scope.update_request_status = ""; + } else { + $scope.update_request_status = "update is running"; } $timeout($scope.check_status, 5000); - if ($scope.latch_pending || $scope.latch_error) { - $scope.refresh_update_status(); - } }, /* error response */ function(rsp) { @@ -46,25 +53,25 @@ myApp.controller('update', function($scope, $http, $timeout) { $http({ method: 'GET', - // url: window.location.origin + "/jobs/check-task-status/9d1cbcd2-c377-4b7a-b969-04ce17f03f20" - url: window.location.origin + "/jobs/check-update-status" + url: window.location.origin + "/jobs/log" }).then( /* ok response */ function(rsp) { console.log('got update status rsp: ' + JSON.stringify(rsp.data).substring(0,30)); - $scope.tasks = rsp.data.map(t => ({ - id: t.id, - parent: t.parent, - status: t.status, - success: t.ready ? (t.success ? "OK" : "NO") : "-", - message: (t.result && t.result.message) ? t.result.message.substring(0,100) : "", - name: t.result ? t.result.task : "", - })); + $scope.pending = rsp.data.pending; + $scope.failed = rsp.data.failed; + $scope.warnings = rsp.data.warnings; + $scope.errors = rsp.data.errors; + $timeout($scope.refresh_update_status, 2000); }, /* error response */ function(rsp) { - // assume this is 404 ... - $scope.tasks = []; + // assume this is 404 ...? 
+                $scope.pending = [];
+                $scope.failed = [];
+                $scope.errors = [];
+                $scope.warnings = [];
+                $timeout($scope.refresh_update_status, 5000);
             }
         );
 
@@ -94,4 +101,6 @@ myApp.controller('update', function($scope, $http, $timeout) {
     }
 
     $scope.check_status();
+    $scope.refresh_update_status();
+
 });
\ No newline at end of file
diff --git a/inventory_provider/tasks/common.py b/inventory_provider/tasks/common.py
index 3a474c8e13d03042c3aeaaff94e1eeeb07cc5797..ac85afb7de7094517c95cf2ed5f2929b211f6afc 100644
--- a/inventory_provider/tasks/common.py
+++ b/inventory_provider/tasks/common.py
@@ -24,6 +24,48 @@ DB_LATCH_SCHEMA = {
     "additionalProperties": False
 }
 
+TASK_LOG_SCHEMA = {
+    '$schema': 'http://json-schema.org/draft-07/schema#',
+
+    'definitions': {
+        'meta': {
+            'type': 'object',
+            'properties': {
+                'task': {'type': 'string'},
+                'id': {'type': 'string'},
+                'worker': {'type': 'string'},
+                'pid': {'type': 'integer'},
+                'warning': {'type': 'boolean'},
+                'error': {'type': 'boolean'},
+                'args': {'type': 'array'}
+            },
+            'required': ['task', 'id', 'worker', 'pid', 'warning', 'error'],
+            'additionalProperties': False
+        },
+        'message': {
+            'type': 'object',
+            'properties': {
+                'message': {'type': 'string'},
+                'level': {
+                    'type': 'string',
+                    'enum': ['INFO', 'WARNING', 'ERROR']
+                }
+            }
+        }
+    },
+
+    'type': 'object',
+    'properties': {
+        'meta': {'$ref': '#/definitions/meta'},
+        'messages': {
+            'type': 'array',
+            'items': {'$ref': '#/definitions/message'}
+        }
+    },
+    'required': ['meta', 'messages'],
+    'additionalProperties': False
+}
+
 
 def get_latch(r):
     latch = r.get('db:latch')
diff --git a/inventory_provider/tasks/config.py b/inventory_provider/tasks/config.py
index bf535b2ee4d741ae82a2ec1145f508d75ccbbbca..911aae9f547304f3d85a7884892733434063a7a5 100644
--- a/inventory_provider/tasks/config.py
+++ b/inventory_provider/tasks/config.py
@@ -47,3 +47,4 @@ logger.debug('broker_url: %r' % broker_url)
 
 task_eager_propagates = True
 task_track_started = True
+worker_send_task_events = True
diff --git a/inventory_provider/tasks/monitor.py b/inventory_provider/tasks/monitor.py
new file mode 100644
index 0000000000000000000000000000000000000000..7d7a9951b6a32bd2b85045e39757ec033c137ac8
--- /dev/null
+++ b/inventory_provider/tasks/monitor.py
@@ -0,0 +1,232 @@
+"""
+standalone process that monitors celery task events and
+writes them to redis for reporting
+
+as with the other processes, INVENTORY_PROVIDER_CONFIG_FILENAME
+must be defined in the environment
+"""
+import json
+import logging
+import os
+import queue
+import random
+import threading
+
+import jsonschema
+from redis.exceptions import RedisError
+
+from inventory_provider import config, environment
+from inventory_provider.tasks.worker import app
+from inventory_provider.tasks.common import _get_redis, get_current_redis
+
+
+logger = logging.getLogger(__name__)
+INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
+LOG_ENTRY_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "uuid": {"type": "string"},
+        "type": {"type": "string"},
+        "clock": {"type": "integer"}
+    },
+    "required": ["uuid", "type"],
+    "additionalProperties": True
+}
+
+
+def _save_proc(db_queue, params, dbid):
+    """
+    save redis events to a specific db
+
+    :param db_queue: queue for receiving events, None means to stop
+    :param params:
+    :param dbid:
+    :return:
+    """
+    def _save_events(r, q):
+        while True:
+            event = q.get()
+            if not event:
+                return
+            r.set(event['key'], event['value'])
+
+    while True:
+        try:
+            _save_events(_get_redis(config=params, dbid=dbid), db_queue)
+            # we only reach here if the event loop terminated
+            # normally, because None was sent
+            return
+        except RedisError:
+            logger.exception('redis i/o exception, reconnecting')
+            # TODO: do something to terminate the process ...?
+
+
+def run():
+    """
+    save 'task-*' events to redis (all databases), never returns
+    """
+    environment.setup_logging()
+
+    with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f:
+        logger.info(
+            'loading config from: %r'
+            % os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'])
+        config_params = config.load(f)
+
+    state = app.events.State()
+
+    threads = []
+    for dbid in config_params['redis-databases']:
+        q = queue.Queue()
+        t = threading.Thread(
+            target=_save_proc,
+            args=[q, config_params, dbid])
+        t.start()
+        threads.append({'thread': t, 'queue': q})
+
+    def _log_event(event):
+        state.event(event)
+
+        if not event['type'].startswith('task-'):
+            return
+
+        key = f'joblog:{event["uuid"]}:{event["type"]}'
+        if event['type'] in INFO_EVENT_TYPES:
+            key += f':{event["clock"]}'
+
+        value = json.dumps(event)
+        for t in threads:
+            t['queue'].put({'key': key, 'value': value})
+
+        logger.debug(f'{key}: {json.dumps(event)}')
+
+    with app.connection() as connection:
+        recv = app.events.Receiver(connection, handlers={
+            '*': _log_event
+        })
+        recv.capture(limit=None, timeout=None, wakeup=True)
+
+    logger.warning('normally we should never reach here')
+
+    # allow child threads to terminate
+    for t in threads:
+        t['queue'].put(None)
+        t['thread'].join()
+
+
+def clear_joblog(r, keys_read_event=None):
+    """
+    :param r: connection to a redis database
+    :param keys_read_event: optional event to signal after all keys are read
+    :return:
+    """
+    rp = r.pipeline()
+    for key in r.scan_iter('joblog:*'):
+        rp.delete(key)
+    if keys_read_event:
+        assert isinstance(keys_read_event, threading.Event)  # sanity
+        keys_read_event.set()
+    rp.execute()
+
+
+def _redis_client_proc(key_queue, value_queue, config_params):
+    """
+    create a local redis connection, lookup the values of
+    the keys that come from key_queue, and put them on value_queue
+
+    i/o contract:
+        None arriving on key_queue means no more keys are coming
+        put None in value_queue means we are finished
+
+    :param key_queue:
+    :param value_queue:
+    :param config_params: app config
+    :return:
+    """
+    try:
+        r = get_current_redis(config_params)
+        while True:
+            key = key_queue.get()
+
+            # contract is that None means no more requests
+            if not key:
+                break
+
+            value = r.get(key).decode('utf-8')
+            value = json.loads(value)
+            jsonschema.validate(value, LOG_ENTRY_SCHEMA)
+
+            value_queue.put(value)
+
+    except (json.JSONDecodeError, jsonschema.ValidationError):
+        logger.exception(f'error decoding entry for {key}')
+
+    finally:
+        # contract is to return None when finished
+        value_queue.put(None)
+
+
+def load_task_log(config_params, ignored_keys=[]):
+    """
+    load the task log in a formatted dictionary:
+    keys are task uuid's
+    values are dicts with keys like 'task-started', 'task-info', etc. ...
+    values of those elements are celery event dicts
+
+    the loading is done with multiple connections in parallel, since this
+    method is called from an api handler and when the client is far from
+    the redis master the cumulative latency causes nginx/gunicorn timeouts
+
+    :param config_params: app config
+    :param ignored_keys: list of keys to ignore if found
+    :return:
+    """
+    response_queue = queue.Queue()
+
+    threads = []
+    for _ in range(10):
+        q = queue.Queue()
+        t = threading.Thread(
+            target=_redis_client_proc,
+            args=[q, response_queue, config_params])
+        t.start()
+        threads.append({'thread': t, 'queue': q})
+
+    r = get_current_redis(config_params)
+    for k in r.scan_iter('joblog:*'):
+        k = k.decode('utf-8')
+        if k in ignored_keys:
+            logger.debug(f'ignoring key: {k}')
+            continue
+
+        t = random.choice(threads)
+        t['queue'].put(k)
+
+    # tell all threads there are no more keys coming
+    for t in threads:
+        t['queue'].put(None)
+
+    num_finished = 0
+    tasks = {}
+    # read values from response_queue until we receive
+    # None len(threads) times
+    while num_finished < len(threads):
+        value = response_queue.get()
+        if not value:
+            num_finished += 1
+            logger.debug('one worker thread finished')
+            continue
+
+        info = tasks.setdefault(value['uuid'], {})
+        info.setdefault(value['type'], []).append(value)
+
+    # cleanup like we're supposed to, even though it's python
+    for t in threads:
+        t['thread'].join(timeout=0.5)  # timeout, for sanity
+
+    return tasks
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.DEBUG)
+    run()
diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index 6c965ae95e535624b5d838495e9df8df00780989..c69c9642912928ff9876cb4ca5506aa0de578a6c 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -1,8 +1,11 @@
 import json
 import logging
 import os
+import threading
 import time
 
+from redis.exceptions import RedisError
+
 from celery import Task, states
 from celery.result import AsyncResult
 
@@ -12,8 +15,9 @@ import jsonschema
 
 from inventory_provider.tasks.app import app
 from inventory_provider.tasks.common \
-    import get_next_redis, latch_db, get_latch, set_latch, update_latch_status
-from inventory_provider.tasks import data
+    import get_next_redis, get_current_redis, \
+    latch_db, get_latch, set_latch, update_latch_status
+from inventory_provider.tasks import data, monitor
 from inventory_provider import config
 from inventory_provider import environment
 from inventory_provider.db import db, opsdb
@@ -51,7 +55,7 @@ class InventoryTask(Task):
 
     def __init__(self):
 
-        self.pid = os.getpid()
+        self.args = []
 
         if InventoryTask.config:
            return
@@ -68,62 +72,66 @@ class InventoryTask(Task):
             InventoryTask.config = config.load(f)
             logging.debug("loaded config: %r" % InventoryTask.config)
 
-    def update_state(self, **kwargs):
-        meta = kwargs.setdefault('meta', dict())
-        meta['task'] = self.name
-        meta['worker'] = self.request.hostname
-        meta['pid'] = self.pid
-        logger.debug(json.dumps(
-            {'state': kwargs['state'], 'meta': str(meta)}
-        ))
-        super().update_state(**kwargs)
-
-    def on_failure(self, exc, task_id, args, kwargs, einfo):
-        logger.exception(exc)
-        super().on_failure(exc, task_id, args, kwargs, einfo)
-
-    def _task_return_value(self, warning, message):
-        """
-        common method for constructing a standard task return value
-        :param warning: boolean (False for normal, warning-free response)
-        :param message: text message to include in return value
-        :return: a serializable dict
-        """
-        return {
-            'task': self.name,
-            'id': self.request.id,
-            'worker': self.request.hostname,
-            'pid': self.pid,
-            'warning': warning,
-            'message': message
-        }
+    def log_info(self, message):
+        logger.debug(message)
+        self.send_event('task-info', message=message)
 
-    def success(self, message='OK'):
-        return self._task_return_value(warning=False, message=message)
+    def log_warning(self, message):
+        logger.warning(message)
+        self.send_event('task-warning', message=message)
 
-    def warning(self, message='WARNING'):
-        return self._task_return_value(warning=True, message=message)
+    def log_error(self, message):
+        logger.error(message)
+        self.send_event('task-error', message=message)
 
 
 @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces')
 @log_task_entry_and_exit
 def snmp_refresh_interfaces(self, hostname, community):
-    # TODO: [DBOARD3-242] copy from current redis in case of error
-    value = list(snmp.get_router_snmp_indexes(hostname, community))
+    try:
+        value = list(snmp.get_router_snmp_indexes(hostname, community))
+    except ConnectionError:
+        msg = f'error loading snmp data from {hostname}'
+        logger.exception(msg)
+        self.log_warning(msg)
+        r = get_current_redis(InventoryTask.config)
+        value = r.get(f'snmp-interfaces:{hostname}')
+        if not value:
+            raise InventoryTaskError(
+                f'snmp error with {hostname}'
+                f' and no cached snmp data found')
+        # unnecessary json encode/decode here ... could be optimized
+        value = json.loads(value.decode('utf-8'))
+        self.log_warning(f'using cached snmp data for {hostname}')
+
     r = get_next_redis(InventoryTask.config)
-    r.set('snmp-interfaces:' + hostname, json.dumps(value))
-    return self.success(message=f'snmp info loaded from {hostname}')
+    r.set(f'snmp-interfaces:{hostname}', json.dumps(value))
+    self.log_info(f'snmp info loaded from {hostname}')
 
 
 @app.task(base=InventoryTask, bind=True, name='netconf_refresh_config')
 @log_task_entry_and_exit
 def netconf_refresh_config(self, hostname):
-    # TODO: [DBOARD3-242] copy from current redis in case of error
-    netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"])
-    netconf_str = etree.tostring(netconf_doc, encoding='unicode')
+
+    try:
+        netconf_doc = juniper.load_config(
+            hostname, InventoryTask.config["ssh"])
+        netconf_str = etree.tostring(netconf_doc, encoding='unicode')
+    except ConnectionError:
+        msg = f'error loading netconf data from {hostname}'
+        logger.exception(msg)
+        self.log_warning(msg)
+        r = get_current_redis(InventoryTask.config)
+        netconf_str = r.get(f'netconf:{hostname}')
+        if not netconf_str:
+            raise InventoryTaskError(
+                f'netconf error with {hostname}'
+                f' and no cached netconf data found')
+        self.log_warning(f'using cached netconf data for {hostname}')
+
     r = get_next_redis(InventoryTask.config)
-    r.set('netconf:' + hostname, netconf_str)
-    return self.success(message=f'netconf info loaded from {hostname}')
+    r.set(f'netconf:{hostname}', netconf_str)
+    self.log_info(f'netconf info loaded from {hostname}')
 
 
 @app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services')
@@ -149,8 +157,6 @@ def update_interfaces_to_services(self):
             json.dumps(services))
     rp.execute()
 
-    return self.success()
-
 
 @app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces')
 @log_task_entry_and_exit
@@ -183,8 +189,6 @@ def import_unmanaged_interfaces(self):
                 json.dumps([ifc]))
     rp.execute()
 
-    return self.success()
-
 
 @app.task(base=InventoryTask, bind=True, name='update_access_services')
 @log_task_entry_and_exit
@@ -214,8 +218,6 @@ def update_access_services(self):
                 json.dumps(service))
     rp.execute()
 
-    return self.success()
-
 
 @app.task(base=InventoryTask, bind=True, name='update_lg_routers')
 @log_task_entry_and_exit
@@ -233,8 +235,6 @@ def update_lg_routers(self):
         rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router))
     rp.execute()
 
-    return self.success()
-
 
 @app.task(base=InventoryTask, bind=True, name='update_equipment_locations')
 @log_task_entry_and_exit
@@ -254,8 +254,6 @@ def update_equipment_locations(self):
         rp.set('opsdb:location:%s' % h, json.dumps(locations))
     rp.execute()
 
-    return self.success()
-
 
 @app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy')
 @log_task_entry_and_exit
@@ -286,8 +284,6 @@ def update_circuit_hierarchy(self):
             rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
         rp.execute()
 
-    return self.success()
-
 
 @app.task(base=InventoryTask, bind=True, name='update_geant_lambdas')
 @log_task_entry_and_exit
@@ -307,32 +303,21 @@ def update_geant_lambdas(self):
                 json.dumps(ld))
     rp.execute()
 
-    return self.success()
-
 
 @app.task(base=InventoryTask, bind=True,
           name='update_neteng_managed_device_list')
 @log_task_entry_and_exit
 def update_neteng_managed_device_list(self):
-    self.update_state(
-        state=states.STARTED,
-        meta={
-            'message': 'querying netdash for managed routers'
-        })
+    self.log_info('querying netdash for managed routers')
 
     routers = list(juniper.load_routers_from_netdash(
         InventoryTask.config['managed-routers']))
 
-    self.update_state(
-        state=states.STARTED,
-        meta={
-            'message': f'found {len(routers)} routers, saving details'
-        })
+    self.log_info(f'found {len(routers)} routers, saving details')
 
     r = get_next_redis(InventoryTask.config)
     r.set('netdash', json.dumps(routers).encode('utf-8'))
-
-    return self.success(f'saved {len(routers)} managed routers')
+    self.log_info(f'saved {len(routers)} managed routers')
 
 
 def load_netconf_data(hostname):
@@ -467,12 +452,7 @@ def refresh_juniper_interface_list(hostname, netconf):
 @app.task(base=InventoryTask, bind=True, name='reload_router_config')
 @log_task_entry_and_exit
 def reload_router_config(self, hostname):
-    self.update_state(
-        state=states.STARTED,
-        meta={
-            'hostname': hostname,
-            'message': f'loading netconf data for {hostname}'
-        })
+    self.log_info(f'loading netconf data for {hostname}')
 
     # get the timestamp for the current netconf data
     current_netconf_timestamp = None
@@ -497,16 +477,12 @@ def reload_router_config(self, hostname):
     if new_netconf_timestamp == current_netconf_timestamp:
         msg = f'no timestamp change for {hostname} netconf data'
         logger.debug(msg)
-        return self.success(msg)
+        self.log_info(msg)
+        return
 
     # clear cached classifier responses for this router, and
     # refresh peering data
-    self.update_state(
-        state=states.STARTED,
-        meta={
-            'hostname': hostname,
-            'message': f'refreshing peers & clearing cache for {hostname}'
-        })
+    self.log_info(f'refreshing peers & clearing cache for {hostname}')
     refresh_ix_public_peers(hostname, netconf_doc)
     refresh_vpn_rr_peers(hostname, netconf_doc)
     refresh_interface_address_lookups(hostname, netconf_doc)
@@ -519,18 +495,12 @@ def reload_router_config(self, hostname):
         raise InventoryTaskError(
             f'error extracting community string for {hostname}')
     else:
-        self.update_state(
-            state=states.STARTED,
-            meta={
-                'hostname': hostname,
-                'message': f'refreshing snmp interface indexes for {hostname}'
-            })
+        self.log_info(f'refreshing snmp interface indexes for {hostname}')
         # load snmp data, in this thread
         snmp_refresh_interfaces.apply(args=[hostname, community])
 
     clear_cached_classifier_responses(None)
-
-    return self.success(f'updated configuration for {hostname}')
+    self.log_info(f'updated configuration for {hostname}')
 
 
 def _erase_next_db(config):
@@ -572,8 +542,6 @@ def internal_refresh_phase_2(self):
         t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
         pending_task_ids.append(t.id)
 
-    return self.success()
-
 
 @log_task_entry_and_exit
 def launch_refresh_cache_all(config):
@@ -582,25 +550,42 @@ def launch_refresh_cache_all(config):
 
     :param config: config structure as defined in config.py
     :return:
     """
-    _erase_next_db(config)
-    update_latch_status(config, pending=True)
-
-    # TODO: [DBOARD3-242] catch exceptions & reset latch status
-
-    # first batch of subtasks: refresh cached opsdb data
-    subtasks = [
-        update_neteng_managed_device_list.apply_async(),
-        update_interfaces_to_services.apply_async(),
-        update_geant_lambdas.apply_async(),
-        update_circuit_hierarchy.apply_async()
-    ]
-    [x.get() for x in subtasks]
-
-    # now launch the task whose only purpose is to
-    # act as a convenient parent for all of the remaining tasks
-    t = internal_refresh_phase_2.apply_async()
-    return t.id
+    try:
+        _erase_next_db(config)
+        update_latch_status(config, pending=True)
+
+        # call monitor.clear_joblog in a thread, since
+        # deletion might be slow and can be done in parallel
+        def _clear_log_proc(wait_event):
+            monitor.clear_joblog(get_current_redis(config), wait_event)
+
+        keys_captured_event = threading.Event()
+        threading.Thread(
+            target=_clear_log_proc,
+            args=[keys_captured_event]).start()
+        if not keys_captured_event.wait(timeout=60.0):
+            # wait a reasonable time
+            logger.error('timed out waiting for log keys to be read')
+
+        # first batch of subtasks: refresh cached opsdb data
+        subtasks = [
+            update_neteng_managed_device_list.apply_async(),
+            update_interfaces_to_services.apply_async(),
+            update_geant_lambdas.apply_async(),
+            update_circuit_hierarchy.apply_async()
+        ]
+        [x.get() for x in subtasks]
+
+        # now launch the task whose only purpose is to
+        # act as a convenient parent for all of the remaining tasks
+        t = internal_refresh_phase_2.apply_async()
+        return t.id
+
+    except RedisError:
+        update_latch_status(config, pending=False, failure=True)
+        logger.exception('error launching refresh subtasks')
+        raise
 
 
 def _wait_for_tasks(task_ids, update_callback=lambda s: None):
@@ -645,39 +630,32 @@ def _wait_for_tasks(task_ids, update_callback=lambda s: None):
 @log_task_entry_and_exit
 def refresh_finalizer(self, pending_task_ids_json):
 
+    # TODO: if more types of errors appear, use a finally block
+
     input_schema = {
         "$schema": "http://json-schema.org/draft-07/schema#",
         "type": "array",
         "items": {"type": "string"}
     }
 
-    def _update(s):
-        logger.debug(s)
-        self.update_state(
-            state=states.STARTED,
-            meta={
-                'message': s
-            })
-
     try:
         task_ids = json.loads(pending_task_ids_json)
         logger.debug('task_ids: %r' % task_ids)
         jsonschema.validate(task_ids, input_schema)
 
-        _wait_for_tasks(task_ids, update_callback=_update)
-        _build_subnet_db(update_callback=_update)
-        _build_service_category_interface_list(update_callback=_update)
+        _wait_for_tasks(task_ids, update_callback=self.log_info)
+        _build_subnet_db(update_callback=self.log_info)
+        _build_service_category_interface_list(update_callback=self.log_info)
 
     except (jsonschema.ValidationError,
             json.JSONDecodeError,
-            InventoryTaskError) as e:
+            InventoryTaskError,
+            RedisError) as e:
         update_latch_status(InventoryTask.config, failure=True)
         raise e
 
     latch_db(InventoryTask.config)
-    _update('latched current/next dbs')
-
-    return self.success()
+    self.log_info('latched current/next dbs')
 
 
 @log_task_entry_and_exit
diff --git a/setup.py b/setup.py
index 35ab81204f7ec07fd64a138a6eb76ce4c95c7ed8..58be9cbf2bb693d9f1c731df938b11afe8a2c3fb 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 
 setup(
     name='inventory-provider',
-    version="0.44",
+    version="0.45",
     author='GEANT',
     author_email='swd@geant.org',
     description='Dashboard inventory provider',
@@ -23,5 +23,10 @@ setup(
         'requests',
         'netifaces'
     ],
+    entry_points={
+        'console_scripts': [
+            'monitor-tasks=inventory_provider.tasks.monitor:run'
+        ]
+    },
     include_package_data=True,
 )
diff --git a/test/conftest.py b/test/conftest.py
index c783073e0d97144f5d1dc8293125b71be4410553..a4f05322340f3610f70da983beb5cc4857d2e08c 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -209,6 +209,12 @@ def mocked_worker_module(
     with open(data_config_filename) as f:
         worker.InventoryTask.config = config.load(f)
 
+    def _mocked_send_event(*kargs, **kwargs):
+        pass
+    mocker.patch(
+        'inventory_provider.tasks.worker.InventoryTask.send_event',
+        _mocked_send_event)
+
     def _mocked_snmp_interfaces(hostname, community):
         return json.loads(cached_test_data['snmp-interfaces:' + hostname])
     mocker.patch(
diff --git a/test/test_job_routes.py b/test/test_job_routes.py
index 4f2b473c3abe34cc5c0744e0b48cde14c96522eb..33bbde510defb4d7ca6642aa77a1292e2531c711 100644
--- a/test/test_job_routes.py
+++ b/test/test_job_routes.py
@@ -42,6 +42,19 @@ TASK_STATUS_SCHEMA = {
     "items": {"$ref": "#/definitions/task"}
 }
 
+TASK_LOG_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "pending": {"type": "array", "items": {"type": "string"}},
+        "errors": {"type": "array", "items": {"type": "string"}},
+        "failed": {"type": "array", "items": {"type": "string"}},
+        "warnings": {"type": "array", "items": {"type": "string"}},
+    },
+    "required": ["pending", "errors", "failed", "warnings"],
+    "additionalProperties": False
+}
+
 
 def backend_db():
     return _get_redis({
@@ -254,3 +267,122 @@ def test_latchdb(client, mocked_redis):
     assert rv.status_code == 200
     latch = json.loads(rv.data.decode('utf-8'))
     jsonschema.validate(latch, DB_LATCH_SCHEMA)
+
+
+def test_job_log(client):
+
+    test_events = {
+        'joblog:AAAA:task-aaa': {
+            'type': 'task-aaaa', 'uuid': 'AAAA'},
+        'joblog:AAAB:task-infox': {
+            'type': 'task-infox', 'uuid': 'AAAB'},
+
+        'joblog:CCCC:task-received': {
+            'type': 'task-received',
+            'uuid': 'CCCC',
+            'name': 'xyz',
+            'args': ['z', 1]
+        },
+        'joblog:CCCC:task-started': {
+            'type': 'task-started', 'uuid': 'CCCC'},
+        'joblog:CCCC:task-succeeded': {
+            'type': 'task-succeeded', 'uuid': 'CCCC'},
+
+        'joblog:TTTT:task-received': {
+            'type': 'task-received',
+            'uuid': 'TTTT',
+            'name': 'xyz',
+            'args': ['q', 123]
+        },
+        'joblog:TTTT:task-started': {
+            'type': 'task-started', 'uuid': 'TTTT'},
+        'joblog:TTTT:task-failed': {
+            'type': 'task-failed', 'uuid': 'TTTT'},
+
+        'joblog:SSSS1:task-received': {
+            'type': 'task-received',
+            'uuid': 'SSSS',
+            'name': 'xyz',
+            'args': ['q', 123]
+        },
+        'joblog:SSSS1:task-started': {
+            'type': 'task-started', 'uuid': 'SSSS'},
+        'joblog:SSSS2:task-received': {
+            'type': 'task-received',
+            'uuid': 'SSSS2',
+            'name': 'xyz',
+            'args': ['q', 123]
+        },
+        'joblog:SSSS2:task-started': {
+            'type': 'task-started', 'uuid': 'SSSS2'},
+        'joblog:SSSS3:task-received': {
+            'type': 'task-received',
+            'uuid': 'SSSS3',
+            'name': 'xyz',
+            'args': ['q', 123]
+        },
+        'joblog:SSSS3:task-started': {
+            'type': 'task-started', 'uuid': 'SSSS3'},
+
+        'joblog:BBBB:task-info:99': {
+            'type': 'task-info',
+            'uuid': 'BBBB',
+            'clock': 99,
+            'message': 'x'
+        },
+        'joblog:BBBB:task-info:999': {
+            'type': 'task-info',
+            'uuid': 'BBBB',
+            'clock': 999,
+            'message': 'x'
+        },
+
+        'joblog:AAAA:task-warning:88': {
+            'type': 'task-warning',
+            'uuid': 'AAAA',
+            'clock': 88,
+            'message': 'x'
+        },
+        'joblog:AAAA:task-warning:888': {
+            'type': 'task-warning',
+            'uuid': 'AAAA',
+            'clock': 888,
+            'message': 'x'
+        },
+
+        'joblog:AAAA:task-error:77': {
+            'type': 'task-error',
+            'uuid': 'AAAA',
+            'clock': 77,
+            'message': 'x'
+        },
+        'joblog:AAAA:task-error:777': {
+            'type': 'task-error',
+            'uuid': 'AAAA',
+            'clock': 777,
+            'message': 'x'
+        },
+        'joblog:AAAA:task-error:7777': {
+            'type': 'task-error',
+            'uuid': 'AAAA',
+            'clock': 7777,
+            'message': 'x'
+        }
+
+    }
+
+    db = backend_db()
+    for k, v in test_events.items():
+        db[k] = json.dumps(v)
+
+    rv = client.post(
+        'jobs/log',
+        headers=DEFAULT_REQUEST_HEADERS)
+    assert rv.status_code == 200
+    result = json.loads(rv.data.decode('utf-8'))
+    jsonschema.validate(result, TASK_LOG_SCHEMA)
+
+    assert len(result['errors']) == 3
+    assert len(result['pending']) == 3
+    assert len(result['failed']) == 1
+    assert len(result['warnings']) == 2
diff --git a/test/test_monitoring.py b/test/test_monitoring.py
new file mode 100644
index 0000000000000000000000000000000000000000..66fa82851b4a4ccbeba942185f20a9567699e0e8
--- /dev/null
+++ b/test/test_monitoring.py
@@ -0,0 +1,77 @@
+import contextlib
+import json
+import os
+from unittest import mock
+
+from inventory_provider.tasks import monitor
+from inventory_provider.tasks.common import _get_redis
+
+CONNECTION_FINGERPRINT = "SDF@#$@#"
+
+TEST_MANAGEMENT_EVENTS = [
+    {'type': 'task-aaaa', 'uuid': 'AAAA', 'clock': 999},
+    {'type': 'task-infox', 'uuid': 'AAAA', 'clock': 999}
+]
+
+TEST_LOG_EVENTS = [
+    {'type': 'task-info', 'uuid': 'AAAA', 'clock': 99},
+    {'type': 'task-info', 'uuid': 'AAAA', 'clock': 999},
+    {'type': 'task-warning', 'uuid': 'AAAA', 'clock': 88},
+    {'type': 'task-warning', 'uuid': 'AAAA', 'clock': 888},
+    {'type': 'task-error', 'uuid': 'AAAA', 'clock': 77},
+    {'type': 'task-error', 'uuid': 'AAAA', 'clock': 777}
+]
+
+
+@contextlib.contextmanager
+def mocked_connection():
+    yield CONNECTION_FINGERPRINT
+
+
+class MockedState():
+    def __init__(self):
+        pass
+
+    def event(self, e):
+        pass
+
+
+class MockedReceiver():
+    def __init__(self, connection, handlers):
+        assert connection == CONNECTION_FINGERPRINT
+        self.handlers = handlers
+
+    def capture(self, **kwargs):
+        # write test events to log, check in the test case
+        for e in TEST_MANAGEMENT_EVENTS + TEST_LOG_EVENTS:
+            self.handlers['*'](e)
+
+
+def backend_db():
+    return _get_redis({
+        'redis': {
+            'hostname': None,
+            'port': None
+        },
+        'redis-databases': [0, 7]
+    }).db
+
+
+@mock.patch('inventory_provider.tasks.monitor.app.events.State', MockedState)
+@mock.patch(
+    'inventory_provider.tasks.monitor.app.connection', mocked_connection)
+@mock.patch(
+    'inventory_provider.tasks.monitor.app.events.Receiver', MockedReceiver)
+def test_monitor_event_logging(data_config_filename, mocked_redis):
+
+    os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename
+    monitor.run()
+
+    db = backend_db()
+
+    for e in TEST_MANAGEMENT_EVENTS:
+        expected_key = f'joblog:{e["uuid"]}:{e["type"]}'
+        assert e == json.loads(db[expected_key])
+
+    for e in TEST_LOG_EVENTS:
+        expected_key = f'joblog:{e["uuid"]}:{e["type"]}:{e["clock"]}'
+        assert e == json.loads(db[expected_key])
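
A minimal sketch of how a client might consume the new `/jobs/log` resource added in this patch (the `monitor-tasks` console script must be running, with `INVENTORY_PROVIDER_CONFIG_FILENAME` set, so that events are captured). The base URL and polling interval below are illustrative assumptions; the response keys (`pending`, `errors`, `failed`, `warnings`) come from the schema in the README:

```python
import time

import requests

# assumption: wherever this inventory-provider instance is served
BASE_URL = 'http://localhost:8080'


def poll_job_log(base_url=BASE_URL, interval=5.0):
    """Poll /jobs/log until no tasks are pending, then return the log."""
    while True:
        # the route accepts both GET and POST, and requires
        # a client that accepts json (cf. common.require_accepts_json)
        rsp = requests.get(
            f'{base_url}/jobs/log',
            headers={'Accept': 'application/json'})
        rsp.raise_for_status()
        log = rsp.json()
        if not log['pending']:
            return log
        time.sleep(interval)


if __name__ == '__main__':
    result = poll_job_log()
    print(f"failed: {len(result['failed'])}, "
          f"errors: {len(result['errors'])}, "
          f"warnings: {len(result['warnings'])}")
```

This mirrors what `update.js` does in the browser: poll the log, render the four message lists, and stop treating the update as running once `pending` is empty.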