diff --git a/test/data/update-test-db.py b/test/data/update-test-db.py index 1fc1017e69e28e875dfcf35f8a0b0363283208df..d55f3bec316d95186718ea02886409601d60164d 100644 --- a/test/data/update-test-db.py +++ b/test/data/update-test-db.py @@ -6,78 +6,126 @@ import queue import threading import logging -HOSTNAME = 'test-dashboard-storage02.geant.org' -INDEX = 0 -NUM_THREADS = 20 +def load_current_data(data_file): + with open(data_file, 'r'): + return json.load(data_file) -def _redis_client_proc(key_queue, value_queue): - r = redis.StrictRedis(host=HOSTNAME, db=INDEX) + +def _redis_client_proc(key_queue, value_queue, hostname, db_index): + r = redis.StrictRedis(host=hostname, db=db_index) while True: key = key_queue.get() - # contract is that None means no more requests if not key: break - logging.debug(f'key: {key}') value_queue.put({ 'key': key, 'value': r.get(key).decode('utf-8') }) - # contract is to return None when finished value_queue.put(None) -def docs(): - threads = [] - response_queue = queue.Queue() - - for _ in range(NUM_THREADS): - q = queue.Queue() - t = threading.Thread( - target=_redis_client_proc, - args=[q, response_queue]) - t.start() - threads.append({'thread': t, 'queue': q}) - - r = redis.StrictRedis(host=HOSTNAME, db=INDEX) - # scan with bigger batches, to mitigate network latency effects - for k in r.scan_iter(count=1000): - t = random.choice(threads) - t['queue'].put(k.decode('utf-8')) - - # tell all threads there are no more keys coming - for t in threads: - t['queue'].put(None) - - num_finished = 0 - # read values from response_queue until we receive - # None len(threads) times - while num_finished < len(threads): - value = response_queue.get() - if not value: - num_finished += 1 - logging.info( - 'one thread finished ' - f'({len(threads) - num_finished} left)') - continue - yield value - - # cleanup like we're supposed to, even though it's python - for t in threads: - t['thread'].join(timeout=0.5) # timeout, for sanity +def docs(loaders, thread_count): + logging.debug('Starting') + for loader in loaders: + logging.debug(f'loader db-index: {loader["db-index"]}') + threads = [] + response_queue = queue.Queue() + + for _ in range(thread_count): + q = queue.Queue() + t = threading.Thread( + target=_redis_client_proc, + args=[ + q, + response_queue, + loader['hostname'], + loader['db-index'] + ] + ) + t.start() + threads.append({'thread': t, 'queue': q}) + + r = redis.StrictRedis(host=loader['hostname'], db=loader['db-index']) + + for pattern in loader['key-patterns']: + logging.debug(pattern) + for k in r.scan_iter(match=pattern, count=1000): + t = random.choice(threads) + t['queue'].put(k.decode('utf-8')) + + for t in threads: + t['queue'].put(None) + + num_finished = 0 + while num_finished < len(threads): + value = response_queue.get() + if value is None: + num_finished += 1 + logging.info( + 'one thread finished ' + f'({len(threads) - num_finished} left)') + continue + yield value + + for t in threads: + t['thread'].join(timeout=0.5) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) - d = {} - for item in docs(): + conf = { + 'file': 'test/data/router-info.json', + 'delete-all': False, + 'thread-count': 20, + 'loaders': [ + # { + # 'hostname': 'test-dashboard-storage02.geant.org', + # 'db-index': 0, + # 'key-patterns': [ + # 'juniper-peerings:*', + # 'netconf-interface-bundles:*', + # 'netconf-interfaces-hosts:*', + # 'netconf-interfaces:*', + # 'netconf:*', + # 'netdash', + # 'snmp-interfaces-single:*', + # 'snmp-interfaces:*', + # 'snmp-peerings:*', + # 'subnets:*' + # ] + # }, + # { + # 'hostname': 'test-dashboard-storage02.geant.org', + # 'db-index': 4, + # 'key-patterns': [ + # 'ims:*', + # ] + # }, + { + 'hostname': 'localhost', + 'db-index': 0, + 'key-patterns': [ + 'ims:interface_services*', + 'ims:circuit_hierarchy*', + ] + }, + ] + } + data_filename = os.path.join(os.path.dirname(__file__), conf['file']) + if conf['delete-all']: + d = {} + else: + with open(data_filename, "r") as f: + d = json.load(f) + + i = 0 + for item in docs(conf['loaders'], conf['thread-count']): d[item['key']] = item['value'] - data_filename = os.path.join(os.path.dirname(__file__), "router-info.json") - with open(data_filename, "w") as f: - f.write(json.dumps(d)) + f.write(json.dumps(d, indent=2))