Commit 6d67b54e authored by Robert Latta

allowed multiple data sources

parent 5cf7bc3d
@@ -6,78 +6,126 @@ import queue
 import threading
 import logging
 
-HOSTNAME = 'test-dashboard-storage02.geant.org'
-INDEX = 0
-NUM_THREADS = 20
+
+def load_current_data(data_file):
+    with open(data_file, 'r') as f:
+        return json.load(f)
 
 
-def _redis_client_proc(key_queue, value_queue):
-    r = redis.StrictRedis(host=HOSTNAME, db=INDEX)
+def _redis_client_proc(key_queue, value_queue, hostname, db_index):
+    r = redis.StrictRedis(host=hostname, db=db_index)
     while True:
         key = key_queue.get()
         # contract is that None means no more requests
         if not key:
             break
 
+        logging.debug(f'key: {key}')
         value_queue.put({
             'key': key,
             'value': r.get(key).decode('utf-8')
         })
 
     # contract is to return None when finished
     value_queue.put(None)
 
 
-def docs():
-    threads = []
-    response_queue = queue.Queue()
-
-    for _ in range(NUM_THREADS):
-        q = queue.Queue()
-        t = threading.Thread(
-            target=_redis_client_proc,
-            args=[q, response_queue])
-        t.start()
-        threads.append({'thread': t, 'queue': q})
-
-    r = redis.StrictRedis(host=HOSTNAME, db=INDEX)
-    # scan with bigger batches, to mitigate network latency effects
-    for k in r.scan_iter(count=1000):
-        t = random.choice(threads)
-        t['queue'].put(k.decode('utf-8'))
-
-    # tell all threads there are no more keys coming
-    for t in threads:
-        t['queue'].put(None)
-
-    num_finished = 0
-    # read values from response_queue until we receive
-    # None len(threads) times
-    while num_finished < len(threads):
-        value = response_queue.get()
-        if not value:
-            num_finished += 1
-            logging.info(
-                'one thread finished '
-                f'({len(threads) - num_finished} left)')
-            continue
-        yield value
-
-    # cleanup like we're supposed to, even though it's python
-    for t in threads:
-        t['thread'].join(timeout=0.5)  # timeout, for sanity
+def docs(loaders, thread_count):
+    logging.debug('Starting')
+    for loader in loaders:
+        logging.debug(f'loader db-index: {loader["db-index"]}')
+        threads = []
+        response_queue = queue.Queue()
+
+        for _ in range(thread_count):
+            q = queue.Queue()
+            t = threading.Thread(
+                target=_redis_client_proc,
+                args=[
+                    q,
+                    response_queue,
+                    loader['hostname'],
+                    loader['db-index']
+                ]
+            )
+            t.start()
+            threads.append({'thread': t, 'queue': q})
+
+        r = redis.StrictRedis(host=loader['hostname'], db=loader['db-index'])
+        # scan with bigger batches, to mitigate network latency effects
+        for pattern in loader['key-patterns']:
+            logging.debug(pattern)
+            for k in r.scan_iter(match=pattern, count=1000):
+                t = random.choice(threads)
+                t['queue'].put(k.decode('utf-8'))
+
+        # tell all threads there are no more keys coming
+        for t in threads:
+            t['queue'].put(None)
+
+        num_finished = 0
+        # read values from response_queue until we receive
+        # None len(threads) times
+        while num_finished < len(threads):
+            value = response_queue.get()
+            if value is None:
+                num_finished += 1
+                logging.info(
+                    'one thread finished '
+                    f'({len(threads) - num_finished} left)')
+                continue
+            yield value
+
+        # cleanup like we're supposed to, even though it's python
+        for t in threads:
+            t['thread'].join(timeout=0.5)  # timeout, for sanity
 
 
 if __name__ == '__main__':
     logging.basicConfig(level=logging.DEBUG)
-    d = {}
-    for item in docs():
+    conf = {
+        'file': 'test/data/router-info.json',
+        'delete-all': False,
+        'thread-count': 20,
+        'loaders': [
+            # {
+            #     'hostname': 'test-dashboard-storage02.geant.org',
+            #     'db-index': 0,
+            #     'key-patterns': [
+            #         'juniper-peerings:*',
+            #         'netconf-interface-bundles:*',
+            #         'netconf-interfaces-hosts:*',
+            #         'netconf-interfaces:*',
+            #         'netconf:*',
+            #         'netdash',
+            #         'snmp-interfaces-single:*',
+            #         'snmp-interfaces:*',
+            #         'snmp-peerings:*',
+            #         'subnets:*'
+            #     ]
+            # },
+            # {
+            #     'hostname': 'test-dashboard-storage02.geant.org',
+            #     'db-index': 4,
+            #     'key-patterns': [
+            #         'ims:*',
+            #     ]
+            # },
+            {
+                'hostname': 'localhost',
+                'db-index': 0,
+                'key-patterns': [
+                    'ims:interface_services*',
+                    'ims:circuit_hierarchy*',
+                ]
+            },
+        ]
+    }
+
+    data_filename = os.path.join(os.path.dirname(__file__), conf['file'])
+
+    # start from scratch, or merge into the previously-saved data
+    if conf['delete-all']:
+        d = {}
+    else:
+        d = load_current_data(data_filename)
+
+    for item in docs(conf['loaders'], conf['thread-count']):
         d[item['key']] = item['value']
 
-    data_filename = os.path.join(os.path.dirname(__file__), "router-info.json")
     with open(data_filename, "w") as f:
-        f.write(json.dumps(d))
+        f.write(json.dumps(d, indent=2))
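
For reference, a minimal sketch of how the reworked docs() generator can be driven against more than one Redis source in a single run. The module name dump_router_info, the hostnames, and the key patterns below are illustrative placeholders, not values from this commit:

    # hypothetical driver; assumes this file is importable as dump_router_info
    from dump_router_info import docs

    loaders = [
        {
            'hostname': 'redis-a.example.org',  # placeholder source 1
            'db-index': 0,
            'key-patterns': ['netconf:*', 'snmp-interfaces:*']
        },
        {
            'hostname': 'redis-b.example.org',  # placeholder source 2
            'db-index': 4,
            'key-patterns': ['ims:*']
        },
    ]

    merged = {}
    for item in docs(loaders, thread_count=10):
        # docs() drains each loader completely before starting the next,
        # so on duplicate keys the later loader's value wins
        merged[item['key']] = item['value']

Because docs() finishes one loader (waiting until every worker thread posts its None sentinel to the response queue) before connecting to the next, the merge order is deterministic and follows the order of the loaders list.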