diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py
index fc07171113483ca7af59850b8af23ec98dcf1f1b..384e62a28ac2842f2edb00f95fa82e3652c6142d 100644
--- a/inventory_provider/routes/jobs.py
+++ b/inventory_provider/routes/jobs.py
@@ -1,10 +1,14 @@
+import json
+import logging
 from distutils.util import strtobool
+import jsonschema
 from flask import Blueprint, current_app, jsonify, Response, request
 from inventory_provider.tasks import worker
 from inventory_provider.routes import common
 from inventory_provider.tasks.common import get_current_redis, get_latch
 
 routes = Blueprint("inventory-data-job-routes", __name__)
+logger = logging.getLogger(__name__)
 
 
 @routes.after_request
@@ -65,3 +69,45 @@ def check_update_status():
     task_id = task_id.decode('utf-8')
 
     return jsonify(list(worker.check_task_status(task_id)))
+
+
+LOG_ENTRY_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "uuid": {"type": "string"},
+        "type": {"type": "string"},
+        "clock": {"type": "integer"}
+    },
+    "required": ["uuid", "type"],
+    "additionalProperties": True
+}
+
+
+@routes.route("log", methods=['GET', 'POST'])
+@common.require_accepts_json
+def load_task_log():
+
+    tasks = {}
+    # r = common.get_current_redis()
+    r = common.get_next_redis()
+
+    for k in r.scan_iter('joblog:*'):
+        value = r.get(k.decode('utf-8'))
+        if not value:
+            logger.error(f'no data for log entry: {k.decode("utf-8")}')
+            continue
+        try:
+            value = json.loads(value.decode('utf-8'))
+            jsonschema.validate(value, LOG_ENTRY_SCHEMA)
+        except (json.JSONDecodeError, jsonschema.ValidationError):
+            logger.exception(f'error decoding entry for {k.decode("utf-8")}')
+            continue
+
+        info = tasks.setdefault(value['uuid'], {})
+        if value['type'] in ['task-info', 'task-warning', 'task-error']:
+            info.setdefault(value['type'], []).append(value)
+        else:
+            info[value['type']] = value
+
+    return jsonify(tasks)
diff --git a/inventory_provider/tasks/monitor.py b/inventory_provider/tasks/monitor.py
index 5481c9b6ade1acac493a04a53665c4cca5bd93e5..2639c5e7df0defc7b0cf4bd483aa03c0159cd9d2 100644
--- a/inventory_provider/tasks/monitor.py
+++ b/inventory_provider/tasks/monitor.py
@@ -8,17 +8,49 @@ must be defined in the environment
 import json
 import logging
 import os
+import queue
+import threading
+
+from redis.exceptions import RedisError
 
 from inventory_provider import config, environment
 from inventory_provider.tasks.worker import app
-from inventory_provider.tasks.common import get_current_redis
+from inventory_provider.tasks.common import _get_redis
+
 
 logger = logging.getLogger(__name__)
 
 INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
 
 
+def _save_proc(db_queue, params, dbid):
+    """
+    save received events to a specific redis db
+    :param db_queue: queue for receiving events; None means stop
+    :param params: config structure with the redis connection parameters
+    :param dbid: id of the redis database to write to
+    :return:
+    """
+    def _save_events(r, q):
+        while True:
+            event = q.get()
+            if not event:
+                return
+            r.set(event['key'], event['value'])
+
+    while True:
+        try:
+            _save_events(_get_redis(config=params, dbid=dbid), db_queue)
+            # we only reach here if the event loop terminated
+            # normally, because None was sent
+            return
+        except RedisError:
+            logger.exception('redis i/o exception, reconnecting')
+            # TODO: do something to terminate the process ...?
+
+
 def run():
     """
-    save 'task-*' events to redis, never returns
+    save 'task-*' events to redis (all databases), never returns
     """
     environment.setup_logging()
@@ -30,6 +62,15 @@ def run():
 
     state = app.events.State()
 
+    threads = []
+    for dbid in config_params['redis-databases']:
+        q = queue.Queue()
+        t = threading.Thread(
+            target=_save_proc,
+            args=[q, config_params, dbid])
+        t.start()
+        threads.append({'thread': t, 'queue': q})
+
     def _log_event(event):
         state.event(event)
@@ -40,8 +81,9 @@ def run():
         if event['type'] in INFO_EVENT_TYPES:
             key += f':{event["clock"]}'
 
-        r = get_current_redis(config_params)
-        r.set(key, json.dumps(event))
+        value = json.dumps(event)
+        for t in threads:
+            t['queue'].put({'key': key, 'value': value})
 
         logger.debug(f'{key}: {json.dumps(event)}')
@@ -51,10 +93,17 @@ def run():
         })
         recv.capture(limit=None, timeout=None, wakeup=True)
 
