import ipaddress
import json
import logging
import re
from typing import Iterator, Optional

from flask import Blueprint, Response, request
from redis import Redis

from inventory_provider.routes import common
from inventory_provider.routes.common import _ignore_cache_or_retrieve

routes = Blueprint("inventory-data-classifier-support-routes", __name__)

logger = logging.getLogger(__name__)


def _LOCATION(equipment, name, abbreviation):
    """
    Build a single location endpoint dict.

    :param equipment: equipment name
    :param name: pop name
    :param abbreviation: pop abbreviation
    :return: dict with keys 'equipment', 'name' and 'abbreviation'
    """
    return {
        'equipment': equipment,
        'name': name,
        'abbreviation': abbreviation
    }


def build_locations(loc_a: Optional[dict], loc_b: Optional[dict] = None) \
        -> Optional[dict]:
    """
    Combine one or two endpoint locations into a locations dict.

    :param loc_a: location of the 'a' end
    :param loc_b: optional location of the 'b' end
    :return: {'a': loc_a} (plus 'b' if loc_b is given),
        or None if loc_a is empty
    """
    locations = None
    if loc_a:
        locations = {'a': loc_a}
        # a 'b' end is only meaningful when there is an 'a' end
        if loc_b:
            locations['b'] = loc_b
    return locations


def _remove_duplicates_from_list(all_: list) -> list:
    """
    removes duplicates from the input list
    the list items must be encodable as json

    :param all_: list of json-encodable items
    :return: a new list with unique elements
    """
    # items are keyed on their canonical json encoding, so dicts that
    # differ only in key order are treated as duplicates
    tmp_dict = {json.dumps(item, sort_keys=True): item for item in all_}
    return list(tmp_dict.values())


def _location_from_equipment(equipment_name: str, r: Redis) -> Optional[dict]:
    """
    Look up the location cached under 'ims:location:<equipment_name>'.

    :param equipment_name: equipment name, as used in the redis key
    :param r: redis connection
    :return: a location dict (cf. _LOCATION) built from the first cached
        entry, or None if nothing usable is cached (an error is logged)
    """
    result = r.get(f'ims:location:{equipment_name}')
    if not result:
        logger.error(f'error looking up location for {equipment_name}')
        return None

    result = json.loads(result.decode('utf-8'))
    if not result:
        logger.error(
            f'sanity failure: empty list for location {equipment_name}')
        return None

    return _LOCATION(
        equipment=result[0]['equipment-name'],
        name=result[0]['pop']['name'],
        abbreviation=result[0]['pop']['abbreviation'])


def _location_from_services(services, r: Redis):
    """
    Generate a locations dict for each of the given services.

    :param services: iterable of service dicts, each containing
        'equipment' and 'other_end_equipment'
    :param r: redis connection
    :return: yields the result of build_locations for each service
        (this is None when the 'a' end lookup fails)
    """
    for s in services:
        loc_a = _location_from_equipment(s['equipment'], r)
        loc_b = _location_from_equipment(s['other_end_equipment'], r) \
            if s['other_end_equipment'] else None
        yield build_locations(loc_a, loc_b)


class ClassifierRequestError(Exception):
    """
    Base class for errors converted to http responses
    by handle_request_error (default status 500).
    """
    status_code = 500

    def __init__(self):
        super().__init__()
        self.message = "Unclassified Internal Error"


class ClassifierProcessingError(ClassifierRequestError):
    """
    Request error carrying a caller-supplied message (default status 422).
    """
    status_code = 422

    def __init__(self, message, status_code=None):
        """
        :param message: response body text (converted with str())
        :param status_code: optional override of the class status code
        """
        super().__init__()
        self.message = str(message)
        if status_code is not None:
            self.status_code = status_code


@routes.errorhandler(ClassifierRequestError)
def handle_request_error(error):
    """
    Convert a ClassifierRequestError into a response with the
    error's message as body and its status_code as status.
    """
    return Response(
        response=error.message,
        status=error.status_code)


