import json import logging from inventory_provider.tasks.worker import app from inventory_provider.tasks.common import get_current_redis from threading import Thread logger = logging.getLogger(__name__) INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error') def _monitor_proc(config): state = app.events.State() def _log_event(event): state.event(event) if not event['type'].startswith('task-'): return key = f'joblog:{event["uuid"]}:{event["type"]}' if event['type'] in INFO_EVENT_TYPES: key += f':{event["clock"]}' r = get_current_redis(config) r.set(key, json.dumps(event)) logger.debug(f'{key}: {json.dumps(event)}') with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ '*': _log_event, }) recv.capture(limit=None, timeout=None, wakeup=True) def start_monitoring(): thread = Thread(target=_monitor_proc, args=()) thread.start() def clear_joblog(r): """ :param r: :return: """ rp = r.pipeline() for key in r.scan_iter('joblog:*'): rp.delete(key) rp.execute() if __name__ == '__main__': import os from inventory_provider import config logging.basicConfig(level=logging.DEBUG) filename = os.path.join( os.path.dirname(__file__), '..', 'config-sentinel.json') with open(filename) as f: params = config.load(f) _monitor_proc(params)