"""
standalone process that monitors celery task events and
writes them to redis for reporting

as with the other processes, INVENTORY_PROVIDER_CONFIG_FILENAME
must be defined in the environment
"""
import json
import logging
import os
import queue
import random
import threading

import jsonschema
from redis.exceptions import RedisError

from inventory_provider import config, environment
from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import _get_redis, get_current_redis


logger = logging.getLogger(__name__)
INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
LOG_ENTRY_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "uuid": {"type": "string"},
        "type": {"type": "string"},
        "clock": {"type": "integer"}
    },
    "required": ["uuid", "type"],
    "additionalProperties": True
}
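
# illustrative only (not from the original module): the kind of entry that
# LOG_ENTRY_SCHEMA accepts: 'uuid' and 'type' are required, 'clock' must be
# an integer when present, and any extra fields are allowed:
#
#   jsonschema.validate({
#       'uuid': 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee',
#       'type': 'task-info',
#       'clock': 42,
#       'message': 'other fields are permitted (additionalProperties)'
#   }, LOG_ENTRY_SCHEMA)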


def _save_proc(db_queue, params, dbid):
    """
    save redis events to a specific db
    :param q: queue for receiving events, None means to stop
    :param params:
    :param dbid:
    :return:
    """
    def _save_events(r, q):
        while True:
            event = q.get()
            if not event:
                return
            r.set(event['key'], event['value'])

    while True:
        try:
            _save_events(_get_redis(config=params, dbid=dbid), db_queue)
            # we only reach here if the event loop terminated
            # normally, because None was sent
            return
        except RedisError:
            logger.exception('redis i/o exception, reconnecting')
            # TODO: do something to terminate the process ...?
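

# a sketch of the queue contract described in the docstring above (run()
# below does the real wiring; the values here are invented):
#
#   q = queue.Queue()
#   threading.Thread(
#       target=_save_proc, args=[q, config_params, dbid]).start()
#   q.put({'key': 'joblog:<uuid>:task-info:1', 'value': '{"uuid": "..."}'})
#   q.put(None)    # ask the writer thread to stop, then it can be joined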


def run():
    """
    save 'task-*' events to redis (all databases), never returns
    """
    environment.setup_logging()

    config_filename = os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']
    with open(config_filename) as f:
        logger.info(f'loading config from: {config_filename!r}')
        config_params = config.load(f)

    state = app.events.State()

    threads = []
    for dbid in config_params['redis-databases']:
        q = queue.Queue()
        t = threading.Thread(
            target=_save_proc,
            args=[q, config_params, dbid])
        t.start()
        threads.append({'thread': t, 'queue': q})

    def _log_event(event):
        state.event(event)

        if not event['type'].startswith('task-'):
            return

        key = f'joblog:{event["uuid"]}:{event["type"]}'
        if event['type'] in INFO_EVENT_TYPES:
            key += f':{event["clock"]}'

        value = json.dumps(event)
        for t in threads:
            t['queue'].put({'key': key, 'value': value})

        logger.debug(f'{key}: {value}')

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            '*': _log_event
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

    logger.warning('normally we should never reach here')

    # allow child threads to terminate
    for t in threads:
        t['queue'].put(None)
        t['thread'].join()


def clear_joblog(r):
    """
    :param r: connection to a redis database
    :return:
    """
    rp = r.pipeline()
    for key in r.scan_iter('joblog:*'):
        rp.delete(key)
    rp.execute()
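

# example usage (a sketch; assumes the same config file that run() loads):
#
#   with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f:
#       params = config.load(f)
#   clear_joblog(get_current_redis(params))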


def _redis_client_proc(key_queue, value_queue, config_params):
    """
    create a local redis connection, lookup the values of
    the keys that come from key_queue, and put them on value_queue

    i/o contract:
        None arriving on key_queue means no more keys are coming
        None is put on value_queue when this worker is finished

    :param key_queue:
    :param value_queue:
    :param config_params: app config
    :return:
    """
    try:
        r = get_current_redis(config_params)
        while True:
            key = key_queue.get()

            # contract is that None means no more requests
            if not key:
                break

            try:
                value = r.get(key)
                if value is None:
                    logger.error(f'no value found for key {key}')
                    continue
                value = json.loads(value.decode('utf-8'))
                jsonschema.validate(value, LOG_ENTRY_SCHEMA)
            except (json.JSONDecodeError, jsonschema.ValidationError):
                # skip the bad entry rather than abandoning the
                # rest of the keys queued for this worker
                logger.exception(f'error decoding entry for {key}')
                continue

            value_queue.put(value)

    finally:
        # contract is to return None when finished
        value_queue.put(None)
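

# a minimal sketch of the i/o contract above (the key name and wiring are
# invented for illustration; load_task_log below is the real caller):
#
#   keys, values = queue.Queue(), queue.Queue()
#   threading.Thread(
#       target=_redis_client_proc,
#       args=[keys, values, config_params]).start()
#   keys.put('joblog:<some uuid>:task-info:3')
#   keys.put(None)            # no more keys are coming
#   while True:
#       entry = values.get()
#       if entry is None:     # the worker is finished
#           break
#       ...                   # entry is a validated log entry dict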


def load_task_log(config_params, ignored_keys=None):
    """
    load the task log in a formatted dictionary:
      keys are task uuid's
      values are dicts with keys like 'task-started', 'task-info', etc. ...
      values of those elements are lists of celery event dicts

    the loading is done with multiple connections in parallel, since this
    method is called from an api handler and when the client is far from
    the redis master the cumulative latency causes nginx/gunicorn timeouts

    :param config_params: app config
    :param ignored_keys: list of keys to ignore if found
    :return:
    """
    ignored_keys = ignored_keys or []
    response_queue = queue.Queue()

    threads = []
    for _ in range(10):
        q = queue.Queue()
        t = threading.Thread(
            target=_redis_client_proc,
            args=[q, response_queue, config_params])
        t.start()
        threads.append({'thread': t, 'queue': q})

    r = get_current_redis(config_params)
    for k in r.scan_iter('joblog:*'):
        k = k.decode('utf-8')
        if k in ignored_keys:
            logger.debug(f'ignoring key: {k}')
            continue

        t = random.choice(threads)
        t['queue'].put(k)

    # tell all threads there are no more keys coming
    for t in threads:
        t['queue'].put(None)

    num_finished = 0
    tasks = {}
    # read values from response_queue until we receive
    # None len(threads) times
    while num_finished < len(threads):
        value = response_queue.get()
        if not value:
            num_finished += 1
            logger.debug('one worker thread finished')
            continue

        info = tasks.setdefault(value['uuid'], {})
        info.setdefault(value['type'], []).append(value)

    # cleanup like we're supposed to, even though it's python
    for t in threads:
        t['thread'].join(timeout=0.5)  # timeout, for sanity

    return tasks
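

# the mapping returned by load_task_log looks roughly like this
# (values invented for illustration):
#
#   {
#       '<task uuid>': {
#           'task-started': [{...event dict...}],
#           'task-info': [{...}, {...}],      # one entry per 'clock' value
#           'task-succeeded': [{...}]
#       },
#       ...
#   }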


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    run()