Commit 56ee3ba6 authored by Erik Reid's avatar Erik Reid
Finished feature DBOARD3-242-monitor-log-alpha-api.

parents e879940f b665f0fa
import json
import logging
from distutils.util import strtobool
import jsonschema
from flask import Blueprint, current_app, jsonify, Response, request
from inventory_provider.tasks import worker
from inventory_provider.routes import common
from inventory_provider.tasks.common import get_current_redis, get_latch
routes = Blueprint("inventory-data-job-routes", __name__)
logger = logging.getLogger(__name__)
@routes.after_request
@@ -65,3 +69,45 @@ def check_update_status():
task_id = task_id.decode('utf-8')
return jsonify(list(worker.check_task_status(task_id)))
LOG_ENTRY_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"uuid": {"type": "string"},
"type": {"type": "string"},
"clock": {"type": "integer"}
},
"required": ["uuid", "type"],
"additionalProperties": True
}
@routes.route("log", methods=['GET', 'POST'])
@common.require_accepts_json
def load_task_log():
tasks = {}
# the monitor writes joblog entries to all redis databases, so the
# next db also holds the log of any refresh currently in progress
r = common.get_next_redis()
for k in r.scan_iter('joblog:*'):
value = r.get(k.decode('utf-8'))
if not value:
logger.error(f'no data for log entry: {k.decode("utf-8")}')
continue
try:
value = json.loads(value.decode('utf-8'))
jsonschema.validate(value, LOG_ENTRY_SCHEMA)
except (json.JSONDecodeError, jsonschema.ValidationError):
logger.exception(f'error decoding entry for {k.decode("utf-8")}')
continue
info = tasks.setdefault(value['uuid'], {})
if value['type'] in ['task-info', 'task-warning', 'task-error']:
info.setdefault(value['type'], []).append(value)
else:
info[value['type']] = value
return jsonify(tasks)
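With the grouping above, a response from this endpoint has roughly the following shape: one key per task uuid, list-valued entries for the 'task-info'/'task-warning'/'task-error' types and a single event per type otherwise (a sketch only; the exact payloads depend on what celery emits):

{
    "AAAA": {
        "task-warning": [{"uuid": "AAAA", "type": "task-warning", "clock": 88}],
        "task-error": [{"uuid": "AAAA", "type": "task-error", "clock": 77}]
    },
    "BBBB": {
        "task-info": [{"uuid": "BBBB", "type": "task-info", "clock": 99}]
    }
}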
@@ -8,17 +8,49 @@ must be defined in the environment
import json
import logging
import os
import queue
import threading
from redis.exceptions import RedisError
from inventory_provider import config, environment
from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import get_current_redis
from inventory_provider.tasks.common import _get_redis
logger = logging.getLogger(__name__)
INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
def _save_proc(db_queue, params, dbid):
"""
save redis events to a specific db
:param db_queue: queue for receiving events; a None event means stop
:param params: redis connection parameters, as defined in config.py
:param dbid: id of the redis database to write to
:return:
"""
def _save_events(r, q):
while True:
event = q.get()
if not event:
return
r.set(event['key'], event['value'])
while True:
try:
_save_events(_get_redis(config=params, dbid=dbid), db_queue)
# we only reach here if the event loop terminated
# normally, because None was sent
return
except RedisError:
logger.exception('redis i/o exception, reconnecting')
# TODO: do something to terminate the process ...?
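The queue protocol _save_proc expects is simple: dict events with 'key' and 'value' fields, terminated by a None sentinel. A minimal driver sketch (params and dbid are assumed to be a valid config structure and database id):

# sketch: drive a single _save_proc writer thread
q = queue.Queue()
t = threading.Thread(target=_save_proc, args=[q, params, dbid])
t.start()
q.put({'key': 'joblog:AAAA:task-info:99', 'value': '{"uuid": "AAAA"}'})
q.put(None)  # sentinel: _save_events returns and the thread exits
t.join()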
def run():
"""
save 'task-*' events to redis, never returns
save 'task-*' events to redis (all databases), never returns
"""
environment.setup_logging()
@@ -30,6 +62,15 @@ def run():
state = app.events.State()
threads = []
for dbid in config_params['redis-databases']:
q = queue.Queue()
t = threading.Thread(
target=_save_proc,
args=[q, config_params, dbid])
t.start()
threads.append({'thread': t, 'queue': q})
def _log_event(event):
state.event(event)
@@ -40,8 +81,9 @@
if event['type'] in INFO_EVENT_TYPES:
key += f':{event["clock"]}'
r = get_current_redis(config_params)
r.set(key, json.dumps(event))
value = json.dumps(event)
for t in threads:
t['queue'].put({'key': key, 'value': value})
logger.debug(f'{key}: {value}')
@@ -51,10 +93,17 @@
})
recv.capture(limit=None, timeout=None, wakeup=True)
logger.warning('normally we should never reach here')
# allow child threads to terminate
for t in threads:
t['queue'].put(None)
t['thread'].join()
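The construction of recv is elided from this diff; with the standard celery events API the wiring would look roughly like this (an assumption based on celery's documented event-receiver pattern, not the actual elided code):

# assumed shape of the elided receiver setup
with app.connection() as connection:
    recv = app.events.Receiver(connection, handlers={
        '*': _log_event,  # route every captured event through _log_event
    })
    recv.capture(limit=None, timeout=None, wakeup=True)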
def clear_joblog(r):
"""
:param r:
:param r: connection to a redis database
:return:
"""
rp = r.pipeline()
@@ -3,6 +3,8 @@ import logging
import os
import time
from redis.exceptions import RedisError
from celery import Task, states
from celery.result import AsyncResult
@@ -14,7 +16,7 @@ from inventory_provider.tasks.app import app
from inventory_provider.tasks.common \
import get_next_redis, get_current_redis, \
latch_db, get_latch, set_latch, update_latch_status
from inventory_provider.tasks import data
from inventory_provider.tasks import data, monitor
from inventory_provider import config
from inventory_provider import environment
from inventory_provider.db import db, opsdb
@@ -547,25 +549,31 @@ def launch_refresh_cache_all(config):
:param config: config structure as defined in config.py
:return:
"""
_erase_next_db(config)
update_latch_status(config, pending=True)
# TODO: [DBOARD3-242] catch exceptions & reset latch status
# first batch of subtasks: refresh cached opsdb data
subtasks = [
update_neteng_managed_device_list.apply_async(),
update_interfaces_to_services.apply_async(),
update_geant_lambdas.apply_async(),
update_circuit_hierarchy.apply_async()
]
[x.get() for x in subtasks]
# now launch the task whose only purpose is to
# act as a convenient parent for all of the remaining tasks
t = internal_refresh_phase_2.apply_async()
return t.id
try:
_erase_next_db(config)
monitor.clear_joblog(get_current_redis(config))
update_latch_status(config, pending=True)
# first batch of subtasks: refresh cached opsdb data
subtasks = [
update_neteng_managed_device_list.apply_async(),
update_interfaces_to_services.apply_async(),
update_geant_lambdas.apply_async(),
update_circuit_hierarchy.apply_async()
]
[x.get() for x in subtasks]
# now launch the task whose only purpose is to
# act as a convenient parent for all of the remaining tasks
t = internal_refresh_phase_2.apply_async()
return t.id
except RedisError:
update_latch_status(config, pending=False, failure=True)
logger.exception('error launching refresh subtasks')
raise
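A caller can combine the returned parent task id with the latch to follow a refresh to completion; a usage sketch (the polling interval is arbitrary, and this assumes the latch dict carries the 'pending' and 'failure' flags that update_latch_status sets):

# sketch: launch a refresh and wait for the latch to clear
task_id = launch_refresh_cache_all(config)
while True:
    latch = get_latch(get_current_redis(config))
    if not latch['pending']:
        break
    time.sleep(5)
logger.info(f'refresh {task_id} done, failure={latch["failure"]}')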
def _wait_for_tasks(task_ids, update_callback=lambda s: None):
@@ -254,3 +254,36 @@ def test_latchdb(client, mocked_redis):
assert rv.status_code == 200
latch = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(latch, DB_LATCH_SCHEMA)
def test_job_log(client):
test_events = {
'joblog:AAAA:task-aaa': {
'type': 'task-aaaa', 'uuid': 'AAAA', 'clock': 999},
'joblog:AAAB:task-infox': {
'type': 'task-infox', 'uuid': 'AAAB', 'clock': 999},
'joblog:BBBB:task-info:99': {
'type': 'task-info', 'uuid': 'BBBB', 'clock': 99},
'joblog:BBBB:task-info:999': {
'type': 'task-info', 'uuid': 'BBBB', 'clock': 999},
'joblog:AAAA:task-warning:88': {
'type': 'task-warning', 'uuid': 'AAAA', 'clock': 88},
'joblog:AAAA:task-warning:888': {
'type': 'task-warning', 'uuid': 'AAAA', 'clock': 888},
'joblog:AAAA:task-error:77': {
'type': 'task-error', 'uuid': 'AAAA', 'clock': 77},
'joblog:AAAA:task-error:777': {
'type': 'task-error', 'uuid': 'AAAA', 'clock': 777}
}
db = backend_db()
for k, v in test_events.items():
db[k] = json.dumps(v)
rv = client.post(
'jobs/log',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
result = json.loads(rv.data.decode('utf-8'))
assert len(result.keys()) == 3  # one key per uuid: AAAA, AAAB, BBBB
# TODO: verify the grouped event payloads, not just the number of uuids