Skip to content
Snippets Groups Projects
Commit 1aa3e9ee authored by Robert Latta's avatar Robert Latta
Browse files

_load_redis_docs can now take an iterable

parent 4f50234c
No related branches found
No related tags found
No related merge requests found
...@@ -131,6 +131,8 @@ def _redis_client_proc(key_queue, value_queue, config_params, doc_type): ...@@ -131,6 +131,8 @@ def _redis_client_proc(key_queue, value_queue, config_params, doc_type):
assert doc_type in (_DECODE_TYPE_JSON, _DECODE_TYPE_XML) assert doc_type in (_DECODE_TYPE_JSON, _DECODE_TYPE_XML)
def _decode(bv): def _decode(bv):
if not bv:
return
value = bv.decode('utf-8') value = bv.decode('utf-8')
if doc_type == _DECODE_TYPE_JSON: if doc_type == _DECODE_TYPE_JSON:
return json.loads(value) return json.loads(value)
...@@ -146,10 +148,12 @@ def _redis_client_proc(key_queue, value_queue, config_params, doc_type): ...@@ -146,10 +148,12 @@ def _redis_client_proc(key_queue, value_queue, config_params, doc_type):
if not key: if not key:
break break
value_queue.put({ v = _decode(r.get(key))
'key': key, if v is not None:
'value': _decode(r.get(key)) value_queue.put({
}) 'key': key,
'value': v
})
except json.JSONDecodeError: except json.JSONDecodeError:
logger.exception(f'error decoding entry for {key}') logger.exception(f'error decoding entry for {key}')
...@@ -172,7 +176,7 @@ def _load_redis_docs( ...@@ -172,7 +176,7 @@ def _load_redis_docs(
the redis master the cumulative latency causes nginx/gunicorn timeouts the redis master the cumulative latency causes nginx/gunicorn timeouts
:param config_params: app config :param config_params: app config
:param pattern: key pattern to load :param key_pattern: key pattern or iterable of keys to load
:param num_threads: number of client threads to create :param num_threads: number of client threads to create
:param doc_type: decoding type to do (xml or json) :param doc_type: decoding type to do (xml or json)
:return: yields dicts like {'key': str, 'value': dict or xml doc} :return: yields dicts like {'key': str, 'value': dict or xml doc}
...@@ -190,10 +194,16 @@ def _load_redis_docs( ...@@ -190,10 +194,16 @@ def _load_redis_docs(
threads.append({'thread': t, 'queue': q}) threads.append({'thread': t, 'queue': q})
r = tasks_common.get_current_redis(config_params) r = tasks_common.get_current_redis(config_params)
# scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter(key_pattern, count=1000): if isinstance(key_pattern, str):
t = random.choice(threads) # scan with bigger batches, to mitigate network latency effects
t['queue'].put(k.decode('utf-8')) for k in r.scan_iter(key_pattern, count=1000):
t = random.choice(threads)
t['queue'].put(k.decode('utf-8'))
else:
for k in key_pattern:
t = random.choice(threads)
t['queue'].put(k)
# tell all threads there are no more keys coming # tell all threads there are no more keys coming
for t in threads: for t in threads:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment