Skip to content
Snippets Groups Projects
monitor.py 1.49 KiB
import json
import logging
from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import get_current_redis

from threading import Thread

# module-level logger, named after this module
logger = logging.getLogger(__name__)
# event types for which _monitor_proc appends the event's clock value
# to the redis key it writes
INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')


def _monitor_proc(config):
    """
    Capture celery events and save every 'task-*' event to redis.

    Each task event is json-serialized and stored under the key
    ``joblog:<uuid>:<type>``.  For the event types listed in
    INFO_EVENT_TYPES the event's ``clock`` value is appended to the key
    (``joblog:<uuid>:<type>:<clock>``).  Events whose type does not
    start with ``task-`` are only fed to the celery state object.

    This call blocks inside the receiver's capture loop, which is
    started with no event limit and no timeout.

    :param config: passed unchanged to get_current_redis for every
        task event that is saved
    :return: nothing
    """

    state = app.events.State()

    def _log_event(event):
        # keep celery's in-memory state up to date for every event,
        # not only the ones saved to redis
        state.event(event)

        event_type = event['type']
        if not event_type.startswith('task-'):
            return

        key = f'joblog:{event["uuid"]}:{event_type}'
        if event_type in INFO_EVENT_TYPES:
            key += f':{event["clock"]}'

        # a redis handle is requested for each event rather than cached
        r = get_current_redis(config)

        # serialize once and reuse the result for both redis and the log
        payload = json.dumps(event)
        r.set(key, payload)

        # lazy %-args: the message is only built if debug is enabled
        logger.debug('%s: %s', key, payload)

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                '*': _log_event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


def start_monitoring(config=None):
    """
    Run _monitor_proc in a separate thread.

    _monitor_proc has a required ``config`` parameter, so the thread
    must be given one: previously the thread was started with no
    arguments and therefore failed immediately with a TypeError.

    :param config: configuration handed to _monitor_proc.  If omitted
        (None) an error is logged and no thread is started.
    :return: nothing
    """
    if config is None:
        # without a config the monitor thread cannot be started with
        # valid arguments: report it instead of spawning a doomed thread
        logger.error(
            'start_monitoring called without config, '
            'event monitoring not started')
        return

    thread = Thread(target=_monitor_proc, args=(config,))
    thread.start()


def clear_joblog(r):
    """
    Delete all keys matching ``joblog:*``.

    The deletes are queued on a single pipeline obtained from ``r``
    and sent together with one ``execute()`` call once the key scan
    has finished.

    :param r: client object providing ``pipeline()`` and
        ``scan_iter(pattern)`` (presumably a redis connection)
    :return: nothing
    """
    batch = r.pipeline()
    joblog_keys = r.scan_iter('joblog:*')
    for joblog_key in joblog_keys:
        batch.delete(joblog_key)
    batch.execute()


if __name__ == '__main__':
    # manual entry point: load a config file and run the monitor
    # in the foreground with debug logging enabled
    import os
    from inventory_provider import config

    logging.basicConfig(level=logging.DEBUG)

    # config-sentinel.json is looked up one directory above this module
    module_dir = os.path.dirname(__file__)
    filename = os.path.join(module_dir, '..', 'config-sentinel.json')

    with open(filename) as config_file:
        params = config.load(config_file)

    _monitor_proc(params)