@routes.after_request
def after_request(resp):
    """Delegates post-processing of every response to common."""
    return common.after_request(resp)


def get_ims_equipment_name(equipment_name: str) -> str:
    """
    Convert an equipment name to the form used in the ims redis keys.

    The name is upper-cased and, for names starting with 'MX',
    everything from '.GEANT.NET' onwards is removed.

    :param equipment_name: equipment/router name
    :return: the converted name
    """
    ims_equipment_name = equipment_name.upper()
    if ims_equipment_name.startswith('MX'):
        ims_equipment_name = ims_equipment_name.split('.GEANT.NET')[0]
    return ims_equipment_name


def get_ims_interface(interface: str) -> str:
    """
    :param interface: interface name
    :return: the upper-cased interface name
    """
    return interface.upper()


def related_interfaces(hostname, interface):
    """
    Generate the names of the cached interfaces of hostname
    that are named '<interface>.<something>'.

    :param hostname: router hostname
    :param interface: interface name
    :return: yields interface names (key prefix removed)
    """
    r = common.get_current_redis()
    prefix = f'netconf-interfaces:{hostname}:'
    for k in r.keys(f'{prefix}{interface}.*'):
        k = k.decode('utf-8')
        assert k.startswith(prefix)  # sanity
        assert len(k) > len(prefix)  # sanity (contains at least an interface)
        yield k[len(prefix):]


