"""
standalone process that monitors celery task events and
writes them to redis for reporting
as with the other processes, INVENTORY_PROVIDER_CONFIG_FILENAME
must be defined in the environment
"""
import json
import logging
import os
import queue
import random
import threading
import jsonschema
from redis.exceptions import RedisError
from inventory_provider import config, environment
from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import _get_redis, get_current_redis

logger = logging.getLogger(__name__)

INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')

LOG_ENTRY_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"uuid": {"type": "string"},
"type": {"type": "string"},
"clock": {"type": "integer"}
},
"required": ["uuid", "type"],
"additionalProperties": True
}
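

# an example of an event dict that satisfies LOG_ENTRY_SCHEMA
# (field values are illustrative; 'additionalProperties' means the rest
#  of the celery event payload is allowed as well):
#
#     jsonschema.validate(
#         {'uuid': 'f1a2b3c4-5678-90ab-cdef-1234567890ab',
#          'type': 'task-info',
#          'clock': 1234},
#         LOG_ENTRY_SCHEMA)

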
def _save_proc(db_queue, params, dbid):
"""
save redis events to a specific db
:param q: queue for receiving events, None means to stop
:param params:
:param dbid:
:return:
"""
def _save_events(r, q):
while True:
event = q.get()
if not event:
return
r.set(event['key'], event['value'])
while True:
try:
_save_events(_get_redis(config=params, dbid=dbid), db_queue)
# we only reach here if the event loop terminated
# normally, because None was sent
return
except RedisError:
logger.exception('redis i/o exception, reconnecting')
# TODO: do something to terminate the process ...?
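

# usage sketch for _save_proc (illustrative; this mirrors what run() below
# actually does):
#
#     q = queue.Queue()
#     t = threading.Thread(target=_save_proc, args=[q, config_params, dbid])
#     t.start()
#     q.put({'key': 'joblog:<uuid>:task-started', 'value': '{... json ...}'})
#     q.put(None)  # contract: None asks the writer loop to exit
#     t.join()

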
def run():
"""
save 'task-*' events to redis (all databases), never returns
"""
environment.setup_logging()
    config_filename = os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']
    logger.info(f'loading config from: {config_filename!r}')
    with open(config_filename) as f:
        config_params = config.load(f)
state = app.events.State()
threads = []
for dbid in config_params['redis-databases']:
q = queue.Queue()
t = threading.Thread(
target=_save_proc,
args=[q, config_params, dbid])
t.start()
threads.append({'thread': t, 'queue': q})
def _log_event(event):
state.event(event)
if not event['type'].startswith('task-'):
return
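        # resulting redis key layout (illustrative):
        #   joblog:<task uuid>:<event type>
        #   joblog:<task uuid>:<event type>:<clock>  (task-info/warning/error)
        # including the logical clock means repeated info/warning/error
        # events for the same task are stored under distinct keys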
key = f'joblog:{event["uuid"]}:{event["type"]}'
if event['type'] in INFO_EVENT_TYPES:
key += f':{event["clock"]}'
value = json.dumps(event)
for t in threads:
t['queue'].put({'key': key, 'value': value})
        logger.debug(f'{key}: {value}')
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'*': _log_event
})
recv.capture(limit=None, timeout=None, wakeup=True)
logger.warning('normally we should never reach here')
# allow child threads to terminate
for t in threads:
t['queue'].put(None)
t['thread'].join()
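

# the monitor is intended to run as its own long-lived process; an
# illustrative invocation (module path assumed from the imports above):
#
#     INVENTORY_PROVIDER_CONFIG_FILENAME=/path/to/config.json \
#         python -m inventory_provider.tasks.monitor

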
def clear_joblog(r):
    """
    delete all 'joblog:*' entries from the given redis database

    :param r: connection to a redis database
    :return:
    """
rp = r.pipeline()
for key in r.scan_iter('joblog:*'):
rp.delete(key)
rp.execute()
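

# usage sketch (illustrative):
#
#     clear_joblog(get_current_redis(config_params))

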
def _redis_client_proc(key_queue, value_queue, config_params):
"""
create a local redis connection, lookup the values of
the keys that come from key_queue, and put them on value_queue
i/o contract:
None arriving on key_queue means no more keys are coming
put None in value_queue means we are finished
:param key_queue:
:param value_queue:
:param config_params: app config
:return:
"""
try:
r = get_current_redis(config_params)
while True:
key = key_queue.get()
# contract is that None means no more requests
if not key:
break
value = r.get(key).decode('utf-8')
value = json.loads(value)
jsonschema.validate(value, LOG_ENTRY_SCHEMA)
value_queue.put(value)
except (json.JSONDecodeError, jsonschema.ValidationError):
logger.exception(f'error decoding entry for {key}')
finally:
# contract is to return None when finished
value_queue.put(None)


def load_task_log(config_params, ignored_keys=()):
    """
    load the task log into a formatted dictionary:

    keys are task uuids, values are dicts with keys like
    'task-started', 'task-info', etc.; the values of those
    elements are lists of celery event dicts

    the loading is done with multiple connections in parallel, since this
    method is called from an api handler and, when the client is far from
    the redis master, the cumulative latency causes nginx/gunicorn timeouts

    :param config_params: app config
    :param ignored_keys: iterable of keys to ignore if found
    :return: dict of task events, keyed by task uuid
    """
response_queue = queue.Queue()
threads = []
for _ in range(10):
q = queue.Queue()
t = threading.Thread(
target=_redis_client_proc,
args=[q, response_queue, config_params])
t.start()
threads.append({'thread': t, 'queue': q})
r = get_current_redis(config_params)
for k in r.scan_iter('joblog:*'):
k = k.decode('utf-8')
if k in ignored_keys:
            logger.debug(f'ignoring key: {k}')
continue
t = random.choice(threads)
t['queue'].put(k)
# tell all threads there are no more keys coming
for t in threads:
t['queue'].put(None)
num_finished = 0
tasks = {}
# read values from response_queue until we receive
# None len(threads) times
while num_finished < len(threads):
value = response_queue.get()
if not value:
num_finished += 1
logger.debug('one worker thread finished')
continue
info = tasks.setdefault(value['uuid'], {})
info.setdefault(value['type'], []).append(value)
# cleanup like we're supposed to, even though it's python
for t in threads:
t['thread'].join(timeout=0.5) # timeout, for sanity
return tasks
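

# shape of the dict returned by load_task_log (illustrative):
#
#     {
#         '<task uuid>': {
#             'task-started': [{...}],
#             'task-info': [{...}, {...}],
#             'task-succeeded': [{...}]
#         },
#         ...
#     }

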
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
run()