Skip to content
Snippets Groups Projects
Commit b665f0fa authored by Erik Reid's avatar Erik Reid
Browse files

allow clean shutdown of monitoring process (testing only)

parent c93ab2db
Branches release/1.6.2
No related tags found
No related merge requests found
......@@ -25,7 +25,7 @@ INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
def _save_proc(db_queue, params, dbid):
"""
save redis events to a specific db
:param q:
:param q: queue for receiving events, None means to stop
:param params:
:param dbid:
:return:
......@@ -33,11 +33,16 @@ def _save_proc(db_queue, params, dbid):
def _save_events(r, q):
while True:
event = q.get()
if not event:
return
r.set(event['key'], event['value'])
while True:
try:
_save_events(_get_redis(config=params, dbid=dbid), db_queue)
# we only reach here if the event loop terminated
# normally, because None was sent
return
except RedisError:
logger.exception('redis i/o exception, reconnecting')
# TODO: do something to terminate the process ...?
......@@ -57,15 +62,14 @@ def run():
state = app.events.State()
db_queues = []
threads = []
for dbid in config_params['redis-databases']:
q = queue.Queue()
t = threading.Thread(
target=_save_proc,
args=[q, config_params, dbid])
t.start()
db_queues.append(q)
# TODO: graceful shutdown? save threads and join later?
threads.append({'thread': t, 'queue': q})
def _log_event(event):
state.event(event)
......@@ -78,8 +82,8 @@ def run():
key += f':{event["clock"]}'
value = json.dumps(event)
for q in db_queues:
q.put({'key': key, 'value': value})
for t in threads:
t['queue'].put({'key': key, 'value': value})
logger.debug(f'{key}: {json.dumps(event)}')
......@@ -89,6 +93,13 @@ def run():
})
recv.capture(limit=None, timeout=None, wakeup=True)
logger.warning('normally we should never reach here')
# allow child threads to terminate
for t in threads:
t['queue'].put(None)
t['thread'].join()
def clear_joblog(r):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment