diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index 384e62a28ac2842f2edb00f95fa82e3652c6142d..1e04846bc28753f0786f538e18307b0891f37cd9 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -83,25 +83,70 @@ LOG_ENTRY_SCHEMA = { "additionalProperties": True } +import queue +import random +import threading +from inventory_provider.tasks import common as tasks_common + +def _redis_client_proc(key_queue, value_queue, params): + try: + r = tasks_common.get_current_redis(params) + while True: + key = key_queue.get() + + # contract is that None means no more requests + if not key: + break + + value = r.get(key).decode('utf-8') + value = json.loads(value) + jsonschema.validate(value, LOG_ENTRY_SCHEMA) + + value_queue.put(value) + + except (json.JSONDecodeError, jsonschema.ValidationError): + logger.exception(f'error decoding entry for {key}') + + finally: + # contract is to return None when finished + value_queue.put(None) + @routes.route("log", methods=['GET', 'POST']) @common.require_accepts_json def load_task_log(): - tasks = {} - # r = common.get_current_redis() - r = common.get_next_redis() + response_queue = queue.Queue() + + threads = [] + for _ in range(10): + q = queue.Queue() + t = threading.Thread( + target=_redis_client_proc, + args=[ + q, + response_queue, + current_app.config['INVENTORY_PROVIDER_CONFIG'] + ]) + t.start() + threads.append({'thread': t, 'queue': q}) + r = common.get_current_redis() for k in r.scan_iter('joblog:*'): - value = r.get(k.decode('utf-8')) + t = random.choice(threads) + t['queue'].put(k.decode('utf-8')) + + # tell all threads there are no more keys coming + for t in threads: + t['queue'].put(None) + + num_finished = 0 + tasks = {} + while num_finished < len(threads): + value = response_queue.get() if not value: - logger.error(f'no data for log entry: {k.decode("utf-8")}') - continue - try: - value = json.loads(value.decode('utf-8')) - jsonschema.validate(value, LOG_ENTRY_SCHEMA) - except (json.JSONDecodeError, jsonschema.ValidationError): - logger.exception('error decoding entry for {k.decode("utf-8")}') + num_finished += 1 + logger.debug(f'one worker thread finished') continue info = tasks.setdefault(value['uuid'], {}) @@ -110,4 +155,7 @@ def load_task_log(): else: info[value['type']] = value + for t in threads: + t['thread'].join(timeout=0.5) # timeout, for sanity + return jsonify(tasks)