Commit 8d6029ff authored by Erik Reid

moved the bulk of the log/redis i/o to monitor.py

parent 990e9ab8
import json
import logging
import queue
import random
import threading
from distutils.util import strtobool
import jsonschema
from flask import Blueprint, current_app, jsonify, Response, request
from inventory_provider.tasks import common as tasks_common
from inventory_provider.tasks import monitor
from inventory_provider.tasks import worker
from inventory_provider.routes import common
from inventory_provider.tasks.common import get_current_redis, get_latch
@@ -77,87 +72,8 @@ def check_update_status():
    return jsonify(list(worker.check_task_status(task_id)))
LOG_ENTRY_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "uuid": {"type": "string"},
        "type": {"type": "string"},
        "clock": {"type": "integer"}
    },
    "required": ["uuid", "type"],
    "additionalProperties": True
}
def _redis_client_proc(key_queue, value_queue, params):
    try:
        r = tasks_common.get_current_redis(params)
        while True:
            key = key_queue.get()
            # contract is that None means no more requests
            if not key:
                break
            value = r.get(key).decode('utf-8')
            value = json.loads(value)
            jsonschema.validate(value, LOG_ENTRY_SCHEMA)
            value_queue.put(value)
    except (json.JSONDecodeError, jsonschema.ValidationError):
        logger.exception(f'error decoding entry for {key}')
    finally:
        # contract is to return None when finished
        value_queue.put(None)
@routes.route("log", methods=['GET', 'POST'])
@common.require_accepts_json
def load_task_log():
    response_queue = queue.Queue()
    threads = []
    for _ in range(10):
        q = queue.Queue()
        t = threading.Thread(
            target=_redis_client_proc,
            args=[
                q,
                response_queue,
                current_app.config['INVENTORY_PROVIDER_CONFIG']
            ])
        t.start()
        threads.append({'thread': t, 'queue': q})
    r = common.get_current_redis()
    for k in r.scan_iter('joblog:*'):
        t = random.choice(threads)
        t['queue'].put(k.decode('utf-8'))
    # tell all threads there are no more keys coming
    for t in threads:
        t['queue'].put(None)
    num_finished = 0
    tasks = {}
    while num_finished < len(threads):
        value = response_queue.get()
        if not value:
            num_finished += 1
            logger.debug('one worker thread finished')
            continue
        info = tasks.setdefault(value['uuid'], {})
        if value['type'] in ['task-info', 'task-warning', 'task-error']:
            info.setdefault(value['type'], []).append(value)
        else:
            info[value['type']] = value
    for t in threads:
        t['thread'].join(timeout=0.5)  # timeout, for sanity
    tasks = monitor.load_task_log(current_app.config['INVENTORY_PROVIDER_CONFIG'])
    return jsonify(tasks)
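For context, a minimal sketch of how a client might call this endpoint after the change; the host and blueprint prefix below are hypothetical (they are not part of this diff), and the Accept header is needed because of common.require_accepts_json:

import requests

# hypothetical base URL; the blueprint mount point is not shown in this diff
BASE_URL = 'http://localhost:8080'

rsp = requests.get(
    BASE_URL + '/jobs/log',  # the '/jobs' prefix is an assumption
    headers={'Accept': 'application/json'})
rsp.raise_for_status()

# the body is the dict built by monitor.load_task_log, keyed by task uuid
for task_uuid, events in rsp.json().items():
    print(task_uuid, sorted(events.keys()))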
@@ -9,17 +9,32 @@ import json
import logging
import os
import queue
import random
import threading
import jsonschema
from redis.exceptions import RedisError
from inventory_provider import config, environment
from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import _get_redis
from inventory_provider.tasks.common import _get_redis, get_current_redis
logger = logging.getLogger(__name__)
INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
LOG_ENTRY_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "uuid": {"type": "string"},
        "type": {"type": "string"},
        "clock": {"type": "integer"}
    },
    "required": ["uuid", "type"],
    "additionalProperties": True
}
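# illustrative example (editor's sketch, not part of this commit): a minimal
# entry that satisfies LOG_ENTRY_SCHEMA; only 'uuid' and 'type' are required,
# 'clock' must be an integer if present, and extra properties are allowed
_EXAMPLE_LOG_ENTRY = {
    'uuid': 'example-task-uuid',  # made-up value
    'type': 'task-info',
    'clock': 1234
}
jsonschema.validate(_EXAMPLE_LOG_ENTRY, LOG_ENTRY_SCHEMA)  # raises ValidationError if invalid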
def _save_proc(db_queue, params, dbid):
@@ -112,6 +127,98 @@ def clear_joblog(r):
    rp.execute()
def _redis_client_proc(key_queue, value_queue, config_params):
    """
    Create a local redis connection, look up the values of the keys
    that come from key_queue, and put the decoded values on value_queue.

    i/o contract:
        None arriving on key_queue means no more keys are coming
        putting None in value_queue means we are finished

    :param key_queue: queue of joblog redis keys to look up
    :param value_queue: queue onto which decoded log entries are put
    :param config_params: app config
    :return:
    """
    try:
        r = get_current_redis(config_params)
        while True:
            key = key_queue.get()
            # contract is that None means no more requests
            if not key:
                break
            value = r.get(key).decode('utf-8')
            value = json.loads(value)
            jsonschema.validate(value, LOG_ENTRY_SCHEMA)
            value_queue.put(value)
    except (json.JSONDecodeError, jsonschema.ValidationError):
        logger.exception(f'error decoding entry for {key}')
    finally:
        # contract is to return None when finished
        value_queue.put(None)
def load_task_log(config_params):
    """
    Load the task log into a formatted dictionary:
        keys are task uuid's
        values are dicts with keys like 'task-started', 'task-info', etc.,
        whose values are lists of celery event dicts

    The loading is done with multiple connections in parallel, since this
    method is called from an api handler, and when the client is far from
    the redis master the cumulative latency causes nginx/gunicorn timeouts.

    :param config_params: app config
    :return: dict of task events, keyed by task uuid
    """
    response_queue = queue.Queue()

    threads = []
    for _ in range(10):
        q = queue.Queue()
        t = threading.Thread(
            target=_redis_client_proc,
            args=[q, response_queue, config_params])
        t.start()
        threads.append({'thread': t, 'queue': q})

    r = get_current_redis(config_params)
    for k in r.scan_iter('joblog:*'):
        t = random.choice(threads)
        t['queue'].put(k.decode('utf-8'))

    # tell all threads there are no more keys coming
    for t in threads:
        t['queue'].put(None)

    num_finished = 0
    tasks = {}

    # read values from response_queue until we receive
    # None len(threads) times
    while num_finished < len(threads):
        value = response_queue.get()
        if not value:
            num_finished += 1
            logger.debug('one worker thread finished')
            continue
        info = tasks.setdefault(value['uuid'], {})
        info.setdefault(value['type'], []).append(value)

    # cleanup like we're supposed to, even though it's python
    for t in threads:
        t['thread'].join(timeout=0.5)  # timeout, for sanity

    return tasks
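# editor's note (illustrative, not part of this commit): the dict returned
# above is shaped roughly like the following, with one list per event type
# seen for each task uuid:
#
#   {
#       '<task uuid>': {
#           'task-started': [<celery event dict>],
#           'task-info': [<celery event dict>, ...],
#           ...
#       },
#       ...
#   }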
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    run()