""" standalone process that monitors celery task events and writes them to redis for reporting as with the other processes, INVENTORY_PROVIDER_CONFIG_FILENAME must be defined in the environment """ import json import logging import os import queue import threading from inventory_provider import config, environment from inventory_provider.tasks.worker import app from inventory_provider.tasks.common import get_current_redis, _get_redis logger = logging.getLogger(__name__) INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error') def _save_proc(db_queue, params, dbid): """ save redis events to a specific db :param q: :param params: :param dbid: :return: """ def _save_events(r, q): while True: event = q.get() r.set(event['key'], event['value']) while True: try: _save_events(_get_redis(config=params, dbid=dbid), db_queue) except: logger.exception('redis i/o exception, reconnecting') # TODO: do something to terminate the process ...? def run(): """ save 'task-*' events to redis (all databases), never returns """ environment.setup_logging() with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f: logging.info( 'loading config from: %r' % os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) config_params = config.load(f) state = app.events.State() db_queues = [] for dbid in config_params['redis-databases']: q = queue.Queue() t = threading.Thread( target=_save_proc, args=[q, config_params, dbid]) t.start() db_queues.append(q) # TODO: graceful shutdown? save threads and join later? def _log_event(event): state.event(event) if not event['type'].startswith('task-'): return key = f'joblog:{event["uuid"]}:{event["type"]}' if event['type'] in INFO_EVENT_TYPES: key += f':{event["clock"]}' value = json.dumps(event) for q in db_queues: q.put({'key': key, 'value': value}) logger.debug(f'{key}: {json.dumps(event)}') with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ '*': _log_event }) recv.capture(limit=None, timeout=None, wakeup=True) def clear_joblog(r): """ :param r: connection to a redis database :return: """ rp = r.pipeline() for key in r.scan_iter('joblog:*'): rp.delete(key) rp.execute() if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) run()