import ipaddress
import json
import logging
import re
from copy import copy
from typing import Iterator, List, Optional

from flask import Blueprint, Response, request
from redis import Redis

from inventory_provider.db.ims import IMS_SERVICE_NAMES
from inventory_provider.routes import common

try:
    from distutils.util import strtobool
except ImportError:  # distutils was removed in Python 3.12 (PEP 632)
    def strtobool(val):
        """
        Minimal stand-in for distutils.util.strtobool.

        :param val: string representation of a truth value
        :return: 1 for true values, 0 for false values
        :raises ValueError: if val is not a recognized truth value
        """
        val = val.lower()
        if val in ('y', 'yes', 't', 'true', 'on', '1'):
            return 1
        if val in ('n', 'no', 'f', 'false', 'off', '0'):
            return 0
        raise ValueError("invalid truth value %r" % (val,))

routes = Blueprint("inventory-data-classifier-support-routes", __name__)

logger = logging.getLogger(__name__)


def _LOCATION(equipment, name, abbreviation):
    """
    Builds a single location dict.

    :param equipment: equipment name
    :param name: pop name
    :param abbreviation: pop abbreviation
    :return: dict with keys 'equipment', 'name' and 'abbreviation'
    """
    return {
        'equipment': equipment,
        'name': name,
        'abbreviation': abbreviation
    }


def build_locations(loc_a: Optional[dict], loc_b: Optional[dict] = None) \
        -> Optional[dict]:
    """
    Combines one or two end locations into a single dict.

    :param loc_a: location of the 'a' end
    :param loc_b: location of the 'b' end (only used if loc_a is set)
    :return: {'a': loc_a} (plus 'b' if loc_b is set), or None if loc_a
        is empty
    """
    locations = None
    if loc_a:
        locations = {'a': loc_a}
        if loc_b:
            locations['b'] = loc_b
    return locations


def _remove_duplicates_from_list(all_: list) -> list:
    """
    removes duplicates from the input list
    the list items must be encodable as json

    :param all_: list of json-encodable items
    :return: a new list with unique elements
    """
    # items are considered equal if their key-sorted json encodings match
    tmp_dict = {json.dumps(item, sort_keys=True): item for item in all_}
    return list(tmp_dict.values())


def _location_from_equipment(equipment_name: str, r: Redis) -> Optional[dict]:
    """
    Looks up the location of an equipment in the 'ims:location:*' cache.

    :param equipment_name: equipment name, as used in the redis key
    :param r: redis connection
    :return: a location dict (cf. _LOCATION), or None if nothing was found
    """
    result = r.get(f'ims:location:{equipment_name}')
    if not result:
        logger.error(f'error looking up location for {equipment_name}')
        return None

    result = json.loads(result.decode('utf-8'))
    if not result:
        logger.error(
            f'sanity failure: empty list for location {equipment_name}')
        return None

    # only the first cached entry is used
    return _LOCATION(
        equipment=result[0]['equipment-name'],
        name=result[0]['pop']['name'],
        abbreviation=result[0]['pop']['abbreviation'])


def _location_from_services(services, r: Redis):
    """
    Generates the locations dict for each of the given services.

    :param services: iterable of service dicts containing the keys
        'equipment' and 'other_end_equipment'
    :param r: redis connection
    :return: yields the result of build_locations for each service
        (which may be None)
    """
    for s in services:
        loc_a = _location_from_equipment(s['equipment'], r)
        loc_b = _location_from_equipment(s['other_end_equipment'], r) \
            if s['other_end_equipment'] else None
        yield build_locations(loc_a, loc_b)


class ClassifierRequestError(Exception):
    """Base class for errors that are rendered as an http error response."""

    status_code = 500

    def __init__(self):
        super().__init__()
        self.message = "Unclassified Internal Error"


class ClassifierProcessingError(ClassifierRequestError):
    """Request could not be processed (default response status is 422)."""

    status_code = 422

    def __init__(self, message, status_code=None):
        """
        :param message: message returned in the response body
        :param status_code: optional override of the response status
        """
        super().__init__()
        self.message = str(message)
        if status_code is not None:
            self.status_code = status_code


@routes.errorhandler(ClassifierRequestError)
def handle_request_error(error):
    """Renders a ClassifierRequestError as an http response."""
    return Response(
        response=error.message,
        status=error.status_code)


@routes.after_request
def after_request(resp):
    """Delegates to the common after_request handler."""
    return common.after_request(resp)


def _ignore_cache_requested() -> bool:
    """
    Parses the 'ignore-cache' query parameter of the current request.

    :return: True if the parameter is a recognized true value,
        False if it is absent, false or unparseable
    """
    ignore_cache = request.args.get('ignore-cache', default='false', type=str)
    try:
        return bool(strtobool(ignore_cache))
    except ValueError:
        return False


def _cached_result(r, cache_key: str) -> Optional[str]:
    """
    Returns the cached response text for cache_key, unless the current
    request asked for the cache to be ignored.

    :param r: redis connection
    :param cache_key: redis key of the cached response
    :return: decoded response text, or None if there is nothing to use
    """
    if _ignore_cache_requested():
        return None
    cached = r.get(cache_key)
    return cached.decode('utf-8') if cached else None


def get_ims_equipment_name(equipment_name: str) -> str:
    """
    Converts an equipment name to the form used in the ims:* redis keys:
    upper-cased and, for names starting with 'MX', truncated at the first
    '.GEANT.NET'.

    :param equipment_name: equipment name
    :return: the converted name
    """
    ims_equipment_name = equipment_name.upper()
    if ims_equipment_name.startswith('MX'):
        ims_equipment_name = ims_equipment_name.split('.GEANT.NET')[0]
    return ims_equipment_name


def get_ims_interface(interface: str) -> str:
    """
    :param interface: interface name
    :return: the upper-cased interface name
    """
    return interface.upper()


def related_interfaces(hostname, interface):
    """
    Generates the names of all cached interfaces of hostname
    whose name starts with '<interface>.'

    :param hostname: router hostname
    :param interface: interface name
    :return: yields interface names
    """
    r = common.get_current_redis()
    prefix = f'netconf-interfaces:{hostname}:'
    for k in r.keys(f'{prefix}{interface}.*'):
        k = k.decode('utf-8')
        assert k.startswith(prefix)  # sanity
        assert len(k) > len(prefix)  # sanity (contains at least an interface)
        yield k[len(prefix):]


def get_top_level_services(circuit_id: str, r: Redis) -> List[dict]:
    """
    Finds the top-level services of a circuit, recursing into
    its 'sub-circuits' if the circuit itself is not a top-level service.

    :param circuit_id: circuit id, as used in 'ims:circuit_hierarchy:*'
    :param r: redis connection
    :return: list of service dicts, unique by 'id'
    """
    tls = {}
    key = f'ims:circuit_hierarchy:{circuit_id}'
    results = r.get(key)
    if results:
        results = json.loads(results.decode('utf-8'))

        def _is_tls(candidate):
            if 'circuit-type' in candidate:
                if candidate['product'] == 'IP PEERING - NON R&E (PUBLIC)':
                    return False
                if candidate['circuit-type'] == 'service':
                    return True
                if candidate['speed'] == 'BGP':
                    return True

            # this will be obsolete as soon as Inventory Provider
            # update is done, but is here for between the time of the roll out
            # and the Inventory Update
            tls_names = copy(IMS_SERVICE_NAMES)

            # whilst this is a service type the top level for reporting
            # are the BGP services on top of it
            tls_names.remove('IP PEERING - NON R&E (PUBLIC)')
            if candidate['product'] in tls_names:
                return True
            if candidate['speed'] == 'BGP':
                return True
            return False

        # should only ever be one, may refactor this
        c = results[0]
        if _is_tls(c):
            tls[c['id']] = {
                'id': c['id'],
                'name': c['name'],
                'status': c['status'],
                'circuit_type': 'service',
                'project': c['project']
            }
        elif c['sub-circuits']:
            for sub in c['sub-circuits']:
                temp_parents = get_top_level_services(sub, r)
                tls.update({t['id']: t for t in temp_parents})
    return list(tls.values())


