Skip to content
Snippets Groups Projects
Commit c62de765 authored by Erik Reid's avatar Erik Reid
Browse files

request lots of keys in parallel

parent 56ee3ba6
No related branches found
No related tags found
No related merge requests found
......@@ -83,25 +83,70 @@ LOG_ENTRY_SCHEMA = {
"additionalProperties": True
}
import queue
import random
import threading
from inventory_provider.tasks import common as tasks_common
def _redis_client_proc(key_queue, value_queue, params):
    """
    Worker thread procedure: consume redis key names from key_queue,
    fetch and json-decode each value, validate it against
    LOG_ENTRY_SCHEMA and push the decoded entry onto value_queue.

    :param key_queue: queue of redis key name strings;
        None is the sentinel meaning no more requests
    :param value_queue: decoded/validated entries are put here;
        a final None signals that this worker has finished
    :param params: config params passed to tasks_common.get_current_redis
    """
    try:
        r = tasks_common.get_current_redis(params)
        while True:
            key = key_queue.get()
            # contract is that None means no more requests
            if not key:
                break
            raw = r.get(key)
            if raw is None:
                # key may have expired between the caller's scan and our get
                logger.error(f'no data found for key {key}')
                continue
            try:
                value = json.loads(raw.decode('utf-8'))
                jsonschema.validate(value, LOG_ENTRY_SCHEMA)
            except (json.JSONDecodeError, jsonschema.ValidationError):
                # skip just this entry: a single bad record must not
                # terminate the worker and drop all remaining keys
                logger.exception(f'error decoding entry for {key}')
                continue
            value_queue.put(value)
    finally:
        # contract is to return None when finished
        value_queue.put(None)
@routes.route("log", methods=['GET', 'POST'])
@common.require_accepts_json
def load_task_log():
tasks = {}
# r = common.get_current_redis()
r = common.get_next_redis()
response_queue = queue.Queue()
threads = []
for _ in range(10):
q = queue.Queue()
t = threading.Thread(
target=_redis_client_proc,
args=[
q,
response_queue,
current_app.config['INVENTORY_PROVIDER_CONFIG']
])
t.start()
threads.append({'thread': t, 'queue': q})
r = common.get_current_redis()
for k in r.scan_iter('joblog:*'):
value = r.get(k.decode('utf-8'))
t = random.choice(threads)
t['queue'].put(k.decode('utf-8'))
# tell all threads there are no more keys coming
for t in threads:
t['queue'].put(None)
num_finished = 0
tasks = {}
while num_finished < len(threads):
value = response_queue.get()
if not value:
logger.error(f'no data for log entry: {k.decode("utf-8")}')
continue
try:
value = json.loads(value.decode('utf-8'))
jsonschema.validate(value, LOG_ENTRY_SCHEMA)
except (json.JSONDecodeError, jsonschema.ValidationError):
logger.exception('error decoding entry for {k.decode("utf-8")}')
num_finished += 1
logger.debug(f'one worker thread finished')
continue
info = tasks.setdefault(value['uuid'], {})
......@@ -110,4 +155,7 @@ def load_task_log():
else:
info[value['type']] = value
for t in threads:
t['thread'].join(timeout=0.5) # timeout, for sanity
return jsonify(tasks)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment