From 34e9153da7fb0e22bdaaa9ff90858713ae330967 Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Fri, 15 Jan 2021 15:14:54 +0100 Subject: [PATCH] added snmp_refresh_peerings --- inventory_provider/tasks/worker.py | 41 +++++++++++++-- test/data/update-test-db.py | 83 +++++++++++++++++++++++++++--- 2 files changed, 112 insertions(+), 12 deletions(-) diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 1c4b16fd..c263c416 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -85,13 +85,45 @@ class InventoryTask(Task): self.send_event('task-error', message=message) +@app.task(base=InventoryTask, bind=True, name='snmp_refresh_peerings') +@log_task_entry_and_exit +def snmp_refresh_peerings(self, hostname, community): + try: + peerings = list(snmp.get_peer_state_info(hostname, community)) + except ConnectionError: + msg = f'error loading snmp peering data from {hostname}' + logger.exception(msg) + self.log_warning(msg) + r = get_current_redis(InventoryTask.config) + peerings = r.get(f'snmp-peerings:{hostname}') + if peerings is None: + raise InventoryTaskError( + f'snmp error with {peerings}' + f' and no cached peering data found') + # unnecessary json encode/decode here ... could be optimized + peerings = json.loads(peerings.decode('utf-8')) + self.log_warning(f'using cached snmp peering data for {hostname}') + + r = get_next_redis(InventoryTask.config) + + rp = r.pipeline() + rp.set(f'snmp-peerings:{hostname}', json.dumps(peerings)) + + for session in peerings: + rp.set(f'snmp-peerings:{session["remote"]}', json.dumps(session)) + + rp.execute() + + self.log_info(f'snmp peering info loaded from {hostname}') + + @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') @log_task_entry_and_exit def snmp_refresh_interfaces(self, hostname, community): try: interfaces = list(snmp.get_router_snmp_indexes(hostname, community)) except ConnectionError: - msg = f'error loading snmp data from {hostname}' + msg = f'error loading snmp interface data from {hostname}' logger.exception(msg) self.log_warning(msg) r = get_current_redis(InventoryTask.config) @@ -99,10 +131,10 @@ def snmp_refresh_interfaces(self, hostname, community): if not interfaces: raise InventoryTaskError( f'snmp error with {hostname}' - f' and no cached netconf data found') + f' and no cached snmp interface data found') # unnecessary json encode/decode here ... could be optimized interfaces = json.loads(interfaces.decode('utf-8')) - self.log_warning(f'using cached snmp data for {hostname}') + self.log_warning(f'using cached snmp interface data for {hostname}') r = get_next_redis(InventoryTask.config) @@ -120,7 +152,7 @@ def snmp_refresh_interfaces(self, hostname, community): rp.execute() - self.log_info(f'snmp info loaded from {hostname}') + self.log_info(f'snmp interface info loaded from {hostname}') @app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') @@ -540,6 +572,7 @@ def reload_router_config(self, hostname): self.log_info(f'refreshing snmp interface indexes for {hostname}') # load snmp data, in this thread snmp_refresh_interfaces.apply(args=[hostname, community]) + snmp_refresh_peerings.apply(args=[hostname, community]) clear_cached_classifier_responses(None) self.log_info(f'updated configuration for {hostname}') diff --git a/test/data/update-test-db.py b/test/data/update-test-db.py index 915d5c5c..1fc1017e 100644 --- a/test/data/update-test-db.py +++ b/test/data/update-test-db.py @@ -1,16 +1,83 @@ import json import os import redis +import random +import queue +import threading +import logging +HOSTNAME = 'test-dashboard-storage02.geant.org' +INDEX = 0 +NUM_THREADS = 20 -r = redis.StrictRedis(host="test-dashboard-storage02.geant.org") -d = {} -for k in r.keys(): - print(k) - d[k.decode('utf-8')] = r.get(k).decode('utf-8') +def _redis_client_proc(key_queue, value_queue): + r = redis.StrictRedis(host=HOSTNAME, db=INDEX) + while True: + key = key_queue.get() -data_filename = os.path.join(os.path.dirname(__file__), "router-info.json") + # contract is that None means no more requests + if not key: + break -with open(data_filename, "w") as f: - f.write(json.dumps(d)) + logging.debug(f'key: {key}') + value_queue.put({ + 'key': key, + 'value': r.get(key).decode('utf-8') + }) + + # contract is to return None when finished + value_queue.put(None) + + +def docs(): + threads = [] + response_queue = queue.Queue() + + for _ in range(NUM_THREADS): + q = queue.Queue() + t = threading.Thread( + target=_redis_client_proc, + args=[q, response_queue]) + t.start() + threads.append({'thread': t, 'queue': q}) + + r = redis.StrictRedis(host=HOSTNAME, db=INDEX) + # scan with bigger batches, to mitigate network latency effects + for k in r.scan_iter(count=1000): + t = random.choice(threads) + t['queue'].put(k.decode('utf-8')) + + # tell all threads there are no more keys coming + for t in threads: + t['queue'].put(None) + + num_finished = 0 + # read values from response_queue until we receive + # None len(threads) times + while num_finished < len(threads): + value = response_queue.get() + if not value: + num_finished += 1 + logging.info( + 'one thread finished ' + f'({len(threads) - num_finished} left)') + continue + yield value + + # cleanup like we're supposed to, even though it's python + for t in threads: + t['thread'].join(timeout=0.5) # timeout, for sanity + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + + d = {} + for item in docs(): + d[item['key']] = item['value'] + + data_filename = os.path.join(os.path.dirname(__file__), "router-info.json") + + with open(data_filename, "w") as f: + f.write(json.dumps(d)) -- GitLab