+    logger.warning('normally we should never reach here')
+
+    # allow child threads to terminate
+    for t in threads:
+        t['queue'].put(None)
+        t['thread'].join()
+
 
 def clear_joblog(r):
     """
-    :param r:
+    :param r: connection to a redis database
     :return:
     """
     rp = r.pipeline()
diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index 48058c7cb3d2ad2db081e0a5b5c6d68649eae73f..d4779f6fe85135ff0fa2aa1188317f9593f81579 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -3,6 +3,8 @@ import logging
 import os
 import time
 
+from redis.exceptions import RedisError
+
 from celery import Task, states
 from celery.result import AsyncResult
 
@@ -14,7 +16,7 @@ from inventory_provider.tasks.app import app
 from inventory_provider.tasks.common \
     import get_next_redis, get_current_redis, \
     latch_db, get_latch, set_latch, update_latch_status
-from inventory_provider.tasks import data
+from inventory_provider.tasks import data, monitor
 from inventory_provider import config
 from inventory_provider import environment
 from inventory_provider.db import db, opsdb
@@ -547,25 +549,31 @@ def launch_refresh_cache_all(config):
     :param config: config structure as defined in config.py
     :return:
     """
-    _erase_next_db(config)
-
-    update_latch_status(config, pending=True)
-
-    # TODO: [DBOARD3-242] catch exceptions & reset latch status
-
-    # first batch of subtasks: refresh cached opsdb data
-    subtasks = [
-        update_neteng_managed_device_list.apply_async(),
-        update_interfaces_to_services.apply_async(),
-        update_geant_lambdas.apply_async(),
-        update_circuit_hierarchy.apply_async()
-    ]
-    [x.get() for x in subtasks]
-
-    # now launch the task whose only purpose is to
-    # act as a convenient parent for all of the remaining tasks
-    t = internal_refresh_phase_2.apply_async()
-    return t.id
+    try:
+        _erase_next_db(config)
+        monitor.clear_joblog(get_current_redis(config))
+
+        update_latch_status(config, pending=True)
+
+        # first batch of subtasks: refresh cached opsdb data
+        subtasks = [
+            update_neteng_managed_device_list.apply_async(),
+            update_interfaces_to_services.apply_async(),
+            update_geant_lambdas.apply_async(),
+            update_circuit_hierarchy.apply_async()
+        ]
+        [x.get() for x in subtasks]
+
+        # now launch the task whose only purpose is to
+        # act as a convenient parent for all of the remaining tasks
+        t = internal_refresh_phase_2.apply_async()
+        return t.id
+
+    except RedisError:
+        update_latch_status(config, pending=False, failure=True)
+        logger.exception('error launching refresh subtasks')
+        raise
 
 
 def _wait_for_tasks(task_ids, update_callback=lambda s: None):
diff --git a/test/test_job_routes.py b/test/test_job_routes.py
index 4f2b473c3abe34cc5c0744e0b48cde14c96522eb..b7ff1ddf570eab056224024da3b02b8f77541440 100644
--- a/test/test_job_routes.py
+++ b/test/test_job_routes.py
@@ -254,3 +254,36 @@ def test_latchdb(client, mocked_redis):
     assert rv.status_code == 200
     latch = json.loads(rv.data.decode('utf-8'))
     jsonschema.validate(latch, DB_LATCH_SCHEMA)
+
+
+def test_job_log(client):
+
+    test_events = {
+        'joblog:AAAA:task-aaa': {
+            'type': 'task-aaaa', 'uuid': 'AAAA', 'clock': 999},
+        'joblog:AAAB:task-infox': {
+            'type': 'task-infox', 'uuid': 'AAAB', 'clock': 999},
+        'joblog:BBBB:task-info:99': {
+            'type': 'task-info', 'uuid': 'BBBB', 'clock': 99},
+        'joblog:BBBB:task-info:999': {
+            'type': 'task-info', 'uuid': 'BBBB', 'clock': 999},
+        'joblog:AAAA:task-warning:88': {
+            'type': 'task-warning', 'uuid': 'AAAA', 'clock': 88},
+        'joblog:AAAA:task-warning:888': {
+            'type': 'task-warning', 'uuid': 'AAAA', 'clock': 888},
+        'joblog:AAAA:task-error:77': {
+            'type': 'task-error', 'uuid': 'AAAA', 'clock': 77},
+        'joblog:AAAA:task-error:777': {
+            'type': 'task-error', 'uuid': 'AAAA', 'clock': 777}
+    }
+
+    db = backend_db()
+    for k, v in test_events.items():
+        db[k] = json.dumps(v)
+
+    rv = client.post(
+        'jobs/log',
+        headers=DEFAULT_REQUEST_HEADERS)
+    assert rv.status_code == 200
+    result = json.loads(rv.data.decode('utf-8'))
+    assert len(result.keys()) == 3  # TODO: make a proper test