def get_related_services(source_equipment: str, interface: str, r) \
        -> Iterator[dict]:
    """
    Finds the top-level-services for the given interface
    and also gets the top-level-services for the related interfaces
    e.g. ae20 will also find services on all logical units of ae20 (ae20.1 ...)

    :param source_equipment: equipment name
    :param interface: interface name
    :param r: redis connection
    :return: generator yielding the top-level-service items
    """
    ims_source_equipment = get_ims_equipment_name(source_equipment)
    ims_interface = get_ims_interface(interface)
    if_services = r.get(f'ims:interface_services:{ims_source_equipment}:'
                        f'{ims_interface}')
    if if_services:
        for s in json.loads(if_services.decode('utf-8')):
            yield from s['top-level-services']

    for related in related_interfaces(source_equipment, interface):
        ims_interface = get_ims_interface(related)
        rif_services = r.get(
            f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
        if rif_services:
            for s in json.loads(rif_services.decode('utf-8')):
                yield from s['top-level-services']


def get_interface_services_and_locs(ims_source_equipment, ims_interface, r):
    """
    Collect the services, related services and locations
    cached for an interface.

    :param ims_source_equipment: equipment name (ims form)
    :param ims_interface: interface name (ims form)
    :param r: redis connection
    :return: dict that always contains 'locations' (possibly empty) and
        contains 'services' and 'related-services' only when non-empty
    """

    def _format_service(_s):
        # strips (in place) every key that isn't part of the response
        keys = {
            'id', 'name', 'status', 'circuit_type', 'service_type',
            'project', 'pop_name', 'pop_abbreviation', 'equipment',
            'card_id', 'port', 'logical_unit', 'other_end_pop_name',
            'other_end_pop_abbreviation', 'other_end_equipment',
            'other_end_card_id', 'other_end_port',
            'other_end_logical_unit'}
        keys_to_remove = set(_s.keys()) - keys
        for k in keys_to_remove:
            _s.pop(k)

    result = {
        'locations': []
    }
    raw_services = r.get(
        f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
    if raw_services:
        result['services'] = json.loads(raw_services.decode('utf-8'))
        for s in result['services']:
            _format_service(s)
        result['related-services'] = \
            list(get_related_services(ims_source_equipment, ims_interface, r))
        # failed lookups (None) are dropped: they are not valid location
        # items and would otherwise suppress the fallback below
        result['locations'] = [
            loc for loc in _location_from_services(result['services'], r)
            if loc]
        if not result['services']:
            result.pop('services', None)
        if result['related-services']:
            # nb: the loop variable must not be called 'r', the redis
            # connection is still needed for the location fallback below
            for related in result['related-services']:
                related.pop('id', None)
        else:
            result.pop('related-services', None)

    if not result.get('locations', None):
        # no service locations: fall back to the equipment's own location
        locations = build_locations(
            _location_from_equipment(ims_source_equipment, r))
        result['locations'] = [locations] if locations else []
    result['locations'] = _remove_duplicates_from_list(result['locations'])
    return result


def _link_interface_info(r, hostname, interface):
    """
    Generates the 'interface' field for
    the juniper-link-info response payload.

    only called from get_juniper_link_info

    :param r: redis connection
    :param hostname: router hostname
    :param interface: interface name
    :return: payload dict
    """
    ifc_info = r.get(f'netconf-interfaces:{hostname}:{interface}')
    if not ifc_info:
        # warning: this should match the structure returned by
        #          juniper:list_interfaces:_ifc_info
        ifc_info = {
            'name': interface,
            'description': '',
            'bundle': [],
            'ipv4': [],
            'ipv6': []
        }
    else:
        ifc_info = json.loads(ifc_info.decode('utf-8'))

    bundle_members = r.get(
        f'netconf-interface-bundles:{hostname}:{interface}')
    if bundle_members:
        ifc_info['bundle_members'] \
            = json.loads(bundle_members.decode('utf-8'))
    else:
        ifc_info['bundle_members'] = []

    # 'snmp' is only added if snmp data is cached for this interface
    snmp_info = r.get(
        f'snmp-interfaces-single:{hostname}:{interface}')
    if snmp_info:
        snmp_info = json.loads(snmp_info.decode('utf-8'))
        ifc_info['snmp'] = {
            'community': snmp_info['community'],
            'index': snmp_info['index']
        }

    return ifc_info


@routes.route("/juniper-link-info/<source_equipment>/<path:interface>",
              methods=['GET', 'POST'])
@common.require_accepts_json
def get_juniper_link_info(source_equipment: str, interface: str) -> Response:
    """
    Handler for /classifier/juniper-link-info that
    returns metadata about an IP interface.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.JUNIPER_LINK_RESPONSE_SCHEMA

    :param source_equipment: router hostname
    :param interface: link interface name
    :return: json response
    """
    ims_source_equipment = get_ims_equipment_name(source_equipment)
    ims_interface = get_ims_interface(interface)

    r = common.get_current_redis()

    cache_key = \
        f'classifier-cache:juniper:{ims_source_equipment}:{ims_interface}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        # nb: _link_interface_info already populates 'bundle_members',
        # so it isn't looked up a second time here
        result = {
            'interface': _link_interface_info(r, source_equipment, interface)
        }

        result.update(
            get_interface_services_and_locs(
                ims_source_equipment,
                ims_interface,
                r
            )
        )

        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")


def _asn_group_info(redis, address):
    """
    Find the peerings sharing the same remote asn as the given peer.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: dict with keys 'asn' and 'peers',
        or None if no asn or no peerings are found
    :raises ClassifierProcessingError: if address can't be parsed
    """
    try:
        address = ipaddress.ip_address(address).exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address} as an ip address')

    all_peerings = redis.get(f'juniper-peerings:remote:{address}')
    if not all_peerings:
        return None

    all_peerings = json.loads(all_peerings.decode('utf-8'))
    all_asn = {p['remote-asn'] for p in all_peerings if 'remote-asn' in p}
    if not all_asn:
        return None

    # an arbitrary element is used if there are several
    peer_asn = all_asn.pop()
    if all_asn:
        logger.error(
            f"found multiple asn's for {address}, "
            f'using {peer_asn} and ignoring {all_asn}')

    peerings_this_asn = redis.get(f'juniper-peerings:peer-asn:{peer_asn}')
    if not peerings_this_asn:
        logger.error(
            f'internal data corruption, no peerings found for asn {peer_asn}')
        return None

    peerings_this_asn = json.loads(peerings_this_asn.decode('utf-8'))
    return {
        'asn': peer_asn,
        'peers': [
            {
                'router': p['hostname'],
                'address': p['address'],
                # for future use (in case isolation
                # should be computed per peering type)
                'group': p['group']
            }
            for p in peerings_this_asn
        ]
    }


def _vpn_rr_peering_info(redis, address):
    """
    Find vpn rr peering info for the given peer address.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: dict with keys 'name', 'description', 'router' (and 'peer-as'
        if known), or None if there is no vpn rr peering for address
    :raises ClassifierProcessingError: if address can't be parsed
    """
    def _is_rr(peering_info):
        if peering_info.get('logical-system', '') != 'VRR':
            return False
        group = peering_info.get('group', '')
        if group not in ('VPN-RR', 'VPN-RR-INTERNAL'):
            return False
        if 'description' not in peering_info:
            logger.error('internal data error, looks like vpn rr peering '
                         f'but description is missing: {peering_info}')
            return False
        return True

    try:
        address = ipaddress.ip_address(address).exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address} as an ip address')

    all_peerings = redis.get(f'juniper-peerings:remote:{address}')
    if not all_peerings:
        return None

    all_peerings = json.loads(all_peerings.decode('utf-8'))
    rr_peerings = list(filter(_is_rr, all_peerings))
    if not rr_peerings:
        return None

    if len(rr_peerings) > 1:
        logger.warning(
            f'using the first of multiple vpn rr peer matches: {rr_peerings}')

    return_value = {
        'name': address,
        'description': rr_peerings[0]['description'],
        'router': rr_peerings[0]['hostname']
    }

    if 'remote-asn' in rr_peerings[0]:
        return_value['peer-as'] = rr_peerings[0]['remote-asn']

    return return_value


def _ix_peering_info(redis, address):
    """
    Find ix public peering info for the given peer address.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: dict with keys 'peer', 'group' and 'router',
        or None if there is no ix peering for address
    :raises ClassifierProcessingError: if address can't be parsed
    """
    def _is_ix(peering_info):
        if peering_info.get('instance', '') != 'IAS':
            return False
        if not peering_info.get('group', '').startswith('GEANT-IX'):
            return False

        expected_keys = ('description', 'local-asn', 'remote-asn')
        if any(peering_info.get(x, None) is None for x in expected_keys):
            logger.error('internal data error, looks like ix peering but '
                         f'some expected keys are missing: {peering_info}')
            return False
        return True

    try:
        address = ipaddress.ip_address(address).exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address} as an ip address')

    all_peerings = redis.get(f'juniper-peerings:remote:{address}')
    if not all_peerings:
        return None

    all_peerings = json.loads(all_peerings.decode('utf-8'))
    ix_peerings = list(filter(_is_ix, all_peerings))
    if not ix_peerings:
        return None

    if len(ix_peerings) > 1:
        logger.warning(
            f'using the first of multiple ix peer matches: {ix_peerings}')

    peer_info = {
        'name': address,
        'description': ix_peerings[0]['description'],
        'as': {
            'local': ix_peerings[0]['local-asn'],
            'peer': ix_peerings[0]['remote-asn']
        },
        'router': ix_peerings[0]['hostname']
    }

    return_value = {
        'peer': peer_info,
        'group': [],
        'router': []
    }

    # load the other peers in the same group
    # regex needed??? (e.g. tabs???)
    peering_group_name = peer_info['description'].split(' ')[0]
    peering_group = redis.get(
        f'juniper-peerings:ix-groups:{peering_group_name}')
    if peering_group:
        peering_group = peering_group.decode('utf-8')
        return_value['group'] = sorted(json.loads(peering_group))

    # load the other ix peers from the same router
    router_peerings = redis.get(
        f'juniper-peerings:hosts:{peer_info["router"]}')
    if router_peerings:
        router_peerings = json.loads(router_peerings.decode('utf-8'))
        router_ix_peers = list(filter(_is_ix, router_peerings))
        if router_ix_peers:
            addresses = {p['address'] for p in router_ix_peers}
            return_value['router'] = sorted(addresses)
    else:
        # missing data shouldn't fail the request: 'router' stays empty
        logger.error(
            'internal data corruption, no peerings found '
            f'for host {peer_info["router"]}')

    return return_value