def get_related_services(source_equipment: str, interface: str, r) \
        -> Iterator[dict]:
    """
    Finds the top-level-services for the given interface
    and also gets the top-level-services for the related interfaces
    e.g. ae20 will also find services on all logical units of ae20 (ae20.1 ...)

    :param source_equipment: equipment name
    :param interface: interface name
    :param r: redis connection
    :return: yields top-level service dicts (may contain duplicates)
    """
    ims_source_equipment = get_ims_equipment_name(source_equipment)
    ims_interface = get_ims_interface(interface)
    if_services = r.get(f'ims:interface_services:{ims_source_equipment}:'
                        f'{ims_interface}')
    if if_services:
        for s in json.loads(if_services.decode('utf-8')):
            yield from get_top_level_services(s['id'], r)
    for related in related_interfaces(source_equipment, interface):
        ims_interface = get_ims_interface(related)
        rif_services = r.get(
            f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
        if rif_services:
            for s in json.loads(rif_services.decode('utf-8')):
                yield from get_top_level_services(s['id'], r)


def get_interface_services_and_locs(ims_source_equipment, ims_interface, r):
    """
    Collects the services, related services and locations
    of an interface.

    :param ims_source_equipment: equipment name, in ims form
    :param ims_interface: interface name, in ims form
    :param r: redis connection
    :return: dict that always contains 'locations', and contains 'services'
        and 'related-services' only when they are non-empty
    """
    result = {
        'locations': []
    }
    raw_services = r.get(
        f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
    if raw_services:
        result['services'] = json.loads(raw_services.decode('utf-8'))
        result['related-services'] = \
            list(get_related_services(ims_source_equipment, ims_interface, r))
        result['locations'] = \
            list(_location_from_services(result['services'], r))
        if not result['services']:
            result.pop('services', None)
        if result['related-services']:
            # the loop variable must not be called 'r': that would rebind
            # the redis connection used by the location fallback below
            for related in result['related-services']:
                related.pop('id', None)
        else:
            result.pop('related-services', None)

    if not result.get('locations', None):
        # no service locations: fall back to the equipment's own location
        locations = build_locations(
            _location_from_equipment(ims_source_equipment, r))
        result['locations'] = [locations] if locations else []
    result['locations'] = _remove_duplicates_from_list(result['locations'])
    return result


def _link_interface_info(r, hostname, interface):
    """
    Generates the 'interface' field for
    the juniper-link-info response payload.

    only called from get_juniper_link_info

    :param r: redis connection
    :param hostname: router hostname
    :param interface: interface name
    :return: payload dict
    """
    ifc_info = r.get(f'netconf-interfaces:{hostname}:{interface}')
    if not ifc_info:
        # warning: this should match the structure returned by
        #          juniper:list_interfaces:_ifc_info
        ifc_info = {
            'name': interface,
            'description': '',
            'bundle': [],
            'ipv4': [],
            'ipv6': []
        }
    else:
        ifc_info = json.loads(ifc_info.decode('utf-8'))

    bundle_members = r.get(
        f'netconf-interface-bundles:{hostname}:{interface}')
    if bundle_members:
        ifc_info['bundle_members'] \
            = json.loads(bundle_members.decode('utf-8'))
    else:
        ifc_info['bundle_members'] = []

    snmp_info = r.get(
        f'snmp-interfaces-single:{hostname}:{interface}')
    if snmp_info:
        snmp_info = json.loads(snmp_info.decode('utf-8'))
        ifc_info['snmp'] = {
            'community': snmp_info['community'],
            'index': snmp_info['index']
        }

    return ifc_info


@routes.route("/juniper-link-info/<source_equipment>/<path:interface>",
              methods=['GET', 'POST'])
@common.require_accepts_json
def get_juniper_link_info(source_equipment: str, interface: str) -> Response:
    """
    Handler for /classifier/juniper-link-info that
    returns metadata about an IP interface.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.JUNIPER_LINK_RESPONSE_SCHEMA

    :param source_equipment: router hostname
    :param interface: link interface name
    :return: json response (served from the cache when available, unless
        the 'ignore-cache' query parameter is true)
    """
    ims_source_equipment = get_ims_equipment_name(source_equipment)
    ims_interface = get_ims_interface(interface)

    r = common.get_current_redis()

    cache_key = \
        f'classifier-cache:juniper:{ims_source_equipment}:{ims_interface}'

    result = _cached_result(r, cache_key)
    if not result:
        # _link_interface_info already fills in 'bundle_members'
        result = {
            'interface': _link_interface_info(r, source_equipment, interface)
        }

        result.update(
            get_interface_services_and_locs(
                ims_source_equipment,
                ims_interface,
                r
            )
        )

        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")


def _asn_group_info(redis, address):
    """
    Finds the peerings that share the remote asn of the given peer address.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: dict with keys 'asn' and 'peers', or None if not found
    :raises ClassifierProcessingError: if address can't be parsed
    """

    try:
        address = ipaddress.ip_address(address).exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address} as an ip address')

    all_peerings = redis.get(f'juniper-peerings:remote:{address}')
    if not all_peerings:
        return None

    all_peerings = json.loads(all_peerings.decode('utf-8'))
    all_asn = {p['remote-asn'] for p in all_peerings if 'remote-asn' in p}
    if not all_asn:
        return None

    # an arbitrary asn is used if there is more than one
    peer_asn = all_asn.pop()
    if all_asn:
        logger.error(
            f"found multiple asn's for {address}, "
            f'using {peer_asn} and ignoring {all_asn}')

    peerings_this_asn = redis.get(f'juniper-peerings:peer-asn:{peer_asn}')
    if not peerings_this_asn:
        logger.error(
            f'internal data corruption, no peerings found for asn {peer_asn}')
        return None

    peerings_this_asn = json.loads(peerings_this_asn.decode('utf-8'))
    return {
        'asn': peer_asn,
        'peers': [
            {
                'router': p['hostname'],
                'address': p['address'],
                # for future use (in case isolation
                # should be computed per peering type)
                'group': p['group']
            }
            for p in peerings_this_asn
        ]
    }


def _vpn_rr_peering_info(redis, address):
    """
    Finds the vpn rr peering info for the given peer address.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: dict with keys 'name', 'description', 'router' (and 'peer-as'
        if known), or None if there is no vpn rr peering for address
    :raises ClassifierProcessingError: if address can't be parsed
    """
    def _is_rr(peering_info):
        if peering_info.get('logical-system', '') != 'VRR':
            return False
        group = peering_info.get('group', '')
        if group not in ('VPN-RR', 'VPN-RR-INTERNAL'):
            return False
        if 'description' not in peering_info:
            logger.error('internal data error, looks like vpn rr peering '
                         f'but description is missing: {peering_info}')
            return False
        return True

    try:
        address = ipaddress.ip_address(address).exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address} as an ip address')

    all_peerings = redis.get(f'juniper-peerings:remote:{address}')
    if not all_peerings:
        return None

    all_peerings = json.loads(all_peerings.decode('utf-8'))
    rr_peerings = list(filter(_is_rr, all_peerings))
    if not rr_peerings:
        return None

    if len(rr_peerings) > 1:
        logger.warning(
            f'using the first of multiple vpn rr peer matches: {rr_peerings}')

    return_value = {
        'name': address,
        'description': rr_peerings[0]['description'],
        'router': rr_peerings[0]['hostname']
    }

    if 'remote-asn' in rr_peerings[0]:
        return_value['peer-as'] = rr_peerings[0]['remote-asn']

    return return_value


