Skip to content
Snippets Groups Projects
Commit 5b89b713 authored by Erik Reid's avatar Erik Reid
Browse files

write events to all db's

parent 8836b7bd
No related branches found
No related tags found
No related merge requests found
...@@ -8,17 +8,41 @@ must be defined in the environment ...@@ -8,17 +8,41 @@ must be defined in the environment
import json import json
import logging import logging
import os import os
import queue
import threading
from inventory_provider import config, environment from inventory_provider import config, environment
from inventory_provider.tasks.worker import app from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import get_current_redis from inventory_provider.tasks.common import get_current_redis, _get_redis
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error') INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
def _save_proc(db_queue, params, dbid):
"""
save redis events to a specific db
:param q:
:param params:
:param dbid:
:return:
"""
def _save_events(r, q):
while True:
event = q.get()
r.set(event['key'], event['value'])
while True:
try:
_save_events(_get_redis(config=params, dbid=dbid), db_queue)
except:
logger.exception('redis i/o exception, reconnecting')
# TODO: do something to terminate the process ...?
def run(): def run():
""" """
save 'task-*' events to redis, never returns save 'task-*' events to redis (all databases), never returns
""" """
environment.setup_logging() environment.setup_logging()
...@@ -30,6 +54,16 @@ def run(): ...@@ -30,6 +54,16 @@ def run():
state = app.events.State() state = app.events.State()
db_queues = []
for dbid in config_params['redis-databases']:
q = queue.Queue()
t = threading.Thread(
target=_save_proc,
args=[q, config_params, dbid])
t.start()
db_queues.append(q)
# TODO: graceful shutdown? save threads and join later?
def _log_event(event): def _log_event(event):
state.event(event) state.event(event)
...@@ -40,8 +74,9 @@ def run(): ...@@ -40,8 +74,9 @@ def run():
if event['type'] in INFO_EVENT_TYPES: if event['type'] in INFO_EVENT_TYPES:
key += f':{event["clock"]}' key += f':{event["clock"]}'
r = get_current_redis(config_params) value = json.dumps(event)
r.set(key, json.dumps(event)) for q in db_queues:
q.put({'key': key, 'value': value})
logger.debug(f'{key}: {json.dumps(event)}') logger.debug(f'{key}: {json.dumps(event)}')
...@@ -54,7 +89,7 @@ def run(): ...@@ -54,7 +89,7 @@ def run():
def clear_joblog(r): def clear_joblog(r):
""" """
:param r: :param r: connection to a redis database
:return: :return:
""" """
rp = r.pipeline() rp = r.pipeline()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment