Skip to content
Snippets Groups Projects
Commit cd83066e authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature DBOARD3-242-better-log-api.

parents 56ee3ba6 c227db21
No related branches found
No related tags found
No related merge requests found
......@@ -331,6 +331,27 @@ Any non-empty responses are JSON formatted messages.
}
```
* /jobs/log
This resource returns the state of the previous (or current)
tasks associated with a call to `/jobs/update`. The response
contains error or warning messages, if any were generated.
```json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"pending": {"type": "array", "items": {"type": "string"}},
"errors": {"type": "array", "items": {"type": "string"}},
"failed": {"type": "array", "items": {"type": "string"}},
"warnings": {"type": "array", "items": {"type": "string"}},
},
"required": ["pending", "errors", "failed", "warnings"],
"additionalProperties": False
}
```
* /jobs/reload-router-config/*`equipment-name`*
This resource updates the inventory network data for
......@@ -367,10 +388,10 @@ Any non-empty responses are JSON formatted messages.
The source-equipment is the equipment that causes the trap, not the NMS that
sends it.
The response will be an object containing the
metadata, formatted according to the following schema:
```json
{
"$schema": "http://json-schema.org/draft-07/schema#",
......@@ -679,10 +700,10 @@ Any non-empty responses are JSON formatted messages.
The source-equipment is the equipment that causes the trap, not the NMS that
sends it.
The response will be an object containing the
metadata, formatted according to the following schema:
```json
{
"$schema": "http://json-schema.org/draft-07/schema#",
......
import json
import logging
from distutils.util import strtobool
import jsonschema
from flask import Blueprint, current_app, jsonify, Response, request
from inventory_provider.tasks import monitor
from inventory_provider.tasks import worker
from inventory_provider.routes import common
from inventory_provider.tasks.common import get_current_redis, get_latch
......@@ -71,43 +73,62 @@ def check_update_status():
return jsonify(list(worker.check_task_status(task_id)))
LOG_ENTRY_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"uuid": {"type": "string"},
"type": {"type": "string"},
"clock": {"type": "integer"}
},
"required": ["uuid", "type"],
"additionalProperties": True
}
@routes.route("log", methods=['GET', 'POST'])
@common.require_accepts_json
def load_task_log():
tasks = {}
# r = common.get_current_redis()
r = common.get_next_redis()
for k in r.scan_iter('joblog:*'):
value = r.get(k.decode('utf-8'))
if not value:
logger.error(f'no data for log entry: {k.decode("utf-8")}')
continue
try:
value = json.loads(value.decode('utf-8'))
jsonschema.validate(value, LOG_ENTRY_SCHEMA)
except (json.JSONDecodeError, jsonschema.ValidationError):
logger.exception('error decoding entry for {k.decode("utf-8")}')
continue
info = tasks.setdefault(value['uuid'], {})
if value['type'] in ['task-info', 'task-warning', 'task-error']:
info.setdefault(value['type'], []).append(value)
else:
info[value['type']] = value
return jsonify(tasks)
FINALIZATION_EVENTS = {'task-succeeded', 'task-failed', 'task-revoked'}
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
r = get_current_redis(config)
cache_key = 'joblog:cached-response'
result = r.get(cache_key)
if result:
result = json.loads(result.decode('utf-8'))
else:
result = {
'pending': [],
'warnings': [],
'errors': [],
'failed': []
}
found_tasks = False
for task in monitor.load_task_log(
current_app.config['INVENTORY_PROVIDER_CONFIG']).values():
found_tasks = True
for event in task.get('task-warning', []):
result['warnings'].append(event['message'])
for event in task.get('task-error', []):
result['errors'].append(event['message'])
# build the description if task-received is available
description = None
if 'task-received' in task:
event = task['task-received'][0]
description = f'{event["name"]}{event["args"]}'
description += f':{event["uuid"]}'
if 'task-failed' in task:
if not description:
logger.error('found task-failed event without'
f'task-received: {task}')
description = task['task-failed'][0]['uuid']
result['failed'].append(description)
if 'task-started' in task:
finished = set(task.keys()) & FINALIZATION_EVENTS
if not finished:
if not description:
logger.error('found task-started event without'
f'task-received: {task}')
description = task['task-started'][0]['uuid']
result['pending'].append(description)
if found_tasks and not result['pending']:
r.set(cache_key, json.dumps(result))
return jsonify(result)
......@@ -30,21 +30,35 @@
{{ update_request_status }}
</div>
<span class="grid-item" ng-show="latch_error||latch_pending">
<table class="table table-striped" summary="update tasks">
<span class="grid-item" ng-show="errors.length">
<table class="table table-striped" summary="error messages">
<tr>
<th colspan="4" scope="col">update tasks</th>
<th scope="col">errorss</th>
</tr>
<tr ng-repeat="description in errors">
<td>{{ description }}</td>
</tr>
</table>
</span>
<span class="grid-item" ng-show="warnings.length">
<table class="table table-striped" summary="warning messages">
<tr>
<th scope="col">warnings</th>
</tr>
<tr ng-repeat="description in warnings">
<td>{{ description }}</td>
</tr>
</table>
</span>
<span class="grid-item" ng-show="pending.length">
<table class="table table-striped" summary="pending tasks">
<tr>
<th scope="col">name</th>
<th scope="col">status</th>
<th scope="col">success</th>
<th scope="col">message</th>
<th scope="col">pending tasks</th>
</tr>
<tr ng-repeat="t in tasks">
<td>{{ t.name }}</td>
<td>{{ t.status }}</td>
<td>{{ t.success }}</td>
<td>{{ t.message }}</td>
<tr ng-repeat="description in pending">
<td>{{ description }}</td>
</tr>
</table>
</span>
......
......@@ -9,7 +9,10 @@ myApp.controller('update', function($scope, $http, $timeout) {
$scope.update_request_status = "";
$scope.update_request_error = false;
$scope.tasks = [];
$scope.pending = [];
$scope.failed = [];
$scope.errors = [];
$scope.warnings = [];
$scope.check_status = function() {
......@@ -27,9 +30,6 @@ myApp.controller('update', function($scope, $http, $timeout) {
$scope.update_request_status = "";
}
$timeout($scope.check_status, 5000);
if ($scope.latch_pending || $scope.latch_error) {
$scope.refresh_update_status();
}
},
/* error response */
function(rsp) {
......@@ -46,25 +46,25 @@ myApp.controller('update', function($scope, $http, $timeout) {
$http({
method: 'GET',
// url: window.location.origin + "/jobs/check-task-status/9d1cbcd2-c377-4b7a-b969-04ce17f03f20"
url: window.location.origin + "/jobs/check-update-status"
url: window.location.origin + "/jobs/log"
}).then(
/* ok response */
function(rsp) {
console.log('got update status rsp: ' + JSON.stringify(rsp.data).substring(0,30));
$scope.tasks = rsp.data.map(t => ({
id: t.id,
parent: t.parent,
status: t.status,
success: t.ready ? (t.success ? "OK" : "NO") : "-",
message: (t.result && t.result.message) ? t.result.message.substring(0,100) : "",
name: t.result ? t.result.task : "",
}));
$scope.pending = rsp.data.pending;
$scope.failed = rsp.data.failed;
$scope.warnings = rsp.data.warnings;
$scope.errors = rsp.data.errors;
$timeout($scope.refresh_update_status, 2000);
},
/* error response */
function(rsp) {
// assume this is 404 ...
$scope.tasks = [];
// assume this is 404 ...?
$scope.pending = [];
$scope.failed = [];
$scope.errors = [];
$scope.warnings = [];
$timeout($scope.refresh_update_status, 5000);
}
);
......@@ -94,4 +94,6 @@ myApp.controller('update', function($scope, $http, $timeout) {
}
$scope.check_status();
$scope.refresh_update_status();
});
\ No newline at end of file
......@@ -9,17 +9,30 @@ import json
import logging
import os
import queue
import random
import threading
import jsonschema
from redis.exceptions import RedisError
from inventory_provider import config, environment
from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import _get_redis
from inventory_provider.tasks.common import _get_redis, get_current_redis
logger = logging.getLogger(__name__)
INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
LOG_ENTRY_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"uuid": {"type": "string"},
"type": {"type": "string"},
"clock": {"type": "integer"}
},
"required": ["uuid", "type"],
"additionalProperties": True
}
def _save_proc(db_queue, params, dbid):
......@@ -112,6 +125,104 @@ def clear_joblog(r):
rp.execute()
def _redis_client_proc(key_queue, value_queue, config_params):
"""
create a local redis connection, lookup the values of
the keys that come from key_queue, and put them on value_queue
i/o contract:
None arriving on key_queue means no more keys are coming
put None in value_queue means we are finished
:param key_queue:
:param value_queue:
:param config_params: app config
:return:
"""
try:
r = get_current_redis(config_params)
while True:
key = key_queue.get()
# contract is that None means no more requests
if not key:
break
value = r.get(key).decode('utf-8')
value = json.loads(value)
jsonschema.validate(value, LOG_ENTRY_SCHEMA)
value_queue.put(value)
except (json.JSONDecodeError, jsonschema.ValidationError):
logger.exception(f'error decoding entry for {key}')
finally:
# contract is to return None when finished
value_queue.put(None)
def load_task_log(config_params, ignored_keys=[]):
"""
load the task log in a formatted dictionary:
keys are task uuid's
values are dicts with keys like 'task-started', 'task-info', etc. ...
values of those elements are celery event dicts
the loading is done with multiple connections in parallel, since this
method is called from an api handler and when the client is far from
the redis master the cumulative latency causes nginx/gunicorn timeouts
:param config_params: app config
:param ignored_keys: list of keys to ignore if found
:return:
"""
response_queue = queue.Queue()
threads = []
for _ in range(10):
q = queue.Queue()
t = threading.Thread(
target=_redis_client_proc,
args=[q, response_queue, config_params])
t.start()
threads.append({'thread': t, 'queue': q})
r = get_current_redis(config_params)
for k in r.scan_iter('joblog:*'):
k = k.decode('utf-8')
if k in ignored_keys:
logger.debug('ignoring key: {k}')
continue
t = random.choice(threads)
t['queue'].put(k)
# tell all threads there are no more keys coming
for t in threads:
t['queue'].put(None)
num_finished = 0
tasks = {}
# read values from response_queue until we receive
# None len(threads) times
while num_finished < len(threads):
value = response_queue.get()
if not value:
num_finished += 1
logger.debug('one worker thread finished')
continue
info = tasks.setdefault(value['uuid'], {})
info.setdefault(value['type'], []).append(value)
# cleanup like we're supposed to, even though it's python
for t in threads:
t['thread'].join(timeout=0.5) # timeout, for sanity
return tasks
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
run()
......@@ -42,6 +42,19 @@ TASK_STATUS_SCHEMA = {
"items": {"$ref": "#/definitions/task"}
}
TASK_LOG_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"pending": {"type": "array", "items": {"type": "string"}},
"errors": {"type": "array", "items": {"type": "string"}},
"failed": {"type": "array", "items": {"type": "string"}},
"warnings": {"type": "array", "items": {"type": "string"}},
},
"required": ["pending", "errors", "failed", "warnings"],
"additionalProperties": False
}
def backend_db():
return _get_redis({
......@@ -260,21 +273,102 @@ def test_job_log(client):
test_events = {
'joblog:AAAA:task-aaa': {
'type': 'task-aaaa', 'uuid': 'AAAA', 'clock': 999},
'type': 'task-aaaa', 'uuid': 'AAAA'},
'joblog:AAAB:task-infox': {
'type': 'task-infox', 'uuid': 'AAAB', 'clock': 999},
'type': 'task-infox', 'uuid': 'AAAB'},
'joblog:CCCC:task-received': {
'type': 'task-received',
'uuid': 'CCCC',
'name': 'xyz',
'args': ['z', 1]
},
'joblog:CCCC:task-started': {
'type': 'task-started', 'uuid': 'CCCC'},
'joblog:CCCC:task-succeeded': {
'type': 'task-succeeded', 'uuid': 'CCCC'},
'joblog:TTTT:task-received': {
'type': 'task-received',
'uuid': 'TTTT',
'name': 'xyz',
'args': ['q', 123]
},
'joblog:TTTT:task-started': {
'type': 'task-started', 'uuid': 'TTTT'},
'joblog:TTTT:task-failed': {
'type': 'task-failed', 'uuid': 'TTTT'},
'joblog:SSSS1:task-received': {
'type': 'task-received',
'uuid': 'SSSS',
'name': 'xyz',
'args': ['q', 123]
},
'joblog:SSSS1:task-started': {
'type': 'task-started', 'uuid': 'SSSS'},
'joblog:SSSS2:task-received': {
'type': 'task-received',
'uuid': 'SSSS2',
'name': 'xyz',
'args': ['q', 123]
},
'joblog:SSSS2:task-started': {
'type': 'task-started', 'uuid': 'SSSS2'},
'joblog:SSSS3:task-received': {
'type': 'task-received',
'uuid': 'SSSS3',
'name': 'xyz',
'args': ['q', 123]
},
'joblog:SSSS3:task-started': {
'type': 'task-started', 'uuid': 'SSSS3'},
'joblog:BBBB:task-info:99': {
'type': 'task-info', 'uuid': 'BBBB', 'clock': 99},
'type': 'task-info',
'uuid': 'BBBB',
'clock': 99,
'message': 'x'
},
'joblog:BBBB:task-info:999': {
'type': 'task-info', 'uuid': 'BBBB', 'clock': 999},
'type': 'task-info',
'uuid': 'BBBB',
'clock': 999,
'message': 'x'
},
'joblog:AAAA:task-warning:88': {
'type': 'task-warning', 'uuid': 'AAAA', 'clock': 88},
'type': 'task-warning',
'uuid': 'AAAA',
'clock': 88,
'message': 'x'
},
'joblog:AAAA:task-warning:888': {
'type': 'task-warning', 'uuid': 'AAAA', 'clock': 888},
'type': 'task-warning',
'uuid': 'AAAA',
'clock': 888,
'message': 'x'
},
'joblog:AAAA:task-error:77': {
'type': 'task-error', 'uuid': 'AAAA', 'clock': 77},
'type': 'task-error',
'uuid': 'AAAA',
'clock': 77,
'message': 'x'
},
'joblog:AAAA:task-error:777': {
'type': 'task-error', 'uuid': 'AAAA', 'clock': 777}
'type': 'task-error',
'uuid': 'AAAA',
'clock': 777,
'message': 'x'
},
'joblog:AAAA:task-error:7777': {
'type': 'task-error',
'uuid': 'AAAA',
'clock': 7777,
'message': 'x'
}
}
db = backend_db()
......@@ -286,4 +380,9 @@ def test_job_log(client):
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
result = json.loads(rv.data.decode('utf-8'))
assert len(result.keys()) == 3 # TODO: make a proper test
jsonschema.validate(result, TASK_LOG_SCHEMA)
assert len(result['errors']) == 3
assert len(result['pending']) == 3
assert len(result['failed']) == 1
assert len(result['warnings']) == 2
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment