Skip to content
Snippets Groups Projects
Commit baf5dadc authored by Erik Reid's avatar Erik Reid
Browse files

refactored worker module, added more redis pipelining

parent 8e5ffe6e
No related branches found
No related tags found
No related merge requests found
import json
import logging
import re
from inventory_provider.db import opsdb, db
from inventory_provider.tasks.common import get_next_redis
logger = logging.getLogger(__name__)
def build_service_interface_user_list(config):
    """
    Join netconf interface data with opsdb service/user data.

    Yields one dict per interface found in redis under
    'netconf-interfaces:*', with keys:
    'router', 'interface', 'description', 'users'.

    :param config: app config dict (used to select a redis instance
        and to open the ops db connection)
    :return: generator of interface dicts
    """

    def _interfaces():
        """
        yields interface info from netconf
        :return: generator of dicts with router/interface/description
        """
        r = get_next_redis(config)
        for k in r.scan_iter('netconf-interfaces:*'):
            k = k.decode('utf-8')
            (_, router_name, ifc_name) = k.split(':')
            info = json.loads(r.get(k).decode('utf-8'))
            # sanity check: the key suffix must agree with the payload
            assert ifc_name == info['name']
            yield {
                'router': router_name,
                'interface': info['name'],
                'description': info['description']
            }

    def _lookup_interface_services(wanted_interfaces):
        """
        yields interface info from opsdb (with service id)
        ... only interfaces in wanted_interfaces
        :param wanted_interfaces: container of 'router:ifc_name' keys
        :return: generator of dicts with router/interface/service_ids
        """
        r = get_next_redis(config)
        for k in r.scan_iter('opsdb:interface_services:*'):
            k = k.decode('utf-8')
            fields = k.split(':')
            if len(fields) < 4:
                # there are some strange records
                # e.g. TS1.*, ts1.*, dp1.*, dtn*, ...
                continue
            router = fields[2]
            ifc_name = fields[3]
            router_interface_key = f'{router}:{ifc_name}'
            if router_interface_key not in wanted_interfaces:
                continue
            info = json.loads(r.get(k).decode('utf-8'))
            yield {
                'router': router,
                'interface': ifc_name,
                'service_ids': {service['id'] for service in info}
            }

    # dict: 'router:interface' -> {'router', 'interface', 'description'}
    netconf_interface_map = {
        f'{i["router"]}:{i["interface"]}': i for i in _interfaces()}

    # dict: 'router:interface' -> {'router', 'interface', set([service_ids])}
    opsdb_interface_map = {
        f'{i["router"]}:{i["interface"]}': i
        for i in _lookup_interface_services(netconf_interface_map.keys())}

    all_service_ids = set()
    for ifc_info in opsdb_interface_map.values():
        all_service_ids |= ifc_info['service_ids']
    all_service_ids = list(all_service_ids)

    # dict: service_id[int] -> [list of users]
    service_user_map = dict()
    with db.connection(config["ops-db"]) as cx:
        for user in opsdb.get_service_users(cx, all_service_ids):
            service_user_map.setdefault(
                user['service_id'], []).append(user['user'])

    def _users(ifc_key):
        """
        :param ifc_key: 'router:ifc_name'
        :return: list of users over all services on this interface
        """
        if ifc_key not in opsdb_interface_map:
            return []
        users = set()
        service_id_list = opsdb_interface_map[ifc_key].get('service_ids', [])
        for service_id in service_id_list:
            users |= set(service_user_map.get(service_id, []))
        return list(users)

    # annotate every netconf interface with its (possibly empty) user list
    for k, v in netconf_interface_map.items():
        v['users'] = _users(k)
        yield v
def derive_router_hostnames(config):
    """
    Derive the set of router hostnames known to both junosspace and opsdb.

    Only routers present in both redis key spaces are returned, since
    downstream refresh jobs need data from both sources.

    :param config: app config dict (used to select a redis instance)
    :return: set of router hostname strings
    """
    r = get_next_redis(config)

    junosspace_equipment = set()
    # scan_iter rather than keys(): don't block redis on a large keyspace
    # (consistent with every other scan in this module)
    for k in r.scan_iter('junosspace:*'):
        m = re.match(r'^junosspace:(.*)$', k.decode('utf-8'))
        assert m  # pattern always matches keys returned by this scan
        junosspace_equipment.add(m.group(1))

    opsdb_equipment = set()
    for k in r.scan_iter('opsdb:interface_services:*'):
        m = re.match(
            r'opsdb:interface_services:([^:]+):.*$',
            k.decode('utf-8'))
        if m:
            opsdb_equipment.add(m.group(1))
        else:
            logger.info("Unable to derive router name from %s" %
                        k.decode('utf-8'))

    return junosspace_equipment & opsdb_equipment
......@@ -14,6 +14,7 @@ import jsonschema
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common \
import get_next_redis, latch_db, get_latch, set_latch, update_latch_status
from inventory_provider.tasks import data
from inventory_provider import config
from inventory_provider import environment
from inventory_provider.db import db, opsdb
......@@ -105,8 +106,11 @@ def update_interfaces_to_services(self):
interface_services[equipment_interface].append(service)
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
for key in r.scan_iter('opsdb:interface_services:*'):
r.delete(key)
rp.delete(key)
rp.execute()
rp = r.pipeline()
for equipment_interface, services in interface_services.items():
rp.set(
......@@ -127,8 +131,8 @@ def import_unmanaged_interfaces(self):
return {
'name': d['address'],
'interface address': d['network'],
'interface name': d['interface'],
'router': d['router']
'interface name': d['interface'].lower(),
'router': d['router'].lower()
}
interfaces = [
......@@ -167,8 +171,11 @@ def update_access_services(self):
access_services[service['name']] = service
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
for key in r.scan_iter('opsdb:access_services:*'):
r.delete(key)
rp.delete(key)
rp.execute()
rp = r.pipeline()
for name, service in access_services.items():
rp.set(
......@@ -184,12 +191,16 @@ def update_lg_routers(self):
logger.debug('>>> update_lg_routers')
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
for k in r.scan_iter('opsdb:lg:*'):
r.delete(k)
rp.delete(k)
rp.execute()
with db.connection(InventoryTask.config["ops-db"]) as cx:
rp = r.pipeline()
for router in opsdb.lookup_lg_routers(cx):
r.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router))
rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router))
rp.execute()
logger.debug('<<< update_lg_routers')
......@@ -199,15 +210,19 @@ def update_equipment_locations(self):
logger.debug('>>> update_equipment_locations')
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
for k in r.scan_iter('opsdb:location:*'):
r.delete(k)
rp.delete(k)
rp.execute()
with db.connection(InventoryTask.config["ops-db"]) as cx:
for h in _derive_router_hostnames(InventoryTask.config):
rp = r.pipeline()
for h in data.derive_router_hostnames(InventoryTask.config):
# lookup_pop_info returns a list of locations
# (there can sometimes be more than one match)
locations = list(opsdb.lookup_pop_info(cx, h))
r.set('opsdb:location:%s' % h, json.dumps(locations))
rp.set('opsdb:location:%s' % h, json.dumps(locations))
rp.execute()
logger.debug('<<< update_equipment_locations')
......@@ -227,10 +242,12 @@ def update_circuit_hierarchy(self):
child_to_parents[child_id].append(relation)
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
for key in r.scan_iter('opsdb:services:parents:*'):
r.delete(key)
rp.delete(key)
for key in r.scan_iter('opsdb:services:children:*'):
r.delete(key)
rp.delete(key)
rp.execute()
rp = r.pipeline()
for cid, parents in parent_to_children.items():
......@@ -247,8 +264,11 @@ def update_geant_lambdas(self):
logger.debug('>>> update_geant_lambdas')
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
for key in r.scan_iter('opsdb:geant_lambdas:*'):
r.delete(key)
rp.delete(key)
rp.execute()
with db.connection(InventoryTask.config["ops-db"]) as cx:
rp = r.pipeline()
for ld in opsdb.get_geant_lambdas(cx):
......@@ -285,8 +305,11 @@ def update_junosspace_device_list(self):
'message': 'found %d routers, saving details' % len(routers)
})
rp = r.pipeline()
for k in r.scan_iter('junosspace:*'):
r.delete(k)
rp.delete(k)
rp.execute()
rp = r.pipeline()
for k, v in routers.items():
rp.set(k, v)
......@@ -342,8 +365,10 @@ def clear_cached_classifier_responses(hostname=None):
return r.keys('classifier-cache:*')
keys_to_delete = _hostname_keys() if hostname else _all_keys()
rp = r.pipeline()
for k in keys_to_delete:
r.delete(k)
rp.delete(k)
rp.execute()
def _refresh_peers(hostname, key_base, peers):
......@@ -398,10 +423,12 @@ def refresh_juniper_interface_list(hostname, netconf):
'removing cached netconf-interfaces for %r' % hostname)
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
for k in r.scan_iter('netconf-interfaces:%s:*' % hostname):
r.delete(k)
rp.delete(k)
for k in r.keys('netconf-interface-bundles:%s:*' % hostname):
r.delete(k)
rp.delete(k)
rp.execute()
all_bundles = defaultdict(list)
......@@ -503,27 +530,6 @@ def reload_router_config(self, hostname):
}
def _derive_router_hostnames(config):
    """
    Derive router hostnames known to both junosspace and opsdb redis data.

    NOTE(review): this appears to duplicate the module-level
    derive_router_hostnames in inventory_provider.tasks.data — confirm
    one of the two copies can be removed.

    :param config: app config dict (used to select a redis instance)
    :return: set of router hostname strings
    """
    r = get_next_redis(config)
    junosspace_equipment = set()
    for k in r.keys('junosspace:*'):
        m = re.match('^junosspace:(.*)$', k.decode('utf-8'))
        assert m  # pattern always matches keys returned by this scan
        junosspace_equipment.add(m.group(1))
    opsdb_equipment = set()
    for k in r.scan_iter('opsdb:interface_services:*'):
        m = re.match(
            'opsdb:interface_services:([^:]+):.*$',
            k.decode('utf-8'))
        if m:
            opsdb_equipment.add(m.group(1))
        else:
            logger.info("Unable to derive router name from %s" %
                        k.decode('utf-8'))
    # only routers present in both sources are useful downstream
    return junosspace_equipment & opsdb_equipment