def _ix_peering_info(redis, address):
    """
    Finds the ix peering info for the given peer address.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: dict with keys 'peer', 'group' and 'router',
        or None if there is no ix peering for address
    :raises ClassifierProcessingError: if address can't be parsed
    """

    def _is_ix(peering_info):
        if peering_info.get('instance', '') != 'IAS':
            return False
        if not peering_info.get('group', '').startswith('GEANT-IX'):
            return False

        expected_keys = ('description', 'local-asn', 'remote-asn')
        if any(peering_info.get(x, None) is None for x in expected_keys):
            logger.error('internal data error, looks like ix peering but '
                         f'some expected keys are missing: {peering_info}')
            return False
        return True

    try:
        address = ipaddress.ip_address(address).exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address} as an ip address')

    all_peerings = redis.get(f'juniper-peerings:remote:{address}')
    if not all_peerings:
        return None

    all_peerings = json.loads(all_peerings.decode('utf-8'))
    ix_peerings = list(filter(_is_ix, all_peerings))
    if not ix_peerings:
        return None

    if len(ix_peerings) > 1:
        logger.warning(
            f'using the first of multiple ix peer matches: {ix_peerings}')

    peer_info = {
        'name': address,
        'description': ix_peerings[0]['description'],
        'as': {
            'local': ix_peerings[0]['local-asn'],
            'peer': ix_peerings[0]['remote-asn']
        },
        'router': ix_peerings[0]['hostname']
    }

    return_value = {
        'peer': peer_info,
        'group': [],
        'router': []
    }

    # load the other peers in the same group
    # regex needed??? (e.g. tabs???)
    peering_group_name = peer_info['description'].split(' ')[0]
    peering_group = redis.get(
        f'juniper-peerings:ix-groups:{peering_group_name}')
    if peering_group:
        peering_group = peering_group.decode('utf-8')
        return_value['group'] = sorted(json.loads(peering_group))

    # load the other ix peers from the same router
    router_peerings = redis.get(
        f'juniper-peerings:hosts:{peer_info["router"]}')
    if router_peerings:  # a missing key leaves the 'router' list empty
        router_peerings = json.loads(router_peerings.decode('utf-8'))
        router_ix_peers = list(filter(_is_ix, router_peerings))
        if router_ix_peers:
            addresses = {p['address'] for p in router_ix_peers}
            return_value['router'] = sorted(addresses)

    return return_value


