diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py
index 634e7bf248ca3c863cde05392f966392e21639ad..87a9f29fd7a7111869548cc5c5f8270021502eb1 100644
--- a/inventory_provider/routes/jobs.py
+++ b/inventory_provider/routes/jobs.py
@@ -1,14 +1,9 @@
-import json
 import logging
-import queue
-import random
-import threading
 from distutils.util import strtobool
 
-import jsonschema
 from flask import Blueprint, current_app, jsonify, Response, request
 
-from inventory_provider.tasks import common as tasks_common
+from inventory_provider.tasks import monitor
 from inventory_provider.tasks import worker
 from inventory_provider.routes import common
 from inventory_provider.tasks.common import get_current_redis, get_latch
@@ -77,87 +72,8 @@ def check_update_status():
     return jsonify(list(worker.check_task_status(task_id)))
 
 
-LOG_ENTRY_SCHEMA = {
-    "$schema": "http://json-schema.org/draft-07/schema#",
-    "type": "object",
-    "properties": {
-        "uuid": {"type": "string"},
-        "type": {"type": "string"},
-        "clock": {"type": "integer"}
-    },
-    "required": ["uuid", "type"],
-    "additionalProperties": True
-}
-
-
-def _redis_client_proc(key_queue, value_queue, params):
-    try:
-        r = tasks_common.get_current_redis(params)
-        while True:
-            key = key_queue.get()
-
-            # contract is that None means no more requests
-            if not key:
-                break
-
-            value = r.get(key).decode('utf-8')
-            value = json.loads(value)
-            jsonschema.validate(value, LOG_ENTRY_SCHEMA)
-
-            value_queue.put(value)
-
-    except (json.JSONDecodeError, jsonschema.ValidationError):
-        logger.exception(f'error decoding entry for {key}')
-
-    finally:
-        # contract is to return None when finished
-        value_queue.put(None)
-
-
 @routes.route("log", methods=['GET', 'POST'])
 @common.require_accepts_json
 def load_task_log():
-
-    response_queue = queue.Queue()
-
-    threads = []
-    for _ in range(10):
-        q = queue.Queue()
-        t = threading.Thread(
-            target=_redis_client_proc,
-            args=[
-                q,
-                response_queue,
-                current_app.config['INVENTORY_PROVIDER_CONFIG']
-            ])
-        t.start()
-        threads.append({'thread': t, 'queue': q})
-
-    r = common.get_current_redis()
-    for k in r.scan_iter('joblog:*'):
-        t = random.choice(threads)
-        t['queue'].put(k.decode('utf-8'))
-
-    # tell all threads there are no more keys coming
-    for t in threads:
-        t['queue'].put(None)
-
-    num_finished = 0
-    tasks = {}
-    while num_finished < len(threads):
-        value = response_queue.get()
-        if not value:
-            num_finished += 1
-            logger.debug('one worker thread finished')
-            continue
-
-        info = tasks.setdefault(value['uuid'], {})
-        if value['type'] in ['task-info', 'task-warning', 'task-error']:
-            info.setdefault(value['type'], []).append(value)
-        else:
-            info[value['type']] = value
-
-    for t in threads:
-        t['thread'].join(timeout=0.5)  # timeout, for sanity
-
+    tasks = monitor.load_task_log(current_app.config['INVENTORY_PROVIDER_CONFIG'])
     return jsonify(tasks)
diff --git a/inventory_provider/tasks/monitor.py b/inventory_provider/tasks/monitor.py
index 2639c5e7df0defc7b0cf4bd483aa03c0159cd9d2..d12bd5c489a2e11a801849703a6f6edd9720b892 100644
--- a/inventory_provider/tasks/monitor.py
+++ b/inventory_provider/tasks/monitor.py
@@ -9,17 +9,32 @@
 import json
 import logging
 import os
 import queue
+import random
 import threading
 
+import jsonschema
 from redis.exceptions import RedisError
 
 from inventory_provider import config, environment
 from inventory_provider.tasks.worker import app
-from inventory_provider.tasks.common import _get_redis
+from inventory_provider.tasks.common import _get_redis, get_current_redis
 
 logger = logging.getLogger(__name__)
 
 INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
 
+LOG_ENTRY_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "uuid": {"type": "string"},
+        "type": {"type": "string"},
+        "clock": {"type": "integer"}
+    },
+    "required": ["uuid", "type"],
+    "additionalProperties": True
+}
+
+
 def _save_proc(db_queue, params, dbid):
@@ -112,6 +127,98 @@ def clear_joblog(r):
     rp.execute()
 
 
+def _redis_client_proc(key_queue, value_queue, config_params):
+    """
+    create a local redis connection, lookup the values of
+    the keys that come from key_queue, and put them on value_queue
+
+    i/o contract:
+        None arriving on key_queue means no more keys are coming
+        put None in value_queue means we are finished
+
+    :param key_queue:
+    :param value_queue:
+    :param config_params: app config
+    :return:
+    """
+    try:
+        r = get_current_redis(config_params)
+        while True:
+            key = key_queue.get()
+
+            # contract is that None means no more requests
+            if not key:
+                break
+
+            value = r.get(key).decode('utf-8')
+            value = json.loads(value)
+            jsonschema.validate(value, LOG_ENTRY_SCHEMA)
+
+            value_queue.put(value)
+
+    except (json.JSONDecodeError, jsonschema.ValidationError):
+        logger.exception(f'error decoding entry for {key}')
+
+    finally:
+        # contract is to return None when finished
+        value_queue.put(None)
+
+
+def load_task_log(config_params):
+    """
+    load the task log in a formatted dictionary:
+        keys are task uuid's
+        values are dicts with keys like 'task-started', 'task-info', etc. ...
+        values of those elements are celery event dicts
+
+    the loading is done with multiple connections in parallel, since this
+    method is called from an api handler and when the client is far from
+    the redis master the cumulative latency causes nginx/gunicorn timeouts
+
+    :param config_params: app config
+    :return:
+    """
+    response_queue = queue.Queue()
+
+    threads = []
+    for _ in range(10):
+        q = queue.Queue()
+        t = threading.Thread(
+            target=_redis_client_proc,
+            args=[q, response_queue, config_params])
+        t.start()
+        threads.append({'thread': t, 'queue': q})
+
+    r = get_current_redis(config_params)
+    for k in r.scan_iter('joblog:*'):
+        t = random.choice(threads)
+        t['queue'].put(k.decode('utf-8'))
+
+    # tell all threads there are no more keys coming
+    for t in threads:
+        t['queue'].put(None)
+
+    num_finished = 0
+    tasks = {}
+    # read values from response_queue until we receive
+    # None len(threads) times
+    while num_finished < len(threads):
+        value = response_queue.get()
+        if not value:
+            num_finished += 1
+            logger.debug('one worker thread finished')
+            continue
+
+        info = tasks.setdefault(value['uuid'], {})
+        info.setdefault(value['type'], []).append(value)
+
+    # cleanup like we're supposed to, even though it's python
+    for t in threads:
+        t['thread'].join(timeout=0.5)  # timeout, for sanity
+
+    return tasks
+
+
 if __name__ == '__main__':
     logging.basicConfig(level=logging.DEBUG)
     run()