def find_interfaces(address):
    """
    TODO: this is probably the least efficient way of doing this
          (if it's a problem, pre-compute these lists)

    :param address: an ipaddress object
    :return: yields the items cached for each subnet containing address
    """
    r = common.get_current_redis()
    for k in r.keys('subnets:*'):
        k = k.decode('utf-8')
        m = re.match(r'^subnets:(.*)$', k)
        assert m, 'sanity failure: redis returned an invalid key name'

        interface = ipaddress.ip_interface(m.group(1))
        if address in interface.network:
            info = r.get(k)
            if not info:
                # no value for a key that was just listed
                # (e.g. removed in the meantime): nothing to report
                continue
            yield from json.loads(info.decode('utf-8'))
@routes.route("/peer-info/<address_str>", methods=['GET', 'POST'])
@common.require_accepts_json
def peer_info(address_str: str) -> Response:
    """
    Handler for /classifier/peer-info that returns bgp peering metadata.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.PEER_INFO_RESPONSE_SCHEMA

    :param address_str: string representation of a bgp peer address
    :return: json response
    :raises ClassifierProcessingError: if address_str can't be parsed
    """
    # canonicalize the input address first ...
    try:
        address = ipaddress.ip_address(address_str)
        address_str = address.exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address_str} as an ip address')

    r = common.get_current_redis()

    cache_key = f'classifier-cache:peer:{address_str}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:

        result = {
            'interfaces': [],
            'locations': [],
        }

        ix_peering_info = _ix_peering_info(r, address_str)
        if ix_peering_info:
            result['ix-public-peer-info'] = ix_peering_info
            result['locations'].append(build_locations(
                _location_from_equipment(
                    get_ims_equipment_name(
                        ix_peering_info['peer']['router']), r)))

        vpn_rr_peering_info = _vpn_rr_peering_info(r, address_str)
        if vpn_rr_peering_info:
            result['vpn-rr-peer-info'] = vpn_rr_peering_info
            result['locations'].append(build_locations(
                _location_from_equipment(
                    get_ims_equipment_name(vpn_rr_peering_info['router']),
                    r)))

        asn_group_info = _asn_group_info(r, address_str)
        if asn_group_info:
            result['asn'] = asn_group_info

        for interface in find_interfaces(address):
            ims_equipment = get_ims_equipment_name(interface["router"])
            ims_interface = get_ims_interface(interface["interface name"])

            services_and_locs = get_interface_services_and_locs(
                ims_equipment,
                ims_interface,
                r
            )

            t = {'interface': interface}
            if services_and_locs.get('services', None):
                t['services'] = services_and_locs['services']
            result['interfaces'].append(t)
            result['locations'].extend(services_and_locs['locations'])

        snmp_info = r.get(
            f'snmp-peerings:remote:{address_str}')
        if snmp_info:
            snmp_info = json.loads(snmp_info.decode('utf-8'))
            result['snmp'] = [
                {
                    'hostname': h['hostname'],
                    'community': h['community'],
                    'oid': h['oid']
                } for h in snmp_info]

        # build_locations returns None when the equipment lookup fails:
        # drop those so the response only contains real location items
        result['locations'] = _remove_duplicates_from_list(
            [loc for loc in result['locations'] if loc])
        if not result.get('interfaces', None):
            result.pop('interfaces', None)
        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")


@routes.route("/infinera-lambda-info/"
              "<source_equipment>/<interface>/<circuit_id>",
              methods=['GET', 'POST'])
@common.require_accepts_json
def get_trap_metadata(source_equipment: str, interface: str, circuit_id: str) \
        -> Response:
    """
    Handler for /classifier/infinera-lambda-info that
    returns metadata for a DTNX port.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.INFINERA_LAMBDA_INFO_RESPONSE_SCHEMA

    :param source_equipment: DTNX name
    :param interface: interface/port name
    :param circuit_id: infinera circuit id (not used by this handler)
    :return: json response
    """
    ims_source_equipment = get_ims_equipment_name(source_equipment)
    ims_interface = get_ims_interface(interface)

    cache_key = f'classifier-cache:infinera:{source_equipment}:{interface}'

    r = common.get_current_redis()

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        result = {
            'locations': []
        }

        result.update(get_interface_services_and_locs(
            ims_source_equipment,
            ims_interface,
            r
        ))

        # NOTE(review): result always contains the 'locations' key at
        # this point, so this 404 can't currently be returned - confirm
        # whether "no services and no locations" was meant to be a 404
        if not result:
            return Response(
                response="no available info for {} {}".format(
                    source_equipment, interface),
                status=404,
                mimetype="text/html")

        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")


def _collect_fiberlink_circuits(raw_circuits, all_frs, all_tls):
    """
    Index the fibre routes and top-level services of one fiberlink end.

    :param raw_circuits: raw (json bytes) redis value, or None
    :param all_frs: dict updated in place: fibre route id -> fibre route
    :param all_tls: dict updated in place: service id -> top-level service
    :return: tuple (set of fibre route ids, set of top-level service ids)
    """
    fr_ids = set()
    tls_ids = set()
    if raw_circuits:
        for c in json.loads(raw_circuits.decode('utf-8')):
            for fr in c['fibre-routes']:
                fr_ids.add(fr['id'])
                all_frs[fr['id']] = fr
            for tls in c['top-level-services']:
                tls_ids.add(tls['id'])
                all_tls[tls['id']] = tls
    return fr_ids, tls_ids


@routes.route("/infinera-fiberlink-info/<ne_name_str>/<object_name_str>",
              methods=['GET', 'POST'])