def find_interfaces(address):
    """
    Generates the cached interface entries of all subnets
    that contain the given address.

    TODO: this is probably the least efficient way of doing this
          (if it's a problem, pre-compute these lists)

    :param address: an ipaddress object
    :return: yields the entries cached under the matching 'subnets:*' keys
    """
    r = common.get_current_redis()
    for k in r.keys('subnets:*'):
        k = k.decode('utf-8')
        m = re.match(r'^subnets:(.*)$', k)
        assert m, 'sanity failure: redis returned an invalid key name'
        interface = ipaddress.ip_interface(m.group(1))
        if address in interface.network:
            info = r.get(k).decode('utf-8')
            info = json.loads(info)
            for ifc in info:
                yield ifc


@routes.route("/peer-info/<address_str>", methods=['GET', 'POST'])
@common.require_accepts_json
def peer_info(address_str: str) -> Response:
    """
    Handler for /classifier/peer-info that returns bgp peering metadata.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.PEER_INFO_RESPONSE_SCHEMA

    :param address_str: string representation of a bgp peer address
    :return: json response (served from the cache when available, unless
        the 'ignore-cache' query parameter is true)
    :raises ClassifierProcessingError: if address_str can't be parsed
    """
    # canonicalize the input address first ...
    try:
        address = ipaddress.ip_address(address_str)
        address_str = address.exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address_str} as an ip address')

    r = common.get_current_redis()

    cache_key = f'classifier-cache:peer:{address_str}'

    result = _cached_result(r, cache_key)
    if not result:

        result = {
            'interfaces': [],
            'locations': [],
        }

        # NOTE(review): build_locations returns None when the router's
        # location can't be found, so None may be appended to 'locations'
        # below - confirm this is acceptable for the response schema
        ix_peering_info = _ix_peering_info(r, address_str)
        if ix_peering_info:
            result['ix-public-peer-info'] = ix_peering_info
            result['locations'].append(build_locations(
                _location_from_equipment(
                    get_ims_equipment_name(
                        ix_peering_info['peer']['router']), r)))

        vpn_rr_peering_info = _vpn_rr_peering_info(r, address_str)
        if vpn_rr_peering_info:
            result['vpn-rr-peer-info'] = vpn_rr_peering_info
            result['locations'].append(build_locations(
                _location_from_equipment(
                    get_ims_equipment_name(vpn_rr_peering_info['router']),
                    r)))

        asn_group_info = _asn_group_info(r, address_str)
        if asn_group_info:
            result['asn'] = asn_group_info

        for interface in find_interfaces(address):
            ims_equipment = get_ims_equipment_name(interface["router"])
            ims_interface = get_ims_interface(interface["interface name"])

            services_and_locs = get_interface_services_and_locs(
                ims_equipment,
                ims_interface,
                r
            )

            t = {'interface': interface}
            if services_and_locs.get('services', None):
                t['services'] = services_and_locs['services']
            result['interfaces'].append(t)
            result['locations'].extend(services_and_locs['locations'])

        snmp_info = r.get(
            f'snmp-peerings:remote:{address_str}')
        if snmp_info:
            snmp_info = json.loads(snmp_info.decode('utf-8'))
            result['snmp'] = [
                {
                    'hostname': h['hostname'],
                    'community': h['community'],
                    'oid': h['oid']
                } for h in snmp_info]

        result['locations'] = _remove_duplicates_from_list(result['locations'])
        if not result.get('interfaces', None):
            result.pop('interfaces', None)
        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")


