diff --git a/README.md b/README.md
index dc2faf5ca4906236aefe16643c9361a7479baf18..96f09e2e5b5bb9d6bdb0df60506a499b6d2ccded 100644
--- a/README.md
+++ b/README.md
@@ -331,6 +331,27 @@ Any non-empty responses are JSON formatted messages.
     }
     ```
 
+* /jobs/log
+
+  This resource returns the state of the most recent (or currently
+  running) tasks started by a call to `/jobs/update`.  The response
+  lists pending and failed tasks and any error or warning messages.
+
+  ```json
+  {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+      "pending": {"type": "array", "items": {"type": "string"}},
+      "errors": {"type": "array", "items": {"type": "string"}},
+      "failed": {"type": "array", "items": {"type": "string"}},
+      "warnings": {"type": "array", "items": {"type": "string"}}
+    },
+    "required": ["pending", "errors", "failed", "warnings"],
+    "additionalProperties": false
+  }
+  ```
+
 * /jobs/reload-router-config/*`equipment-name`*
 
   This resource updates the inventory network data for
@@ -367,10 +388,10 @@ Any non-empty responses are JSON formatted messages.
 
   The source-equipment is the equipment that causes the trap, not
   the NMS that sends it.
-  
+
   The response will be an object containing the metadata, formatted
   according to the following schema:
-  
+
   ```json
   {
     "$schema": "http://json-schema.org/draft-07/schema#",
@@ -679,10 +700,10 @@ Any non-empty responses are JSON formatted messages.
 
   The source-equipment is the equipment that causes the trap, not
   the NMS that sends it.
-  
+
   The response will be an object containing the metadata, formatted
   according to the following schema:
-  
+
   ```json
   {
     "$schema": "http://json-schema.org/draft-07/schema#",
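Not part of the change, but as a quick illustration of the contract above: a client can validate a `/jobs/log` reply against this schema with the `jsonschema` package (the same one the service itself uses). The reply values below are invented for illustration; only the four keys come from the schema.

```python
import jsonschema

TASK_LOG_RESPONSE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "pending": {"type": "array", "items": {"type": "string"}},
        "errors": {"type": "array", "items": {"type": "string"}},
        "failed": {"type": "array", "items": {"type": "string"}},
        "warnings": {"type": "array", "items": {"type": "string"}}
    },
    "required": ["pending", "errors", "failed", "warnings"],
    "additionalProperties": False
}

# hypothetical reply while one task is still running
example_reply = {
    "pending": ["xyz['z', 1]:9d1cbcd2-c377-4b7a-b969-04ce17f03f20"],
    "failed": [],
    "errors": [],
    "warnings": ["something looked odd but the update continued"]
}

# raises jsonschema.ValidationError if the reply doesn't match
jsonschema.validate(example_reply, TASK_LOG_RESPONSE_SCHEMA)
```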
diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py
index 384e62a28ac2842f2edb00f95fa82e3652c6142d..74b8b253520c79e60b78f202a681b1c1ea812f44 100644
--- a/inventory_provider/routes/jobs.py
+++ b/inventory_provider/routes/jobs.py
@@ -1,8 +1,10 @@
 import json
 import logging
+
 from distutils.util import strtobool
-import jsonschema
 from flask import Blueprint, current_app, jsonify, Response, request
+
+from inventory_provider.tasks import monitor
 from inventory_provider.tasks import worker
 from inventory_provider.routes import common
 from inventory_provider.tasks.common import get_current_redis, get_latch
@@ -71,43 +73,62 @@ def check_update_status():
     return jsonify(list(worker.check_task_status(task_id)))
 
 
-LOG_ENTRY_SCHEMA = {
-    "$schema": "http://json-schema.org/draft-07/schema#",
-    "type": "object",
-    "properties": {
-        "uuid": {"type": "string"},
-        "type": {"type": "string"},
-        "clock": {"type": "integer"}
-    },
-    "required": ["uuid", "type"],
-    "additionalProperties": True
-}
-
-
 @routes.route("log", methods=['GET', 'POST'])
 @common.require_accepts_json
 def load_task_log():
-    tasks = {}
-    # r = common.get_current_redis()
-    r = common.get_next_redis()
-
-    for k in r.scan_iter('joblog:*'):
-        value = r.get(k.decode('utf-8'))
-        if not value:
-            logger.error(f'no data for log entry: {k.decode("utf-8")}')
-            continue
-        try:
-            value = json.loads(value.decode('utf-8'))
-            jsonschema.validate(value, LOG_ENTRY_SCHEMA)
-        except (json.JSONDecodeError, jsonschema.ValidationError):
-            logger.exception('error decoding entry for {k.decode("utf-8")}')
-            continue
-
-        info = tasks.setdefault(value['uuid'], {})
-        if value['type'] in ['task-info', 'task-warning', 'task-error']:
-            info.setdefault(value['type'], []).append(value)
-        else:
-            info[value['type']] = value
-
-    return jsonify(tasks)
+    FINALIZATION_EVENTS = {'task-succeeded',
+                           'task-failed', 'task-revoked'}
+
+    config = current_app.config['INVENTORY_PROVIDER_CONFIG']
+    r = get_current_redis(config)
+
+    cache_key = 'joblog:cached-response'
+    result = r.get(cache_key)
+    if result:
+        result = json.loads(result.decode('utf-8'))
+    else:
+        result = {
+            'pending': [],
+            'warnings': [],
+            'errors': [],
+            'failed': []
+        }
+
+        found_tasks = False
+        for task in monitor.load_task_log(
+                current_app.config['INVENTORY_PROVIDER_CONFIG']).values():
+
+            found_tasks = True
+
+            for event in task.get('task-warning', []):
+                result['warnings'].append(event['message'])
+            for event in task.get('task-error', []):
+                result['errors'].append(event['message'])
+
+            # build the description if task-received is available
+            description = None
+            if 'task-received' in task:
+                event = task['task-received'][0]
+                description = f'{event["name"]}{event["args"]}'
+                description += f':{event["uuid"]}'
+
+            if 'task-failed' in task:
+                if not description:
+                    logger.error('found task-failed event without '
+                                 f'task-received: {task}')
+                    description = task['task-failed'][0]['uuid']
+                result['failed'].append(description)
+
+            if 'task-started' in task:
+                finished = set(task.keys()) & FINALIZATION_EVENTS
+                if not finished:
+                    if not description:
+                        logger.error('found task-started event without '
+                                     f'task-received: {task}')
+                        description = task['task-started'][0]['uuid']
+                    result['pending'].append(description)
+
+        if found_tasks and not result['pending']:
+            r.set(cache_key, json.dumps(result))
+
+    return jsonify(result)
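As a side note on the handler above: the strings it puts into `pending` and `failed` are built from the first `task-received` event of each task. A tiny standalone sketch, using the same event data the tests below seed for task `CCCC`:

```python
# event data as seeded by test_job_routes.py under 'joblog:CCCC:task-received'
event = {'type': 'task-received', 'uuid': 'CCCC', 'name': 'xyz', 'args': ['z', 1]}

# same formatting as load_task_log() in routes/jobs.py: name, args, then uuid
description = f'{event["name"]}{event["args"]}'
description += f':{event["uuid"]}'

assert description == "xyz['z', 1]:CCCC"
```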
diff --git a/inventory_provider/static/update.html b/inventory_provider/static/update.html
index 18ba32d01b87676b3cef3f1f63696397f88a071f..c460d14740a8be22049c335c8f397569eea0abf6 100644
--- a/inventory_provider/static/update.html
+++ b/inventory_provider/static/update.html
@@ -30,21 +30,35 @@
         {{ update_request_status }}
       </div>
 
-      <span class="grid-item" ng-show="latch_error||latch_pending">
-        <table class="table table-striped" summary="update tasks">
+      <span class="grid-item" ng-show="errors.length">
+        <table class="table table-striped" summary="error messages">
           <tr>
-            <th colspan="4" scope="col">update tasks</th>
+            <th scope="col">errors</th>
+          </tr>
+          <tr ng-repeat="description in errors">
+            <td>{{ description }}</td>
+          </tr>
+        </table>
+      </span>
+
+      <span class="grid-item" ng-show="warnings.length">
+        <table class="table table-striped" summary="warning messages">
+          <tr>
+            <th scope="col">warnings</th>
+          </tr>
+          <tr ng-repeat="description in warnings">
+            <td>{{ description }}</td>
+          </tr>
+        </table>
+      </span>
+
+      <span class="grid-item" ng-show="pending.length">
+        <table class="table table-striped" summary="pending tasks">
           <tr>
-            <th scope="col">name</th>
-            <th scope="col">status</th>
-            <th scope="col">success</th>
-            <th scope="col">message</th>
+            <th scope="col">pending tasks</th>
           </tr>
-          <tr ng-repeat="t in tasks">
-            <td>{{ t.name }}</td>
-            <td>{{ t.status }}</td>
-            <td>{{ t.success }}</td>
-            <td>{{ t.message }}</td>
+          <tr ng-repeat="description in pending">
+            <td>{{ description }}</td>
           </tr>
         </table>
       </span>
diff --git a/inventory_provider/static/update.js b/inventory_provider/static/update.js
index 64e6807348e3d7fa065be42a01deb5a5c1334912..11d7004c08ef260a5238bc9883c74090476ff2c0 100644
--- a/inventory_provider/static/update.js
+++ b/inventory_provider/static/update.js
@@ -9,7 +9,10 @@ myApp.controller('update', function($scope, $http, $timeout) {
 
     $scope.update_request_status = "";
     $scope.update_request_error = false;
-    $scope.tasks = [];
+    $scope.pending = [];
+    $scope.failed = [];
+    $scope.errors = [];
+    $scope.warnings = [];
 
 
     $scope.check_status = function() {
@@ -27,9 +30,6 @@ myApp.controller('update', function($scope, $http, $timeout) {
                     $scope.update_request_status = "";
                 }
                 $timeout($scope.check_status, 5000);
-                if ($scope.latch_pending || $scope.latch_error) {
-                    $scope.refresh_update_status();
-                }
             },
             /* error response */
             function(rsp) {
@@ -46,25 +46,25 @@ myApp.controller('update', function($scope, $http, $timeout) {
 
         $http({
             method: 'GET',
-            // url: window.location.origin + "/jobs/check-task-status/9d1cbcd2-c377-4b7a-b969-04ce17f03f20"
-            url: window.location.origin + "/jobs/check-update-status"
+            url: window.location.origin + "/jobs/log"
         }).then(
            /* ok response */
            function(rsp) {
                console.log('got update status rsp: '
                    + JSON.stringify(rsp.data).substring(0,30));
-                $scope.tasks = rsp.data.map(t => ({
-                    id: t.id,
-                    parent: t.parent,
-                    status: t.status,
-                    success: t.ready ? (t.success ? "OK" : "NO") : "-",
-                    message: (t.result && t.result.message) ? t.result.message.substring(0,100) : "",
-                    name: t.result ? t.result.task : "",
-                }));
+                $scope.pending = rsp.data.pending;
+                $scope.failed = rsp.data.failed;
+                $scope.warnings = rsp.data.warnings;
+                $scope.errors = rsp.data.errors;
+                $timeout($scope.refresh_update_status, 2000);
            },
            /* error response */
            function(rsp) {
-                // assume this is 404 ...
-                $scope.tasks = [];
+                // assume this is 404 ...?
+                $scope.pending = [];
+                $scope.failed = [];
+                $scope.errors = [];
+                $scope.warnings = [];
+                $timeout($scope.refresh_update_status, 5000);
            }
        );
@@ -94,4 +94,6 @@ myApp.controller('update', function($scope, $http, $timeout) {
     }
 
     $scope.check_status();
+    $scope.refresh_update_status();
+
 });
\ No newline at end of file
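update.js now polls `/jobs/log` every few seconds and simply binds the four lists to the page. The same loop can be sketched as a small Python client, e.g. for checking an update from a shell; the base URL is a placeholder and the `requests` package is assumed to be available:

```python
import time

import requests

BASE_URL = 'http://localhost:8080'  # placeholder; use the real deployment address

while True:
    rsp = requests.get(
        f'{BASE_URL}/jobs/log', headers={'Accept': 'application/json'})
    rsp.raise_for_status()
    log = rsp.json()
    print(f"pending: {len(log['pending'])}, failed: {len(log['failed'])}, "
          f"errors: {len(log['errors'])}, warnings: {len(log['warnings'])}")
    if not log['pending']:
        break  # nothing is running any more
    time.sleep(5)
```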
diff --git a/inventory_provider/tasks/monitor.py b/inventory_provider/tasks/monitor.py
index 2639c5e7df0defc7b0cf4bd483aa03c0159cd9d2..f17869d3603af3c634e1bcb99fc312975774dc9e 100644
--- a/inventory_provider/tasks/monitor.py
+++ b/inventory_provider/tasks/monitor.py
@@ -9,17 +9,30 @@
 import json
 import logging
 import os
 import queue
+import random
 import threading
 
+import jsonschema
 from redis.exceptions import RedisError
 
 from inventory_provider import config, environment
 from inventory_provider.tasks.worker import app
-from inventory_provider.tasks.common import _get_redis
+from inventory_provider.tasks.common import _get_redis, get_current_redis
 
 logger = logging.getLogger(__name__)
 
 INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
 
+LOG_ENTRY_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "uuid": {"type": "string"},
+        "type": {"type": "string"},
+        "clock": {"type": "integer"}
+    },
+    "required": ["uuid", "type"],
+    "additionalProperties": True
+}
 
 
 def _save_proc(db_queue, params, dbid):
@@ -112,6 +125,104 @@ def clear_joblog(r):
         rp.execute()
 
 
+def _redis_client_proc(key_queue, value_queue, config_params):
+    """
+    create a local redis connection, look up the values of
+    the keys that come from key_queue, and put them on value_queue
+
+    i/o contract:
+      None arriving on key_queue means no more keys are coming
+      putting None in value_queue means we are finished
+
+    :param key_queue:
+    :param value_queue:
+    :param config_params: app config
+    :return:
+    """
+    try:
+        r = get_current_redis(config_params)
+        while True:
+            key = key_queue.get()
+
+            # contract is that None means no more requests
+            if not key:
+                break
+
+            value = r.get(key).decode('utf-8')
+            value = json.loads(value)
+            jsonschema.validate(value, LOG_ENTRY_SCHEMA)
+
+            value_queue.put(value)
+
+    except (json.JSONDecodeError, jsonschema.ValidationError):
+        logger.exception(f'error decoding entry for {key}')
+
+    finally:
+        # contract is to return None when finished
+        value_queue.put(None)
+
+
+def load_task_log(config_params, ignored_keys=[]):
+    """
+    load the task log in a formatted dictionary:
+      keys are task uuids
+      values are dicts with keys like 'task-started', 'task-info', etc. ...
+      values of those elements are celery event dicts
+
+    the loading is done with multiple connections in parallel, since this
+    method is called from an api handler and when the client is far from
+    the redis master the cumulative latency causes nginx/gunicorn timeouts
+
+    :param config_params: app config
+    :param ignored_keys: list of keys to ignore if found
+    :return:
+    """
+    response_queue = queue.Queue()
+
+    threads = []
+    for _ in range(10):
+        q = queue.Queue()
+        t = threading.Thread(
+            target=_redis_client_proc,
+            args=[q, response_queue, config_params])
+        t.start()
+        threads.append({'thread': t, 'queue': q})
+
+    r = get_current_redis(config_params)
+    for k in r.scan_iter('joblog:*'):
+        k = k.decode('utf-8')
+        if k in ignored_keys:
+            logger.debug(f'ignoring key: {k}')
+            continue
+
+        t = random.choice(threads)
+        t['queue'].put(k)
+
+    # tell all threads there are no more keys coming
+    for t in threads:
+        t['queue'].put(None)
+
+    num_finished = 0
+    tasks = {}
+    # read values from response_queue until we receive
+    # None len(threads) times
+    while num_finished < len(threads):
+        value = response_queue.get()
+        if not value:
+            num_finished += 1
+            logger.debug('one worker thread finished')
+            continue
+
+        info = tasks.setdefault(value['uuid'], {})
+        info.setdefault(value['type'], []).append(value)
+
+    # cleanup like we're supposed to, even though it's python
+    for t in threads:
+        t['thread'].join(timeout=0.5)  # timeout, for sanity
+
+    return tasks
+
+
 if __name__ == '__main__':
     logging.basicConfig(level=logging.DEBUG)
     run()
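For reference, roughly what `load_task_log()` returns for one finished task, using the `joblog:CCCC:*` entries seeded by the test below; each event type maps to a list of the matching celery event dicts, and the `/jobs/log` handler checks those keys against its finalization set:

```python
# approximate value of monitor.load_task_log(config)['CCCC'] for the
# test data below (the surrounding dict is keyed by task uuid)
cccc_entry = {
    'task-received': [
        {'type': 'task-received', 'uuid': 'CCCC', 'name': 'xyz', 'args': ['z', 1]}],
    'task-started': [{'type': 'task-started', 'uuid': 'CCCC'}],
    'task-succeeded': [{'type': 'task-succeeded', 'uuid': 'CCCC'}],
}

# same test as in routes/jobs.py: a task with none of these events is pending
FINALIZATION_EVENTS = {'task-succeeded', 'task-failed', 'task-revoked'}
assert set(cccc_entry.keys()) & FINALIZATION_EVENTS  # CCCC is finished
```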
diff --git a/test/test_job_routes.py b/test/test_job_routes.py
index b7ff1ddf570eab056224024da3b02b8f77541440..33bbde510defb4d7ca6642aa77a1292e2531c711 100644
--- a/test/test_job_routes.py
+++ b/test/test_job_routes.py
@@ -42,6 +42,19 @@ TASK_STATUS_SCHEMA = {
     "items": {"$ref": "#/definitions/task"}
 }
 
+TASK_LOG_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "pending": {"type": "array", "items": {"type": "string"}},
+        "errors": {"type": "array", "items": {"type": "string"}},
+        "failed": {"type": "array", "items": {"type": "string"}},
+        "warnings": {"type": "array", "items": {"type": "string"}},
+    },
+    "required": ["pending", "errors", "failed", "warnings"],
+    "additionalProperties": False
+}
+
 
 def backend_db():
     return _get_redis({
@@ -260,21 +273,102 @@ def test_job_log(client):
 
     test_events = {
         'joblog:AAAA:task-aaa': {
-            'type': 'task-aaaa', 'uuid': 'AAAA', 'clock': 999},
+            'type': 'task-aaaa', 'uuid': 'AAAA'},
         'joblog:AAAB:task-infox': {
-            'type': 'task-infox', 'uuid': 'AAAB', 'clock': 999},
+            'type': 'task-infox', 'uuid': 'AAAB'},
+
+        'joblog:CCCC:task-received': {
+            'type': 'task-received',
+            'uuid': 'CCCC',
+            'name': 'xyz',
+            'args': ['z', 1]
+        },
+        'joblog:CCCC:task-started': {
+            'type': 'task-started', 'uuid': 'CCCC'},
+        'joblog:CCCC:task-succeeded': {
+            'type': 'task-succeeded', 'uuid': 'CCCC'},
+
+        'joblog:TTTT:task-received': {
+            'type': 'task-received',
+            'uuid': 'TTTT',
+            'name': 'xyz',
+            'args': ['q', 123]
+        },
+        'joblog:TTTT:task-started': {
+            'type': 'task-started', 'uuid': 'TTTT'},
+        'joblog:TTTT:task-failed': {
+            'type': 'task-failed', 'uuid': 'TTTT'},
+
+        'joblog:SSSS1:task-received': {
+            'type': 'task-received',
+            'uuid': 'SSSS',
+            'name': 'xyz',
+            'args': ['q', 123]
+        },
+        'joblog:SSSS1:task-started': {
+            'type': 'task-started', 'uuid': 'SSSS'},
+        'joblog:SSSS2:task-received': {
+            'type': 'task-received',
+            'uuid': 'SSSS2',
+            'name': 'xyz',
+            'args': ['q', 123]
+        },
+        'joblog:SSSS2:task-started': {
+            'type': 'task-started', 'uuid': 'SSSS2'},
+        'joblog:SSSS3:task-received': {
+            'type': 'task-received',
+            'uuid': 'SSSS3',
+            'name': 'xyz',
+            'args': ['q', 123]
+        },
+        'joblog:SSSS3:task-started': {
+            'type': 'task-started', 'uuid': 'SSSS3'},
+
         'joblog:BBBB:task-info:99': {
-            'type': 'task-info', 'uuid': 'BBBB', 'clock': 99},
+            'type': 'task-info',
+            'uuid': 'BBBB',
+            'clock': 99,
+            'message': 'x'
+        },
         'joblog:BBBB:task-info:999': {
-            'type': 'task-info', 'uuid': 'BBBB', 'clock': 999},
+            'type': 'task-info',
+            'uuid': 'BBBB',
+            'clock': 999,
+            'message': 'x'
+        },
+
         'joblog:AAAA:task-warning:88': {
-            'type': 'task-warning', 'uuid': 'AAAA', 'clock': 88},
+            'type': 'task-warning',
+            'uuid': 'AAAA',
+            'clock': 88,
+            'message': 'x'
+        },
         'joblog:AAAA:task-warning:888': {
-            'type': 'task-warning', 'uuid': 'AAAA', 'clock': 888},
+            'type': 'task-warning',
+            'uuid': 'AAAA',
+            'clock': 888,
+            'message': 'x'
+        },
+
         'joblog:AAAA:task-error:77': {
-            'type': 'task-error', 'uuid': 'AAAA', 'clock': 77},
+            'type': 'task-error',
+            'uuid': 'AAAA',
+            'clock': 77,
+            'message': 'x'
+        },
         'joblog:AAAA:task-error:777': {
-            'type': 'task-error', 'uuid': 'AAAA', 'clock': 777}
+            'type': 'task-error',
+            'uuid': 'AAAA',
+            'clock': 777,
+            'message': 'x'
+        },
+        'joblog:AAAA:task-error:7777': {
+            'type': 'task-error',
+            'uuid': 'AAAA',
+            'clock': 7777,
+            'message': 'x'
+        }
+
     }
 
     db = backend_db()
@@ -286,4 +380,9 @@
         headers=DEFAULT_REQUEST_HEADERS)
     assert rv.status_code == 200
     result = json.loads(rv.data.decode('utf-8'))
-    assert len(result.keys()) == 3  # TODO: make a proper test
+    jsonschema.validate(result, TASK_LOG_SCHEMA)
+
+    assert len(result['errors']) == 3
+    assert len(result['pending']) == 3
+    assert len(result['failed']) == 1
+    assert len(result['warnings']) == 2
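A quick way to see why the four asserted counts hold, derived by hand from the `test_events` data above (plain arithmetic, not part of the test itself):

```python
# tasks that emit task-started: SSSS, SSSS2, SSSS3, TTTT, CCCC;
# CCCC succeeds and TTTT fails, so only the three SSSS* tasks stay pending
started = {'SSSS', 'SSSS2', 'SSSS3', 'TTTT', 'CCCC'}
finalized = {'CCCC': 'task-succeeded', 'TTTT': 'task-failed'}

pending = started - set(finalized)                                        # 3
failed = [uuid for uuid, ev in finalized.items() if ev == 'task-failed']  # 1
errors = ['77', '777', '7777']      # the three AAAA task-error events
warnings = ['88', '888']            # the two AAAA task-warning events

assert (len(pending), len(failed), len(errors), len(warnings)) == (3, 1, 3, 2)
```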