""" standalone process that monitors celery task events and writes them to redis for reporting as with the other processes, INVENTORY_PROVIDER_CONFIG_FILENAME must be defined in the environment """ import json import logging import os import queue import random import threading import jsonschema from redis.exceptions import RedisError from inventory_provider import config, environment from inventory_provider.tasks.worker import app from inventory_provider.tasks.common import _get_redis, get_current_redis logger = logging.getLogger(__name__) INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error') LOG_ENTRY_SCHEMA = { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { "uuid": {"type": "string"}, "type": {"type": "string"}, "clock": {"type": "integer"} }, "required": ["uuid", "type"], "additionalProperties": True } def _save_proc(db_queue, params, dbid): """ save redis events to a specific db :param q: queue for receiving events, None means to stop :param params: :param dbid: :return: """ def _save_events(r, q): while True: event = q.get() if not event: return r.set(event['key'], event['value']) while True: try: _save_events(_get_redis(config=params, dbid=dbid), db_queue) # we only reach here if the event loop terminated # normally, because None was sent return except RedisError: logger.exception('redis i/o exception, reconnecting') # TODO: do something to terminate the process ...? def run(): """ save 'task-*' events to redis (all databases), never returns """ environment.setup_logging() with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f: logging.info( 'loading config from: %r' % os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) config_params = config.load(f) state = app.events.State() threads = [] for dbid in config_params['redis-databases']: q = queue.Queue() t = threading.Thread( target=_save_proc, args=[q, config_params, dbid]) t.start() threads.append({'thread': t, 'queue': q}) def _log_event(event): state.event(event) if not event['type'].startswith('task-'): return key = f'joblog:{event["uuid"]}:{event["type"]}' if event['type'] in INFO_EVENT_TYPES: key += f':{event["clock"]}' value = json.dumps(event) for t in threads: t['queue'].put({'key': key, 'value': value}) logger.debug(f'{key}: {json.dumps(event)}') with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ '*': _log_event }) recv.capture(limit=None, timeout=None, wakeup=True) logger.warning('normally we should never reach here') # allow child threads to terminate for t in threads: t['queue'].put(None) t['thread'].join() def clear_joblog(r): """ :param r: connection to a redis database :return: """ rp = r.pipeline() for key in r.scan_iter('joblog:*'): rp.delete(key) rp.execute() def _redis_client_proc(key_queue, value_queue, config_params): """ create a local redis connection, lookup the values of the keys that come from key_queue, and put them on value_queue i/o contract: None arriving on key_queue means no more keys are coming put None in value_queue means we are finished :param key_queue: :param value_queue: :param config_params: app config :return: """ try: r = get_current_redis(config_params) while True: key = key_queue.get() # contract is that None means no more requests if not key: break value = r.get(key).decode('utf-8') value = json.loads(value) jsonschema.validate(value, LOG_ENTRY_SCHEMA) value_queue.put(value) except (json.JSONDecodeError, jsonschema.ValidationError): logger.exception(f'error decoding entry for {key}') finally: # 


def load_task_log(config_params, ignored_keys=None):
    """
    load the task log in a formatted dictionary:
      keys are task uuid's
      values are dicts with keys like 'task-started', 'task-info', etc. ...
      values of those elements are lists of celery event dicts

    the loading is done with multiple connections in parallel, since this
    method is called from an api handler and when the client is far from
    the redis master the cumulative latency causes nginx/gunicorn timeouts

    :param config_params: app config
    :param ignored_keys: list of keys to ignore if found
    :return: dict of task uuid -> event type -> list of celery event dicts
    """
    if ignored_keys is None:
        ignored_keys = []

    response_queue = queue.Queue()

    threads = []
    for _ in range(10):
        q = queue.Queue()
        t = threading.Thread(
            target=_redis_client_proc,
            args=[q, response_queue, config_params])
        t.start()
        threads.append({'thread': t, 'queue': q})

    r = get_current_redis(config_params)
    for k in r.scan_iter('joblog:*'):
        k = k.decode('utf-8')
        if k in ignored_keys:
            logger.debug(f'ignoring key: {k}')
            continue

        t = random.choice(threads)
        t['queue'].put(k)

    # tell all threads there are no more keys coming
    for t in threads:
        t['queue'].put(None)

    num_finished = 0
    tasks = {}

    # read values from response_queue until we receive
    # None len(threads) times
    while num_finished < len(threads):
        value = response_queue.get()
        if not value:
            num_finished += 1
            logger.debug('one worker thread finished')
            continue

        info = tasks.setdefault(value['uuid'], {})
        info.setdefault(value['type'], []).append(value)

    # cleanup like we're supposed to, even though it's python
    for t in threads:
        t['thread'].join(timeout=0.5)  # timeout, for sanity

    return tasks


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    run()
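

# A minimal usage sketch for the reporting helpers above (assumes
# `config_params` has already been loaded with config.load(), as in run();
# illustrative only, not part of the module's public code):
#
#   from inventory_provider.tasks.common import get_current_redis
#
#   log = load_task_log(config_params)
#   for uuid, events in log.items():
#       # e.g. events == {'task-started': [...], 'task-info': [...], ...}
#       ...
#
#   # and to wipe the task log:
#   clear_joblog(get_current_redis(config_params))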