diff --git a/inventory_provider/routes/classifier.py b/inventory_provider/routes/classifier.py index db1376cb8cb5ea199deabda9c1b679e10262b8f5..6dcb9e4285de2ce4a85f9aa316e6723462995034 100644 --- a/inventory_provider/routes/classifier.py +++ b/inventory_provider/routes/classifier.py @@ -2,14 +2,12 @@ import ipaddress import json import logging import re -from copy import copy from distutils.util import strtobool -from typing import Optional, List +from typing import Optional from flask import Blueprint, Response, request from redis import Redis -from inventory_provider.db.ims import IMS_SERVICE_NAMES from inventory_provider.routes import common routes = Blueprint("inventory-data-classifier-support-routes", __name__) @@ -124,55 +122,6 @@ def related_interfaces(hostname, interface): yield k[len(prefix):] -def get_top_level_services(circuit_id: str, r: Redis) -> List[dict]: - tls = {} - key = f'ims:circuit_hierarchy:{circuit_id}' - results = r.get(key) - if results: - results = json.loads(results.decode('utf-8')) - - def _is_tls(candidate): - if 'circuit-type' in candidate: - if candidate['product'] == 'IP PEERING - NON R&E (PUBLIC)': - return False - if candidate['circuit-type'] == 'service': - return True - if candidate['speed'] == 'BGP': - return True - - # this will be obsolete as soon as Inventory Provider - # update is done, but is here for between the time of the roll out - # and the Inventory Update - tls_names = copy(IMS_SERVICE_NAMES) - - # whilst this is a service type the top level for reporting - # are the BGP services on top of it - tls_names.remove('IP PEERING - NON R&E (PUBLIC)') - if candidate['product'] in tls_names: - return True - if candidate['speed'] == 'BGP': - return True - return False - - # should only ever be one, may refactor this - c = results[0] - - if _is_tls(c): - tls[c['id']] = { - 'id': c['id'], - 'name': c['name'], - 'status': c['status'], - 'circuit_type': 'service', - 'project': c['project'] - } - elif c['sub-circuits']: - for sub in c['sub-circuits']: - temp_parents = \ - get_top_level_services(sub, r) - tls.update({t['id']: t for t in temp_parents}) - return list(tls.values()) - - def get_related_services(source_equipment: str, interface: str, r) -> dict: """ Finds the top-level-services for the given interface @@ -189,17 +138,28 @@ def get_related_services(source_equipment: str, interface: str, r) -> dict: f'{ims_interface}') if if_services: for s in json.loads(if_services.decode('utf-8')): - yield from get_top_level_services(s['id'], r) + yield from s['top-level-services'] for related in related_interfaces(source_equipment, interface): ims_interface = get_ims_interface(related) rif_services = r.get( f'ims:interface_services:{ims_source_equipment}:{ims_interface}') if rif_services: for s in json.loads(rif_services.decode('utf-8')): - yield from get_top_level_services(s['id'], r) + yield from s['top-level-services'] def get_interface_services_and_locs(ims_source_equipment, ims_interface, r): + + def _format_service(_s): + keys = { + 'id', 'name', 'status', 'circuit_type', 'service_type', + 'project', 'pop_name', 'pop_abbreviation', 'equipment', + 'card_id', 'port', 'logical_unit', 'other_end_pop_name', + 'other_end_pop_abbreviation', 'other_end_equipment', + 'other_end_card_id', 'other_end_port', 'other_end_logical_unit'} + keys_to_remove = set(_s.keys()) - keys + for k in keys_to_remove: + _s.pop(k) result = { 'locations': [] } @@ -207,6 +167,8 @@ def get_interface_services_and_locs(ims_source_equipment, ims_interface, r): f'ims:interface_services:{ims_source_equipment}:{ims_interface}') if raw_services: result['services'] = json.loads(raw_services.decode('utf-8')) + for s in result['services']: + _format_service(s) result['related-services'] = \ list(get_related_services(ims_source_equipment, ims_interface, r)) result['locations'] = \ @@ -755,33 +717,41 @@ def get_fiberlink_trap_metadata(ne_name_str: str, object_name_str: str) \ r.get(f'ims:interface_services:{equipment_b}:{interface_b}') logger.debug(f'ims:interface_services:{equipment_b}:{interface_b}') - def _get_fr(circs): - for c in circs: - h = r.get(f'ims:circuit_hierarchy:{c["id"]}') - if h: - h = json.loads(h.decode('utf-8')) - for sc in h: - yield from sc['fibre-routes'] - fr_a_ids = set() fr_b_ids = set() all_frs = {} + tls_a_ids = set() + tls_b_ids = set() + all_tls = {} if circuits_a: circuits_a = json.loads(circuits_a.decode('utf-8')) - for fr in _get_fr(circuits_a): - fr_a_ids.add(fr['id']) - all_frs[fr['id']] = fr + for c in circuits_a: + for fr in c['fibre-routes']: + fr_a_ids.add(fr['id']) + all_frs[fr['id']] = fr + for fr in c['top-level-services']: + tls_a_ids.add(fr['id']) + all_tls[fr['id']] = fr if circuits_b: circuits_b = json.loads(circuits_b.decode('utf-8')) - for fr in _get_fr(circuits_b): - fr_b_ids.add(fr['id']) - all_frs[fr['id']] = fr + for c in circuits_b: + for fr in c['fibre-routes']: + fr_b_ids.add(fr['id']) + all_frs[fr['id']] = fr + for fr in c['top-level-services']: + tls_b_ids.add(fr['id']) + all_tls[fr['id']] = fr fr_ids = fr_a_ids & fr_b_ids if not fr_ids: fr_ids = fr_a_ids | fr_b_ids fibre_routes = [all_frs[x] for x in fr_ids] + tls_ids = tls_a_ids & tls_b_ids + if not tls_ids: + tls_ids = tls_a_ids | tls_b_ids + top_level_services = [all_tls[x] for x in tls_ids] + if fibre_routes: location_a = _location_from_equipment(equipment_a, r) location_b = _location_from_equipment(equipment_b, r) @@ -815,8 +785,7 @@ def get_fiberlink_trap_metadata(ne_name_str: str, object_name_str: str) \ }, }, 'df_route': fibre_routes[0], - 'related-services': - get_top_level_services(fibre_routes[0]['id'], r) + 'related-services': top_level_services } for rs in result['related-services']: rs.pop('id', None) diff --git a/inventory_provider/routes/testing.py b/inventory_provider/routes/testing.py index c01743f2693444873eebad08f018eac494893c2b..744b9b2f3986b3bdd607a3380965c7adf05b43d7 100644 --- a/inventory_provider/routes/testing.py +++ b/inventory_provider/routes/testing.py @@ -21,24 +21,6 @@ def flushdb(): # IMS routes -@routes.route("update-interfaces-to-services", methods=['GET', 'POST']) -def update_interfaces_to_services_route(): - worker.update_interfaces_to_services.delay(use_current=True) - return Response('OK') - - -@routes.route("update-fibre-spans", methods=['GET', 'POST']) -def update_fibre_spans_ims(): - worker.update_fibre_spans.delay(use_current=True) - return Response('OK') - - -@routes.route("update-circuit-hierarchy", methods=['GET', 'POST']) -def update_circuit_hierarchy_ims(): - worker.update_circuit_hierarchy.delay(use_current=True) - return Response('OK') - - @routes.route("update-equipment-locations", methods=['GET', 'POST']) def update_equipment_locations_ims(): worker.update_equipment_locations.delay(use_current=True) @@ -50,6 +32,12 @@ def update_lg_routers_ims(): worker.update_lg_routers.delay(use_current=True) return Response('OK') + +@routes.route( + "update-circuit-hierarchy-and-port-id-services", methods=['GET', 'POST']) +def get_circuit_hierarchy_and_port_id_services(): + worker.update_circuit_hierarchy_and_port_id_services.delay() + return Response('OK') # End of IMS routes diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index eed0c42bd6e9b71ac6a3356cb50ddd855f809a82..64021861dd877eb6725c8dbee724ddc7c23b7093 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -5,10 +5,12 @@ import logging import os import subprocess import tempfile +import threading import time from datetime import datetime from enum import IntFlag from pathlib import Path +from typing import List from uuid import uuid4 from redis.exceptions import RedisError @@ -201,18 +203,6 @@ def _unmanaged_interfaces(): _convert, InventoryTask.config.get('unmanaged-interfaces', [])) - # if interfaces: - # r = get_next_redis(InventoryTask.config) - # rp = r.pipeline() - # for ifc in interfaces: - # rp.set( - # f'reverse_interface_addresses:{ifc["name"]}', - # json.dumps(ifc)) - # rp.set( - # f'subnets:{ifc["interface address"]}', - # json.dumps([ifc])) - # rp.execute() - @app.task(base=InventoryTask, bind=True, name='update_neteng_managed_device_list') @@ -364,7 +354,6 @@ def reload_router_config(self, hostname): self.log_info(f'refreshing peers & clearing cache for {hostname}') refresh_juniper_bgp_peers(hostname, netconf_doc) refresh_juniper_interface_list(hostname, netconf_doc) - # clear_cached_classifier_responses(hostname) # load snmp indexes community = juniper.snmp_community_string(netconf_doc) @@ -445,8 +434,9 @@ def internal_refresh_phase_2(self): try: subtasks = [ - update_circuit_hierarchy.apply_async(), - update_interfaces_to_services.apply_async() + # update_circuit_hierarchy.apply_async(), + # update_interfaces_to_services.apply_async() + update_circuit_hierarchy_and_port_id_services.apply_async() ] r = get_next_redis(InventoryTask.config) @@ -471,33 +461,106 @@ def internal_refresh_phase_2(self): @app.task( - base=InventoryTask, bind=True, name='update_interfaces_to_services') + base=InventoryTask, + bind=True, + name='update_circuit_hierarchy_and_port_id_services') @log_task_entry_and_exit -def update_interfaces_to_services(self, use_current=False): +def update_circuit_hierarchy_and_port_id_services(self, use_current=False): + c = InventoryTask.config["ims"] + ds1 = IMS(c['api'], c['username'], c['password']) + ds2 = IMS(c['api'], c['username'], c['password']) + ds3 = IMS(c['api'], c['username'], c['password']) + + locations = {k: v for k, v in ims_data.get_node_locations(ds1)} + tls_names = list(ims_data.get_service_types(ds1)) + # logger.debug(json.dumps(locations['TS1.GEN.CH'], indent=2)) + # return + hierarchy = None + port_id_details = defaultdict(list) port_id_services = defaultdict(list) + interface_services = defaultdict(list) - c = InventoryTask.config["ims"] - ds = IMS(c['api'], c['username'], c['password']) + def _populate_hierarchy(): + nonlocal hierarchy + hierarchy = {d['id']: d for d in ims_data.get_circuit_hierarchy(ds1)} + logger.debug("hierarchy complete") - if use_current: - r = get_current_redis(InventoryTask.config) - else: - r = get_next_redis(InventoryTask.config) + def _populate_port_id_details(): + nonlocal port_id_details + for x in ims_data.get_port_details(ds2): + pd = port_id_details[x['port_id']] + pd.append(x) + # port_id_details = list(ims_data.get_port_details(ds2)) + logger.debug("Port details complete") - rp = r.pipeline() - # scan with bigger batches, to mitigate network latency effects - for key in r.scan_iter('ims:interface_services:*', count=2000): - rp.delete(key) - rp.execute() + def _populate_circuit_info(): + for x in ims_data.get_port_id_services(ds3): + port_id_services[x['port_a_id']].append(x) + logger.debug("port circuits complete") - locations = {k: v for k, v in ims_data.get_node_locations(ds)} - port_id_details = {d['port_id']: d for d in ims_data.get_port_details(ds)} - for service in ims_data.get_port_id_services(ds): - port_id_services[service["port_a_id"]].append(service) - rp = r.pipeline() + hierarchy_thread = threading.Thread(target=_populate_hierarchy) + hierarchy_thread.start() + + port_id_details_thread = threading.Thread(target=_populate_port_id_details) + port_id_details_thread.start() + + circuit_info_thread = threading.Thread(target=_populate_circuit_info) + circuit_info_thread.start() + + hierarchy_thread.join() + circuit_info_thread.join() + port_id_details_thread.join() + + def _get_fibre_routes(c_id): + _circ = hierarchy.get(c_id, None) + if _circ is None: + return + if _circ['speed'].lower() == 'fibre_route': + yield _circ['id'] + else: + for cc in _circ['carrier-circuits']: + yield from _get_fibre_routes(cc) + + # whilst this is a service type the top level for reporting + # are the BGP services on top of it + tls_names.remove('IP PEERING - NON R&E (PUBLIC)') + + def get_top_level_services(circuit_id: str) -> List[dict]: + tls = {} + c = hierarchy.get(circuit_id, None) + if c: + + def _is_tls(candidate): + if 'circuit-type' in candidate: + if candidate['product'] == 'IP PEERING - NON R&E (PUBLIC)': + return False + if candidate['circuit-type'] == 'service': + return True + if candidate['speed'] == 'BGP': + return True + if candidate['product'] in tls_names: + return True + if candidate['speed'] == 'BGP': + return True + return False + + if _is_tls(c): + tls[c['id']] = { + 'id': c['id'], + 'name': c['name'], + 'status': c['status'], + 'circuit_type': 'service', + 'project': c['project'] + } + elif c['sub-circuits']: + for sub in c['sub-circuits']: + temp_parents = \ + get_top_level_services(sub) + tls.update({t['id']: t for t in temp_parents}) + return list(tls.values()) def _format_service(s): - pd_a = port_id_details[s['port_a_id']] + pd_a = port_id_details[s['port_a_id']][0] loc_a = locations[pd_a['equipment_name']]['pop'] s['pop_name'] = loc_a['name'] s['pop_abbreviation'] = loc_a['abbreviation'] @@ -506,7 +569,7 @@ def update_interfaces_to_services(self, use_current=False): s['port'] = pd_a['interface_name'] s['logical_unit'] = '' # this is redundant I believe if 'port_b_id' in s: - pd_b = port_id_details[s['port_b_id']] + pd_b = port_id_details[s['port_b_id']][0] loc_b = locations[pd_b['equipment_name']]['pop'] s['other_end_pop_name'] = loc_a['name'] s['other_end_pop_abbreviation'] = loc_b['abbreviation'] @@ -521,31 +584,26 @@ def update_interfaces_to_services(self, use_current=False): s.pop('port_a_id', None) s.pop('port_b_id', None) - interface_services = defaultdict(list) - - for port_info in port_id_details.values(): - service_ids = set() - services = [] - for service in port_id_services[port_info['port_id']]: - if service['id'] not in service_ids: - _format_service(service) - services.append(service) - service_ids.add(service['id']) - - if_key = f'{port_info["equipment_name"]}:{port_info["interface_name"]}' - interface_services[if_key].extend(services) + for key, value in port_id_details.items(): + for details in value: + k = f"{details['equipment_name']}:" \ + f"{details['interface_name']}" + circuits = port_id_services.get(details['port_id'], []) + + # add fibre-routes to circuits + for circ in circuits: + circ['fibre-routes'] = \ + [{ + 'id': hierarchy[x]['id'], + 'name': hierarchy[x]['name'], + 'status': hierarchy[x]['status'] + } for x in set(_get_fibre_routes(circ['id']))] + circ['top-level-services'] = \ + get_top_level_services(circ['id']) + _format_service(circ) + + interface_services[k].extend(circuits) - for k, v in interface_services.items(): - rp.set( - f'ims:interface_services:{k}', - json.dumps(v)) - rp.execute() - - -@app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy') -@log_task_entry_and_exit -def update_circuit_hierarchy(self, use_current=False): - hierarchy = {} if use_current: r = get_current_redis(InventoryTask.config) else: @@ -553,33 +611,16 @@ def update_circuit_hierarchy(self, use_current=False): rp = r.pipeline() for k in r.scan_iter('ims:circuit_hierarchy:*', count=2000): rp.delete(k) + for k in r.scan_iter('ims:interface_services:*', count=2000): + rp.delete(k) rp.execute() - - c = InventoryTask.config["ims"] - ds = IMS(c['api'], c['username'], c['password']) - - hierarchy = {d['id']: d for d in ims_data.get_circuit_hierarchy(ds)} - - def _get_fibre_routes(c_id): - _circ = hierarchy.get(c_id, None) - if _circ is None: - return - if _circ['speed'].lower() == 'fibre_route': - yield _circ['id'] - else: - for cc in _circ['carrier-circuits']: - yield from _get_fibre_routes(cc) - rp = r.pipeline() for circ in hierarchy.values(): - circ['fibre-routes'] = \ - [{ - 'id': hierarchy[x]['id'], - 'name': hierarchy[x]['name'], - 'status': hierarchy[x]['status'] - } for x in set(_get_fibre_routes(circ['id']))] rp.set(f'ims:circuit_hierarchy:{circ["id"]}', json.dumps([circ])) - + for k, v in interface_services.items(): + rp.set( + f'ims:interface_services:{k}', + json.dumps(v)) rp.execute() @@ -604,7 +645,7 @@ def update_equipment_locations(self, use_current=False): # put into a list to match non-IMS version rp.set(f'ims:location:{h}', json.dumps([d])) if h in hostnames_found: - print(f'Multiple entries for {h}') + logger.debug(f'Multiple entries for {h}') hostnames_found.add(h) rp.execute() diff --git a/test/data/router-info.json b/test/data/router-info.json index 312689ccb3dc13297f8820d69f6499c9987f5084..e3bed7251440eab498899defb87d58b686c1073a 100644 Binary files a/test/data/router-info.json and b/test/data/router-info.json differ