@routes.route("/infinera-lambda-info/"
              "<source_equipment>/<interface>/<circuit_id>",
              methods=['GET', 'POST'])
@common.require_accepts_json
def get_trap_metadata(source_equipment: str, interface: str, circuit_id: str) \
        -> Response:
    """
    Handler for /classifier/infinera-lambda-info that
    returns metadata for a DTNX port.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.INFINERA_LAMBDA_INFO_RESPONSE_SCHEMA

    :param source_equipment: DTNX name
    :param interface: interface/port name
    :param circuit_id: infinera circuit id (not used by this handler)
    :return: json response (served from the cache when available, unless
        the 'ignore-cache' query parameter is true)
    """

    ims_source_equipment = get_ims_equipment_name(source_equipment)
    ims_interface = get_ims_interface(interface)

    # the cache key uses the names exactly as they appear in the request
    cache_key = f'classifier-cache:infinera:{source_equipment}:{interface}'

    r = common.get_current_redis()

    result = _cached_result(r, cache_key)
    if not result:
        result = {
            'locations': []
        }

        result.update(get_interface_services_and_locs(
            ims_source_equipment,
            ims_interface,
            r
        ))

        # NOTE(review): result always contains 'locations' at this point,
        # so this 404 branch looks unreachable
        if not result:
            return Response(
                response="no available info for {} {}".format(
                    source_equipment, interface),
                status=404,
                mimetype="text/html")

        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")


@routes.route("/infinera-fiberlink-info/<ne_name_str>/<object_name_str>",
              methods=['GET', 'POST'])
@common.require_accepts_json
def get_fiberlink_trap_metadata(ne_name_str: str, object_name_str: str) \
        -> Response:
    """
    Handler for /classifier/infinera-fiberlink-info that
    returns metadata for a particular optical path segment.

    TODO: no schema is declared, and there are no validation tests

    :param ne_name_str: OLA or DTNX equipment name
    :param object_name_str: path name
    :return: json response, or a 404 response if no fibre routes are found
    :raises ClassifierProcessingError: if the parameters can't each be
        parsed into two elements
    """

    interfaces = object_name_str.split('_')
    p = r'([a-zA-Z\d]+?-(OLA|DTNX)\d+(-\d)?)'
    matches = re.findall(p, ne_name_str)
    if len(matches) != 2 or len(interfaces) != 2:
        raise ClassifierProcessingError(
            f'unable to parse {ne_name_str} {object_name_str} '
            'into two elements')

    r = common.get_current_redis()

    # double check that we only need to check the two nodes and not the objects
    cache_key = \
        f'classifier-cache:fiberlink:{ne_name_str}:{object_name_str}'

    result = _cached_result(r, cache_key)
    if not result:
        equipment_a = matches[0][0]
        equipment_b = matches[1][0]
        interface_a = interfaces[0]
        interface_b = interfaces[1]
        circuits_a = \
            r.get(f'ims:interface_services:{equipment_a}:{interface_a}')
        logger.debug(f'ims:interface_services:{equipment_a}:{interface_a}')
        circuits_b = \
            r.get(f'ims:interface_services:{equipment_b}:{interface_b}')
        logger.debug(f'ims:interface_services:{equipment_b}:{interface_b}')

        def _get_fr(circs):
            for c in circs:
                h = r.get(f'ims:circuit_hierarchy:{c["id"]}')
                if h:
                    h = json.loads(h.decode('utf-8'))
                    for sc in h:
                        yield from sc['fibre-routes']

        fr_a_ids = set()
        fr_b_ids = set()
        all_frs = {}
        if circuits_a:
            circuits_a = json.loads(circuits_a.decode('utf-8'))
            for fr in _get_fr(circuits_a):
                fr_a_ids.add(fr['id'])
                all_frs[fr['id']] = fr
        if circuits_b:
            circuits_b = json.loads(circuits_b.decode('utf-8'))
            for fr in _get_fr(circuits_b):
                fr_b_ids.add(fr['id'])
                all_frs[fr['id']] = fr

        # prefer the routes common to both ends,
        # otherwise fall back to the routes of either end
        fr_ids = fr_a_ids & fr_b_ids
        if not fr_ids:
            fr_ids = fr_a_ids | fr_b_ids
        # NOTE(review): the order follows set iteration, so which route
        # ends up first (and is reported below) is not explicitly chosen
        fibre_routes = [all_frs[x] for x in fr_ids]

        if fibre_routes:
            loc_a = _location_from_equipment(equipment_a, r) \
                or _LOCATION(equipment_a, '', '')
            loc_b = _location_from_equipment(equipment_b, r) \
                or _LOCATION(equipment_b, '', '')

            # added locations in preparation for refactoring to be in-line
            # with other location data. Once Dashboard has been altered to
            # use this for fiberlink alarms the 'ends' attribute can be
            # removed
            result = {
                'locations': [
                    build_locations(loc_a, loc_b)
                ],
                'ends': {
                    'a': {
                        'pop': loc_a['name'],
                        'pop_abbreviation': loc_a['abbreviation'],
                        'equipment': loc_a['equipment']
                    },
                    'b': {
                        'pop': loc_b['name'],
                        'pop_abbreviation': loc_b['abbreviation'],
                        'equipment': loc_b['equipment']
                    },
                },
                'df_route': fibre_routes[0],
                'related-services':
                    get_top_level_services(fibre_routes[0]['id'], r)
            }
            for rs in result['related-services']:
                rs.pop('id', None)
            result = json.dumps(result)
            r.set(cache_key, result)

    if not result:
        return Response(
            response="no available info for "
                     f"{ne_name_str} {object_name_str}",
            status=404,
            mimetype="text/html")

    return Response(result, mimetype="application/json")


@routes.route('/coriant-info/<equipment_name>/<path:entity_string>',
              methods=['GET', 'POST'])
@common.require_accepts_json
def get_coriant_info(equipment_name: str, entity_string: str) -> Response:
    """
    Handler for /classifier/coriant-info that
    returns metadata for a coriant path.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.CORIANT_INFO_RESPONSE_SCHEMA

    :param equipment_name: grv hostname
    :param entity_string: path name
    :return: json response, or a 404 response if entity_string doesn't
        have the expected format
    """

    r = common.get_current_redis()

    ims_source_equipment = get_ims_equipment_name(equipment_name)
    ims_interface = get_ims_interface(entity_string)

    cache_key = 'classifier-cache:coriant:' \
                f'{ims_source_equipment}:{ims_interface}'

    result = _cached_result(r, cache_key)
    if not result:

        # expected form: '<digits>-<digits>.<digits>...' (card id and port)
        m = re.match(r'^(\d+\-\d+)\.(\d+)', ims_interface)
        if not m:
            logger.error(
                f'invalid coriant entity string format: {ims_interface}')
            return Response(
                response="no available info for "
                         f"'{ims_source_equipment}' '{ims_interface}'",
                status=404,
                mimetype="text/html")
        card_id = m.group(1).replace('-', '/')
        port = m.group(2)
        result = {
            'equipment name': ims_source_equipment,
            'card id': card_id,
            'port number': port
        }
        interface_name = f'{card_id}/{port}'

        result.update(get_interface_services_and_locs(
            ims_source_equipment,
            interface_name,
            r
        ))
        # these fields are not included in the response
        for s in result.get('services', []):
            s.pop('card_id', None)
            s.pop('port', None)
            s.pop('logical_unit', None)
            s.pop('other_end_card_id', None)
            s.pop('other_end_port', None)
            s.pop('other_end_logical_unit', None)

        result['locations'] = _remove_duplicates_from_list(result['locations'])
        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")