From 5b89b71360664fe0e5173df56f8de57d92869d56 Mon Sep 17 00:00:00 2001
From: Erik Reid <erik.reid@geant.org>
Date: Thu, 4 Jun 2020 11:33:08 +0200
Subject: [PATCH] write events to all db's

---
 inventory_provider/tasks/monitor.py | 45 +++++++++++++++++++++++++----
 1 file changed, 40 insertions(+), 5 deletions(-)

diff --git a/inventory_provider/tasks/monitor.py b/inventory_provider/tasks/monitor.py
index 5481c9b6..389bf15a 100644
--- a/inventory_provider/tasks/monitor.py
+++ b/inventory_provider/tasks/monitor.py
@@ -8,17 +8,41 @@ must be defined in the environment
 import json
 import logging
 import os
+import queue
+import threading
 from inventory_provider import config, environment
 from inventory_provider.tasks.worker import app
-from inventory_provider.tasks.common import get_current_redis
+from inventory_provider.tasks.common import get_current_redis, _get_redis
+
 
 logger = logging.getLogger(__name__)
 INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
 
 
+def _save_proc(db_queue, params, dbid):
+    """
+    save redis events to a specific db
+    :param q:
+    :param params:
+    :param dbid:
+    :return:
+    """
+    def _save_events(r, q):
+        while True:
+            event = q.get()
+            r.set(event['key'], event['value'])
+
+    while True:
+        try:
+            _save_events(_get_redis(config=params, dbid=dbid), db_queue)
+        except:
+            logger.exception('redis i/o exception, reconnecting')
+            # TODO: do something to terminate the process ...?
+
+
 def run():
     """
-    save 'task-*' events to redis, never returns
+    save 'task-*' events to redis (all databases), never returns
     """
     environment.setup_logging()
 
@@ -30,6 +54,16 @@ def run():
 
     state = app.events.State()
 
+    db_queues = []
+    for dbid in config_params['redis-databases']:
+        q = queue.Queue()
+        t = threading.Thread(
+            target=_save_proc,
+            args=[q, config_params, dbid])
+        t.start()
+        db_queues.append(q)
+        # TODO: graceful shutdown?  save threads and join later?
+
     def _log_event(event):
         state.event(event)
 
@@ -40,8 +74,9 @@ def run():
         if event['type'] in INFO_EVENT_TYPES:
             key += f':{event["clock"]}'
 
-        r = get_current_redis(config_params)
-        r.set(key, json.dumps(event))
+        value = json.dumps(event)
+        for q in db_queues:
+            q.put({'key': key, 'value': value})
 
         logger.debug(f'{key}: {json.dumps(event)}')
 
@@ -54,7 +89,7 @@ def run():
 
 def clear_joblog(r):
     """
-    :param r:
+    :param r: connection to a redis database
     :return:
     """
     rp = r.pipeline()
-- 
GitLab