Skip to content
Snippets Groups Projects
Commit 9400c5ff authored by Release Webservice's avatar Release Webservice
Browse files

Finished release 0.46.

parents ea87c70d 060314ea
No related branches found
No related tags found
No related merge requests found
...@@ -2,6 +2,9 @@ ...@@ -2,6 +2,9 @@
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [0.46] - 2020-06-05
- optimization for redis network latency
## [0.45] - 2020-06-05 ## [0.45] - 2020-06-05
- DBOARD3-242: use cached netconf/snmp data when router is unavailable - DBOARD3-242: use cached netconf/snmp data when router is unavailable
- use celery events rather than status for logging errors & warnings - use celery events rather than status for logging errors & warnings
......
...@@ -114,18 +114,15 @@ def run(): ...@@ -114,18 +114,15 @@ def run():
t['thread'].join() t['thread'].join()
def clear_joblog(r, keys_read_event=None): def clear_joblog(r):
""" """
:param r: connection to a redis database :param r: connection to a redis database
:param keys_read_event: optional event to signal after all keys are read
:return: :return:
""" """
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('joblog:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('joblog:*', count=1000):
rp.delete(key) rp.delete(key)
if keys_read_event:
assert isinstance(keys_read_event, threading.Event) # sanity
keys_read_event.set()
rp.execute() rp.execute()
...@@ -193,7 +190,8 @@ def load_task_log(config_params, ignored_keys=[]): ...@@ -193,7 +190,8 @@ def load_task_log(config_params, ignored_keys=[]):
threads.append({'thread': t, 'queue': q}) threads.append({'thread': t, 'queue': q})
r = get_current_redis(config_params) r = get_current_redis(config_params)
for k in r.scan_iter('joblog:*'): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('joblog:*', count=1000):
k = k.decode('utf-8') k = k.decode('utf-8')
if k in ignored_keys: if k in ignored_keys:
logger.debug('ignoring key: {k}') logger.debug('ignoring key: {k}')
......
import json import json
import logging import logging
import os import os
import threading
import time import time
from redis.exceptions import RedisError from redis.exceptions import RedisError
...@@ -146,7 +145,8 @@ def update_interfaces_to_services(self): ...@@ -146,7 +145,8 @@ def update_interfaces_to_services(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('opsdb:interface_services:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('opsdb:interface_services:*', count=1000):
rp.delete(key) rp.delete(key)
rp.execute() rp.execute()
...@@ -207,7 +207,8 @@ def update_access_services(self): ...@@ -207,7 +207,8 @@ def update_access_services(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('opsdb:access_services:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('opsdb:access_services:*', count=1000):
rp.delete(key) rp.delete(key)
rp.execute() rp.execute()
...@@ -225,7 +226,8 @@ def update_lg_routers(self): ...@@ -225,7 +226,8 @@ def update_lg_routers(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for k in r.scan_iter('opsdb:lg:*'): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('opsdb:lg:*', count=1000):
rp.delete(k) rp.delete(k)
rp.execute() rp.execute()
...@@ -241,7 +243,8 @@ def update_lg_routers(self): ...@@ -241,7 +243,8 @@ def update_lg_routers(self):
def update_equipment_locations(self): def update_equipment_locations(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for k in r.scan_iter('opsdb:location:*'): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('opsdb:location:*', count=1000):
rp.delete(k) rp.delete(k)
rp.execute() rp.execute()
...@@ -271,9 +274,10 @@ def update_circuit_hierarchy(self): ...@@ -271,9 +274,10 @@ def update_circuit_hierarchy(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('opsdb:services:parents:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('opsdb:services:parents:*', count=1000):
rp.delete(key) rp.delete(key)
for key in r.scan_iter('opsdb:services:children:*'): for key in r.scan_iter('opsdb:services:children:*', count=1000):
rp.delete(key) rp.delete(key)
rp.execute() rp.execute()
...@@ -291,7 +295,8 @@ def update_geant_lambdas(self): ...@@ -291,7 +295,8 @@ def update_geant_lambdas(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('opsdb:geant_lambdas:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('opsdb:geant_lambdas:*', count=1000):
rp.delete(key) rp.delete(key)
rp.execute() rp.execute()
...@@ -425,9 +430,11 @@ def refresh_juniper_interface_list(hostname, netconf): ...@@ -425,9 +430,11 @@ def refresh_juniper_interface_list(hostname, netconf):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for k in r.scan_iter('netconf-interfaces:%s:*' % hostname): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('netconf-interfaces:%s:*' % hostname, count=1000):
rp.delete(k) rp.delete(k)
for k in r.keys('netconf-interface-bundles:%s:*' % hostname): for k in r.scan_iter(
'netconf-interface-bundles:%s:*' % hostname, count=1000):
rp.delete(k) rp.delete(k)
rp.execute() rp.execute()
...@@ -555,18 +562,7 @@ def launch_refresh_cache_all(config): ...@@ -555,18 +562,7 @@ def launch_refresh_cache_all(config):
_erase_next_db(config) _erase_next_db(config)
update_latch_status(config, pending=True) update_latch_status(config, pending=True)
# call monitor.clear_joblog in a thread, since monitor.clear_joblog(get_current_redis(config))
# deletion might be slow and can be done in parallel
def _clear_log_proc(wait_event):
monitor.clear_joblog(get_current_redis(config), wait_event)
keys_captured_event = threading.Event()
threading.Thread(
target=_clear_log_proc,
args=[keys_captured_event]).start()
if not keys_captured_event.wait(timeout=60.0):
# wait a reasonable time
logging.error('timed out waiting for log keys to be read')
# first batch of subtasks: refresh cached opsdb data # first batch of subtasks: refresh cached opsdb data
subtasks = [ subtasks = [
...@@ -695,7 +691,8 @@ def _build_subnet_db(update_callback=lambda s: None): ...@@ -695,7 +691,8 @@ def _build_subnet_db(update_callback=lambda s: None):
update_callback('loading all network addresses') update_callback('loading all network addresses')
subnets = {} subnets = {}
for k in r.scan_iter('reverse_interface_addresses:*'): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('reverse_interface_addresses:*', count=1000):
info = r.get(k.decode('utf-8')).decode('utf-8') info = r.get(k.decode('utf-8')).decode('utf-8')
info = json.loads(info) info = json.loads(info)
entry = subnets.setdefault(info['interface address'], []) entry = subnets.setdefault(info['interface address'], [])
......
...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages ...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name='inventory-provider', name='inventory-provider',
version="0.45", version="0.46",
author='GEANT', author='GEANT',
author_email='swd@geant.org', author_email='swd@geant.org',
description='Dashboard inventory provider', description='Dashboard inventory provider',
......
...@@ -107,7 +107,7 @@ class MockedRedis(object): ...@@ -107,7 +107,7 @@ class MockedRedis(object):
key = key.decode('utf-8') key = key.decode('utf-8')
del MockedRedis.db[key] del MockedRedis.db[key]
def scan_iter(self, glob=None): def scan_iter(self, glob=None, count='unused'):
if not glob: if not glob:
for k in list(MockedRedis.db.keys()): for k in list(MockedRedis.db.keys()):
yield k.encode('utf-8') yield k.encode('utf-8')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please sign in to comment