# monitor.py 1.49 KiB
import json
import logging
from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import get_current_redis
from threading import Thread
# module-level logger, named after this module
logger = logging.getLogger(__name__)

# event types for which the event clock is appended to the joblog key
INFO_EVENT_TYPES = (
    'task-info',
    'task-warning',
    'task-error',
)
def _monitor_proc(config):
    """
    Capture celery events and save each task event to redis.

    Opens a connection from the celery app and runs an event receiver's
    ``capture()`` on it with no limit and no timeout.  Every received
    event is fed to a celery events ``State`` object; events whose type
    starts with ``task-`` are additionally stored as json under the key
    ``joblog:<uuid>:<type>``.  For the types listed in INFO_EVENT_TYPES
    the event clock is appended to the key as well.

    :param config: passed unchanged to get_current_redis every time
        an event is saved
    :return: None
    """
    state = app.events.State()

    def _log_event(event):
        # every event (task-related or not) is applied to the state object
        state.event(event)

        if not event['type'].startswith('task-'):
            return

        key = f'joblog:{event["uuid"]}:{event["type"]}'
        if event['type'] in INFO_EVENT_TYPES:
            # presumably one task can emit several of these events, so
            # the clock suffix keeps their keys apart - TODO confirm
            key += f':{event["clock"]}'

        # the redis client is looked up for each event rather than cached
        r = get_current_redis(config)

        # serialize once and reuse the string for both redis and the log
        payload = json.dumps(event)
        r.set(key, payload)

        # lazy %-formatting: the message is only built if DEBUG is enabled
        logger.debug('%s: %s', key, payload)

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            '*': _log_event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)
def start_monitoring(config=None):
    """
    Start a thread that runs _monitor_proc.

    :param config: forwarded to _monitor_proc as its only argument;
        optional so that existing zero-argument calls keep working,
        but callers should presumably supply the loaded configuration
        (it ends up being handed to get_current_redis) - TODO confirm
    :return: None
    """
    # _monitor_proc requires one positional argument: starting the thread
    # with an empty args tuple made it fail with TypeError right away
    thread = Thread(target=_monitor_proc, args=(config,))
    thread.start()
def clear_joblog(r):
    """
    Delete every key matching ``joblog:*`` using a single pipeline.

    :param r: client object providing ``pipeline()`` and ``scan_iter()``
        (presumably a redis connection - verify against callers)
    :return: None
    """
    match_pattern = 'joblog:*'
    batch = r.pipeline()
    queue_delete = batch.delete
    # deletions are only queued here; they are sent by execute() below
    for stale_key in r.scan_iter(match_pattern):
        queue_delete(stale_key)
    batch.execute()
if __name__ == '__main__':
    # stand-alone entry point: load the config file that sits one
    # directory above this module and run the monitor in the foreground
    import os
    from inventory_provider import config

    logging.basicConfig(level=logging.DEBUG)

    module_dir = os.path.dirname(__file__)
    config_path = os.path.join(module_dir, '..', 'config-sentinel.json')
    with open(config_path) as config_file:
        loaded_params = config.load(config_file)

    _monitor_proc(loaded_params)