@common.require_accepts_json
def get_fiberlink_trap_metadata(ne_name_str: str, object_name_str: str) \
        -> Response:
    """
    Handler for /classifier/infinera-fiberlink-info that
    returns metadata for a particular optical path segment.

    TODO: no schema is declared, and there are no validation tests

    :param ne_name_str: OLA or DTNX equipment name
    :param object_name_str: path name
    :return: json response, or a 404 response if no fibre routes are found
    :raises ClassifierProcessingError: if the parameters can't each be
        split into two elements
    """
    interfaces = object_name_str.split('_')
    p = r'([a-zA-Z\d]+?-(OLA|DTNX)\d+(-\d)?)'
    matches = re.findall(p, ne_name_str)
    if len(matches) != 2 or len(interfaces) != 2:
        raise ClassifierProcessingError(
            f'unable to parse {ne_name_str} {object_name_str} '
            'into two elements')

    r = common.get_current_redis()

    # double check that we only need to check the two nodes and not the objects
    cache_key = \
        f'classifier-cache:fiberlink:{ne_name_str}:{object_name_str}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        # findall returns one tuple per match: the first group is the
        # full equipment name
        equipment_a = matches[0][0]
        equipment_b = matches[1][0]
        interface_a = interfaces[0]
        interface_b = interfaces[1]
        circuits_a = \
            r.get(f'ims:interface_services:{equipment_a}:{interface_a}')
        logger.debug(f'ims:interface_services:{equipment_a}:{interface_a}')
        circuits_b = \
            r.get(f'ims:interface_services:{equipment_b}:{interface_b}')
        logger.debug(f'ims:interface_services:{equipment_b}:{interface_b}')

        all_frs = {}
        all_tls = {}
        # 'a' is processed first, so for an id present at both ends
        # the item from 'b' is the one kept in all_frs/all_tls
        fr_a_ids, tls_a_ids = \
            _collect_fiberlink_circuits(circuits_a, all_frs, all_tls)
        fr_b_ids, tls_b_ids = \
            _collect_fiberlink_circuits(circuits_b, all_frs, all_tls)

        # prefer what both ends have in common,
        # otherwise use everything found at either end
        fr_ids = fr_a_ids & fr_b_ids
        if not fr_ids:
            fr_ids = fr_a_ids | fr_b_ids
        fibre_routes = [all_frs[x] for x in fr_ids]

        tls_ids = tls_a_ids & tls_b_ids
        if not tls_ids:
            tls_ids = tls_a_ids | tls_b_ids
        top_level_services = [all_tls[x] for x in tls_ids]

        if fibre_routes:
            # use a placeholder (empty pop info) if a lookup fails
            loc_a = _location_from_equipment(equipment_a, r) \
                or _LOCATION(equipment_a, '', '')
            loc_b = _location_from_equipment(equipment_b, r) \
                or _LOCATION(equipment_b, '', '')

            # added locations in preparation for refactoring to be in-line
            # with other location data. Once Dashboard has been altered to
            # use this for fiberlink alarms the 'ends' attribute can be
            # removed
            result = {
                'locations': [
                    build_locations(loc_a, loc_b)
                ],
                'ends': {
                    'a': {
                        'pop': loc_a['name'],
                        'pop_abbreviation': loc_a['abbreviation'],
                        'equipment': loc_a['equipment']
                    },
                    'b': {
                        'pop': loc_b['name'],
                        'pop_abbreviation': loc_b['abbreviation'],
                        'equipment': loc_b['equipment']
                    },
                },
                'df_route': fibre_routes[0],
                'related-services': top_level_services
            }
            for rs in result['related-services']:
                rs.pop('id', None)

            result = json.dumps(result)
            # cache this data for the next call
            r.set(cache_key, result.encode('utf-8'))

    if not result:
        return Response(
            response="no available info for "
                     f"{ne_name_str} {object_name_str}",
            status=404,
            mimetype="text/html")

    return Response(result, mimetype="application/json")


@routes.route('/coriant-info/<equipment_name>/<path:entity_string>',
              methods=['GET', 'POST'])
@common.require_accepts_json
def get_coriant_info(equipment_name: str, entity_string: str) -> Response:
    """
    Handler for /classifier/coriant-info that
    returns metadata for a coriant path.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.CORIANT_INFO_RESPONSE_SCHEMA

    :param equipment_name: grv hostname
    :param entity_string: path name
    :return: json response, or a 404 response if entity_string
        doesn't have the expected format
    """
    r = common.get_current_redis()

    ims_source_equipment = get_ims_equipment_name(equipment_name)
    ims_interface = get_ims_interface(entity_string)

    cache_key = 'classifier-cache:coriant:' \
                f'{ims_source_equipment}:{ims_interface}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        # expected format: '<digits>-<digits>.<digits>...'
        # i.e. the card id, then the port number
        m = re.match(r'^(\d+\-\d+)\.(\d+)', ims_interface)
        if not m:
            logger.error(
                f'invalid coriant entity string format: {ims_interface}')
            return Response(
                response="no available info for "
                         f"'{ims_source_equipment}' '{ims_interface}'",
                status=404,
                mimetype="text/html")
        card_id = m.group(1).replace('-', '/')
        port = m.group(2)
        result = {
            'equipment name': ims_source_equipment,
            'card id': card_id,
            'port number': port
        }
        interface_name = f'{card_id}/{port}'

        result.update(get_interface_services_and_locs(
            ims_source_equipment,
            interface_name,
            r
        ))
        # card/port details are reported once at the top level of
        # the response, so they're removed from the individual services
        for s in result.get('services', []):
            s.pop('card_id', None)
            s.pop('port', None)
            s.pop('logical_unit', None)
            s.pop('other_end_card_id', None)
            s.pop('other_end_port', None)
            s.pop('other_end_logical_unit', None)

        result['locations'] = _remove_duplicates_from_list(result['locations'])
        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")