From 1aa3e9ee6c91e8fbb6df05a5d5837ff2597951c4 Mon Sep 17 00:00:00 2001 From: Robert Latta <robert.latta@geant.org> Date: Tue, 9 Mar 2021 17:15:20 +0000 Subject: [PATCH] _load_redis_docs can now take an itterable --- inventory_provider/routes/common.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/inventory_provider/routes/common.py b/inventory_provider/routes/common.py index 275b9765..b9801d64 100644 --- a/inventory_provider/routes/common.py +++ b/inventory_provider/routes/common.py @@ -131,6 +131,8 @@ def _redis_client_proc(key_queue, value_queue, config_params, doc_type): assert doc_type in (_DECODE_TYPE_JSON, _DECODE_TYPE_XML) def _decode(bv): + if not bv: + return value = bv.decode('utf-8') if doc_type == _DECODE_TYPE_JSON: return json.loads(value) @@ -146,10 +148,12 @@ def _redis_client_proc(key_queue, value_queue, config_params, doc_type): if not key: break - value_queue.put({ - 'key': key, - 'value': _decode(r.get(key)) - }) + v = _decode(r.get(key)) + if v is not None: + value_queue.put({ + 'key': key, + 'value': v + }) except json.JSONDecodeError: logger.exception(f'error decoding entry for {key}') @@ -172,7 +176,7 @@ def _load_redis_docs( the redis master the cumulative latency causes nginx/gunicorn timeouts :param config_params: app config - :param pattern: key pattern to load + :param key_pattern: key pattern or iterable of keys to load :param num_threads: number of client threads to create :param doc_type: decoding type to do (xml or json) :return: yields dicts like {'key': str, 'value': dict or xml doc} @@ -190,10 +194,16 @@ def _load_redis_docs( threads.append({'thread': t, 'queue': q}) r = tasks_common.get_current_redis(config_params) - # scan with bigger batches, to mitigate network latency effects - for k in r.scan_iter(key_pattern, count=1000): - t = random.choice(threads) - t['queue'].put(k.decode('utf-8')) + + if isinstance(key_pattern, str): + # scan with bigger batches, to mitigate network latency effects + for k in r.scan_iter(key_pattern, count=1000): + t = random.choice(threads) + t['queue'].put(k.decode('utf-8')) + else: + for k in key_pattern: + t = random.choice(threads) + t['queue'].put(k) # tell all threads there are no more keys coming for t in threads: -- GitLab