From b665f0fad14a21b1f1c1c1329a2f902c99a38075 Mon Sep 17 00:00:00 2001
From: Erik Reid <erik.reid@geant.org>
Date: Thu, 4 Jun 2020 12:10:41 +0200
Subject: [PATCH] allow clean shutdown of monitoring process (testing only)

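Each per-database writer thread consumes events from its own
queue.Queue.  Putting None on a queue now makes _save_events return,
which lets _save_proc and its thread finish so that run() can join
them.  run() only reaches the join loop if the capture loop returns,
which is not expected outside of testing.

For reference, a minimal standalone sketch of the sentinel-shutdown
pattern used here (the names below are illustrative only and not part
of inventory_provider):

    import queue
    import threading

    def _worker(q):
        # consume items until the None sentinel arrives
        while True:
            item = q.get()
            if item is None:
                return
            print(f'saving {item["key"]}')

    q = queue.Queue()
    t = threading.Thread(target=_worker, args=[q])
    t.start()

    q.put({'key': 'monitor:event:1', 'value': '{}'})
    q.put(None)  # request a clean shutdown
    t.join()     # returns once the worker has consumed the sentinel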
---
 inventory_provider/tasks/monitor.py | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/inventory_provider/tasks/monitor.py b/inventory_provider/tasks/monitor.py
index a02ede72..2639c5e7 100644
--- a/inventory_provider/tasks/monitor.py
+++ b/inventory_provider/tasks/monitor.py
@@ -25,7 +25,7 @@ INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
 def _save_proc(db_queue, params, dbid):
     """
     save redis events to a specific db
-    :param q:
+    :param db_queue: queue of events to save; None means stop
     :param params:
     :param dbid:
     :return:
@@ -33,11 +33,16 @@ def _save_proc(db_queue, params, dbid):
     def _save_events(r, q):
         while True:
             event = q.get()
+            if event is None:  # None is the shutdown sentinel
+                return
             r.set(event['key'], event['value'])
 
     while True:
         try:
             _save_events(_get_redis(config=params, dbid=dbid), db_queue)
+            # _save_events only returns after receiving the None
+            # sentinel, so let this worker thread exit cleanly
+            return
         except RedisError:
             logger.exception('redis i/o exception, reconnecting')
             # TODO: do something to terminate the process ...?
@@ -57,15 +62,14 @@ def run():
 
     state = app.events.State()
 
-    db_queues = []
+    threads = []
     for dbid in config_params['redis-databases']:
         q = queue.Queue()
         t = threading.Thread(
             target=_save_proc,
             args=[q, config_params, dbid])
         t.start()
-        db_queues.append(q)
-        # TODO: graceful shutdown?  save threads and join later?
+        threads.append({'thread': t, 'queue': q})
 
     def _log_event(event):
         state.event(event)
@@ -78,8 +82,8 @@ def run():
             key += f':{event["clock"]}'
 
         value = json.dumps(event)
-        for q in db_queues:
-            q.put({'key': key, 'value': value})
+        for t in threads:
+            t['queue'].put({'key': key, 'value': value})
 
         logger.debug(f'{key}: {json.dumps(event)}')
 
@@ -89,6 +93,13 @@ def run():
         })
         recv.capture(limit=None, timeout=None, wakeup=True)
 
+    logger.warning('event capture loop exited, expected only when testing')
+
+    # allow child threads to terminate
+    for t in threads:
+        t['queue'].put(None)
+        t['thread'].join()
+
 
 def clear_joblog(r):
     """
-- 
GitLab