diff --git a/Changelog.md b/Changelog.md
index 3942962741008f66708123693715f5b0a62a600a..12700b8264045cc7b6926a9d86cd30f2ce16365b 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -2,6 +2,9 @@
 
 All notable changes to this project will be documented in this file.
 
+## [0.46] - 2020-06-05
+- optimization for redis network latency
+
 ## [0.45] - 2020-06-05
 - DBOARD3-242: use cached netconf/snmp data when router is unavailable
 - use celery events rather than status for logging errors & warnings
diff --git a/inventory_provider/tasks/monitor.py b/inventory_provider/tasks/monitor.py
index 7d7a9951b6a32bd2b85045e39757ec033c137ac8..7d8a1d568419b36655c22eba5f85c2ac9c4109fa 100644
--- a/inventory_provider/tasks/monitor.py
+++ b/inventory_provider/tasks/monitor.py
@@ -114,18 +114,15 @@ def run():
         t['thread'].join()
 
 
-def clear_joblog(r, keys_read_event=None):
+def clear_joblog(r):
     """
     :param r: connection to a redis database
-    :param keys_read_event: optional event to signal after all keys are read
     :return:
     """
     rp = r.pipeline()
-    for key in r.scan_iter('joblog:*'):
+    # scan with bigger batches, to mitigate network latency effects
+    for key in r.scan_iter('joblog:*', count=1000):
         rp.delete(key)
-    if keys_read_event:
-        assert isinstance(keys_read_event, threading.Event)  # sanity
-        keys_read_event.set()
     rp.execute()
 
 
@@ -193,7 +190,8 @@ def load_task_log(config_params, ignored_keys=[]):
         threads.append({'thread': t, 'queue': q})
 
     r = get_current_redis(config_params)
-    for k in r.scan_iter('joblog:*'):
+    # scan with bigger batches, to mitigate network latency effects
+    for k in r.scan_iter('joblog:*', count=1000):
         k = k.decode('utf-8')
         if k in ignored_keys:
             logger.debug('ignoring key: {k}')
diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index c69c9642912928ff9876cb4ca5506aa0de578a6c..dd1689d934ad5a39695e5ee5b509ff9716fab12a 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -1,7 +1,6 @@
 import json
 import logging
 import os
-import threading
 import time
 
 from redis.exceptions import RedisError
@@ -146,7 +145,8 @@ def update_interfaces_to_services(self):
 
     r = get_next_redis(InventoryTask.config)
     rp = r.pipeline()
-    for key in r.scan_iter('opsdb:interface_services:*'):
+    # scan with bigger batches, to mitigate network latency effects
+    for key in r.scan_iter('opsdb:interface_services:*', count=1000):
         rp.delete(key)
     rp.execute()
 
@@ -207,7 +207,8 @@ def update_access_services(self):
 
     r = get_next_redis(InventoryTask.config)
     rp = r.pipeline()
-    for key in r.scan_iter('opsdb:access_services:*'):
+    # scan with bigger batches, to mitigate network latency effects
+    for key in r.scan_iter('opsdb:access_services:*', count=1000):
         rp.delete(key)
     rp.execute()
 
@@ -225,7 +226,8 @@ def update_lg_routers(self):
 
     r = get_next_redis(InventoryTask.config)
     rp = r.pipeline()
-    for k in r.scan_iter('opsdb:lg:*'):
+    # scan with bigger batches, to mitigate network latency effects
+    for k in r.scan_iter('opsdb:lg:*', count=1000):
         rp.delete(k)
     rp.execute()
 
@@ -241,7 +243,8 @@ def update_equipment_locations(self):
 
     r = get_next_redis(InventoryTask.config)
     rp = r.pipeline()
-    for k in r.scan_iter('opsdb:location:*'):
+    # scan with bigger batches, to mitigate network latency effects
+    for k in r.scan_iter('opsdb:location:*', count=1000):
         rp.delete(k)
     rp.execute()
 
@@ -271,9 +274,10 @@ def update_circuit_hierarchy(self):
 
     r = get_next_redis(InventoryTask.config)
     rp = r.pipeline()
-    for key in r.scan_iter('opsdb:services:parents:*'):
+    # scan with bigger batches, to mitigate network latency effects
+    for key in r.scan_iter('opsdb:services:parents:*', count=1000):
         rp.delete(key)
-    for key in r.scan_iter('opsdb:services:children:*'):
+    for key in r.scan_iter('opsdb:services:children:*', count=1000):
         rp.delete(key)
     rp.execute()
 
@@ -291,7 +295,8 @@ def update_geant_lambdas(self):
 
     r = get_next_redis(InventoryTask.config)
     rp = r.pipeline()
-    for key in r.scan_iter('opsdb:geant_lambdas:*'):
+    # scan with bigger batches, to mitigate network latency effects
+    for key in r.scan_iter('opsdb:geant_lambdas:*', count=1000):
         rp.delete(key)
     rp.execute()
 
@@ -425,9 +430,11 @@ def refresh_juniper_interface_list(hostname, netconf):
 
     r = get_next_redis(InventoryTask.config)
     rp = r.pipeline()
-    for k in r.scan_iter('netconf-interfaces:%s:*' % hostname):
+    # scan with bigger batches, to mitigate network latency effects
+    for k in r.scan_iter('netconf-interfaces:%s:*' % hostname, count=1000):
         rp.delete(k)
-    for k in r.keys('netconf-interface-bundles:%s:*' % hostname):
+    for k in r.scan_iter(
+            'netconf-interface-bundles:%s:*' % hostname, count=1000):
         rp.delete(k)
     rp.execute()
 
@@ -555,18 +562,7 @@ def launch_refresh_cache_all(config):
     _erase_next_db(config)
     update_latch_status(config, pending=True)
 
-    # call monitor.clear_joblog in a thread, since
-    # deletion might be slow and can be done in parallel
-    def _clear_log_proc(wait_event):
-        monitor.clear_joblog(get_current_redis(config), wait_event)
-
-    keys_captured_event = threading.Event()
-    threading.Thread(
-        target=_clear_log_proc,
-        args=[keys_captured_event]).start()
-    if not keys_captured_event.wait(timeout=60.0):
-        # wait a reasonable time
-        logging.error('timed out waiting for log keys to be read')
+    monitor.clear_joblog(get_current_redis(config))
 
     # first batch of subtasks: refresh cached opsdb data
     subtasks = [
@@ -695,7 +691,8 @@ def _build_subnet_db(update_callback=lambda s: None):
     update_callback('loading all network addresses')
 
     subnets = {}
-    for k in r.scan_iter('reverse_interface_addresses:*'):
+    # scan with bigger batches, to mitigate network latency effects
+    for k in r.scan_iter('reverse_interface_addresses:*', count=1000):
         info = r.get(k.decode('utf-8')).decode('utf-8')
         info = json.loads(info)
         entry = subnets.setdefault(info['interface address'], [])
diff --git a/setup.py b/setup.py
index 58be9cbf2bb693d9f1c731df938b11afe8a2c3fb..a975d4f998a7526e50b4e1d6c55d0016448bc55e 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 
 setup(
     name='inventory-provider',
-    version="0.45",
+    version="0.46",
     author='GEANT',
     author_email='swd@geant.org',
     description='Dashboard inventory provider',
diff --git a/test/conftest.py b/test/conftest.py
index a4f05322340f3610f70da983beb5cc4857d2e08c..8f51d49a98bae3a3f127ee0aeb1e3622684a9ee8 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -107,7 +107,7 @@ class MockedRedis(object):
         key = key.decode('utf-8')
         del MockedRedis.db[key]
 
-    def scan_iter(self, glob=None):
+    def scan_iter(self, glob=None, count='unused'):
         if not glob:
             for k in list(MockedRedis.db.keys()):
                 yield k.encode('utf-8')
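
Note: every hunk above applies the same pattern: an unbatched SCAN (or, in
one case, a blocking KEYS call) is replaced by scan_iter(count=1000)
feeding a pipeline of DELETEs, so both the key iteration and the deletions
are batched per network round trip. A minimal sketch of that pattern using
redis-py follows; the helper name, key glob and connection parameters are
illustrative only, not part of this patch:

    import redis

    def delete_matching_keys(r, glob, batch_size=1000):
        # COUNT is a hint to the server to return roughly batch_size keys
        # per SCAN round trip, rather than the much smaller default, so a
        # large keyspace needs far fewer round trips on a high-latency link
        rp = r.pipeline()
        for key in r.scan_iter(glob, count=batch_size):
            rp.delete(key)
        # the queued DELETE commands are flushed in a single round trip
        rp.execute()

    # hypothetical usage:
    # r = redis.Redis(host='localhost', port=6379)
    # delete_matching_keys(r, 'joblog:*')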