def _erase_next_db(config):
"""
flush next db, but first save latch and then restore afterwards
......@@ -570,7 +576,7 @@ def launch_refresh_cache_all(config):
update_access_services.apply_async(),
import_unmanaged_interfaces.apply_async()
]
for hostname in _derive_router_hostnames(config):
for hostname in data.derive_router_hostnames(config):
logger.debug('queueing router refresh jobs for %r' % hostname)
subtasks.append(reload_router_config.apply_async(args=[hostname]))
......@@ -654,101 +660,6 @@ def refresh_finalizer(self, pending_task_ids_json):
logger.debug('<<< refresh_finalizer')
def _build_service_interface_user_list():
    """
    Join netconf interface data with opsdb service/user data.

    NOTE(review): this appears to duplicate
    inventory_provider.tasks.data.build_service_interface_user_list,
    which takes config as a parameter instead of reading
    InventoryTask.config — confirm one of the two copies can be removed.

    Yields one dict per interface found in redis under
    'netconf-interfaces:*', with keys:
    'router', 'interface', 'description', 'users'.
    """

    def _interfaces():
        """
        yields interface info from netconf
        :return: generator of dicts with router/interface/description
        """
        r = get_next_redis(InventoryTask.config)
        for k in r.scan_iter('netconf-interfaces:*'):
            k = k.decode('utf-8')
            (_, router_name, ifc_name) = k.split(':')
            info = r.get(k).decode('utf-8')
            info = json.loads(info)
            # sanity check: key suffix must agree with the payload
            assert ifc_name == info['name']
            yield {
                'router': router_name,
                'interface': info['name'],
                'description': info['description']
            }

    def _lookup_interface_services(wanted_interfaces):
        """
        yields interface info from opsdb (with service id)
        ... only interfaces in wanted_interfaces
        :param wanted_interfaces: container of 'router:ifc_name' keys
        :return: generator of dicts with router/interface/service_ids
        """
        r = get_next_redis(InventoryTask.config)
        for k in r.scan_iter('opsdb:interface_services:*'):
            k = k.decode('utf-8')
            fields = k.split(':')
            if len(fields) < 4:
                # there are some strange records
                # e.g. TS1.*, ts1.*, dp1.*, dtn*, ...
                continue
            router = fields[2]
            ifc_name = fields[3]
            router_interface_key = f'{router}:{ifc_name}'
            if router_interface_key not in wanted_interfaces:
                continue
            info = r.get(k).decode('utf-8')
            info = json.loads(info)
            yield {
                'router': router,
                'interface': ifc_name,
                'service_ids': set([service['id'] for service in info])
            }

    # dict: 'router:interface' -> {'router', 'interface', 'description'}
    netconf_interface_map = dict([
        (f'{i["router"]}:{i["interface"]}', i) for i in _interfaces()])
    # dict: 'router:interface' -> {'router', 'interface', set([service_ids])}
    opsdb_interface_map = dict([
        (f'{i["router"]}:{i["interface"]}', i)
        for i in _lookup_interface_services(netconf_interface_map.keys())])
    all_service_ids = set()
    for r in opsdb_interface_map.values():
        all_service_ids |= r['service_ids']
    all_service_ids = list(all_service_ids)
    # dict: service_id[int] -> [list of users]
    service_user_map = dict()
    with db.connection(InventoryTask.config["ops-db"]) as cx:
        # for user in opsdb.get_service_users(cx, list(all_service_ids)):
        service_users = list(opsdb.get_service_users(cx, all_service_ids))
        for user in service_users:
            service_user_map.setdefault(
                user['service_id'], []).append(user['user'])

    def _users(ifc_key):
        """
        :param ifc_key: 'router:ifc_name'
        :return: list of users over all services on this interface
        """
        users = set()
        if ifc_key not in opsdb_interface_map:
            return []
        service_id_list = opsdb_interface_map[ifc_key].get('service_ids', [])
        for service_id in service_id_list:
            users |= set(service_user_map.get(service_id, []))
        return list(users)

    # annotate every netconf interface with its (possibly empty) user list
    for k, v in netconf_interface_map.items():
        v['users'] = _users(k)
        yield v
def _build_service_category_interface_list(update_callback=lambda s: None):
logger.debug('>>> _build_interface_services')
......@@ -760,7 +671,8 @@ def _build_service_category_interface_list(update_callback=lambda s: None):
return None
update_callback('loading all known interfaces')
interfaces = list(_build_service_interface_user_list())
interfaces = data.build_service_interface_user_list(InventoryTask.config)
interfaces = list(interfaces)
update_callback(f'loaded {len(interfaces)} interfaces, '
'saving by service category')
......
......@@ -5,7 +5,7 @@ import responses
import inventory_provider
from inventory_provider import juniper
from inventory_provider.tasks import worker
from inventory_provider.tasks import data
TEST_DATA_FILENAME = os.path.realpath(os.path.join(
inventory_provider.__path__[0],
......@@ -50,7 +50,7 @@ def test_router_hostname_derivation(mocked_redis):
},
'redis-databases': [0, 11]
}
hostnames = list(worker._derive_router_hostnames(config))
hostnames = list(data.derive_router_hostnames(config))
assert hostnames # test data is non-empty
for h in hostnames:
assert re.match(r'^(mx[12]|qfx|srx[12])\..+\.geant\.net$', h)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment