Skip to content
Snippets Groups Projects
Commit 34e9153d authored by Erik Reid's avatar Erik Reid
Browse files

added snmp_refresh_peerings

parent 64398e3d
No related branches found
No related tags found
No related merge requests found
...@@ -85,13 +85,45 @@ class InventoryTask(Task): ...@@ -85,13 +85,45 @@ class InventoryTask(Task):
self.send_event('task-error', message=message) self.send_event('task-error', message=message)
@app.task(base=InventoryTask, bind=True, name='snmp_refresh_peerings')
@log_task_entry_and_exit
def snmp_refresh_peerings(self, hostname, community):
    """
    Load BGP peering state from a router via snmp and store it in
    the next redis database.

    On snmp connection failure, fall back to the peering data cached in
    the current redis database for the same hostname; raise only when
    there is no cached data to fall back to.

    :param hostname: router hostname to poll
    :param community: snmp community string
    :raises InventoryTaskError: snmp failed and no cached data was found
    """
    try:
        peerings = list(snmp.get_peer_state_info(hostname, community))
    except ConnectionError:
        msg = f'error loading snmp peering data from {hostname}'
        logger.exception(msg)
        self.log_warning(msg)
        r = get_current_redis(InventoryTask.config)
        peerings = r.get(f'snmp-peerings:{hostname}')
        if peerings is None:
            # bug fix: the message previously interpolated `peerings`,
            # which is always None here - name the host instead
            # (consistent with snmp_refresh_interfaces)
            raise InventoryTaskError(
                f'snmp error with {hostname}'
                f' and no cached peering data found')
        # unnecessary json encode/decode here ... could be optimized
        peerings = json.loads(peerings.decode('utf-8'))
        self.log_warning(f'using cached snmp peering data for {hostname}')

    r = get_next_redis(InventoryTask.config)
    rp = r.pipeline()
    rp.set(f'snmp-peerings:{hostname}', json.dumps(peerings))
    # NOTE(review): per-session keys share the 'snmp-peerings:' prefix
    # with the per-host key above - assumes session['remote'] values
    # never collide with router hostnames; confirm against consumers
    for session in peerings:
        rp.set(f'snmp-peerings:{session["remote"]}', json.dumps(session))
    rp.execute()
    self.log_info(f'snmp peering info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces')
@log_task_entry_and_exit @log_task_entry_and_exit
def snmp_refresh_interfaces(self, hostname, community): def snmp_refresh_interfaces(self, hostname, community):
try: try:
interfaces = list(snmp.get_router_snmp_indexes(hostname, community)) interfaces = list(snmp.get_router_snmp_indexes(hostname, community))
except ConnectionError: except ConnectionError:
msg = f'error loading snmp data from {hostname}' msg = f'error loading snmp interface data from {hostname}'
logger.exception(msg) logger.exception(msg)
self.log_warning(msg) self.log_warning(msg)
r = get_current_redis(InventoryTask.config) r = get_current_redis(InventoryTask.config)
...@@ -99,10 +131,10 @@ def snmp_refresh_interfaces(self, hostname, community): ...@@ -99,10 +131,10 @@ def snmp_refresh_interfaces(self, hostname, community):
if not interfaces: if not interfaces:
raise InventoryTaskError( raise InventoryTaskError(
f'snmp error with {hostname}' f'snmp error with {hostname}'
f' and no cached netconf data found') f' and no cached snmp interface data found')
# unnecessary json encode/decode here ... could be optimized # unnecessary json encode/decode here ... could be optimized
interfaces = json.loads(interfaces.decode('utf-8')) interfaces = json.loads(interfaces.decode('utf-8'))
self.log_warning(f'using cached snmp data for {hostname}') self.log_warning(f'using cached snmp interface data for {hostname}')
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
...@@ -120,7 +152,7 @@ def snmp_refresh_interfaces(self, hostname, community): ...@@ -120,7 +152,7 @@ def snmp_refresh_interfaces(self, hostname, community):
rp.execute() rp.execute()
self.log_info(f'snmp info loaded from {hostname}') self.log_info(f'snmp interface info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') @app.task(base=InventoryTask, bind=True, name='netconf_refresh_config')
...@@ -540,6 +572,7 @@ def reload_router_config(self, hostname): ...@@ -540,6 +572,7 @@ def reload_router_config(self, hostname):
self.log_info(f'refreshing snmp interface indexes for {hostname}') self.log_info(f'refreshing snmp interface indexes for {hostname}')
# load snmp data, in this thread # load snmp data, in this thread
snmp_refresh_interfaces.apply(args=[hostname, community]) snmp_refresh_interfaces.apply(args=[hostname, community])
snmp_refresh_peerings.apply(args=[hostname, community])
clear_cached_classifier_responses(None) clear_cached_classifier_responses(None)
self.log_info(f'updated configuration for {hostname}') self.log_info(f'updated configuration for {hostname}')
......
import json import json
import os import os
import redis import redis
import random
import queue
import threading
import logging
HOSTNAME = 'test-dashboard-storage02.geant.org'
INDEX = 0
NUM_THREADS = 20
r = redis.StrictRedis(host="test-dashboard-storage02.geant.org")
def _redis_client_proc(key_queue, value_queue):
    """
    Worker-thread body: resolve redis keys received on key_queue.

    Reads key names from key_queue until a None sentinel arrives,
    fetches each value from redis and puts a
    ``{'key': ..., 'value': ...}`` dict on value_queue.  Finally puts
    a single None on value_queue so the consumer knows this worker
    is done.
    """
    client = redis.StrictRedis(host=HOSTNAME, db=INDEX)
    while True:
        key = key_queue.get()
        # a falsy key is the producer's signal that no more work is coming
        if not key:
            break
        logging.debug(f'key: {key}')
        doc = {
            'key': key,
            'value': client.get(key).decode('utf-8')
        }
        value_queue.put(doc)
    # contract: emit None on the way out so the consumer can count
    # finished workers
    value_queue.put(None)
def docs():
    """
    Generator yielding every key/value pair in the redis database.

    GET requests are fanned out over NUM_THREADS worker threads, each
    fed by its own request queue; a single shared response queue
    funnels ``{'key': ..., 'value': ...}`` dicts back to the caller.
    """
    results = queue.Queue()
    workers = []
    for _ in range(NUM_THREADS):
        requests = queue.Queue()
        worker = threading.Thread(
            target=_redis_client_proc,
            args=[requests, results])
        worker.start()
        workers.append({'thread': worker, 'queue': requests})

    client = redis.StrictRedis(host=HOSTNAME, db=INDEX)
    # scan with bigger batches, to mitigate network latency effects
    for key in client.scan_iter(count=1000):
        random.choice(workers)['queue'].put(key.decode('utf-8'))

    # one None sentinel per worker: no more keys are coming
    for worker in workers:
        worker['queue'].put(None)

    # each worker answers with a final None; keep draining the response
    # queue until every worker has reported in
    remaining = len(workers)
    while remaining:
        item = results.get()
        if not item:
            remaining -= 1
            logging.info(
                'one thread finished '
                f'({remaining} left)')
            continue
        yield item

    # cleanup like we're supposed to, even though it's python
    for worker in workers:
        worker['thread'].join(timeout=0.5)  # timeout, for sanity
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    # materialize the whole db as a plain dict and dump it next to
    # this script
    router_info = {item['key']: item['value'] for item in docs()}
    data_filename = os.path.join(
        os.path.dirname(__file__), "router-info.json")
    with open(data_filename, "w") as f:
        f.write(json.dumps(router_info))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment