"""
Classifier Endpoints
=========================

These endpoints are intended for use by Dashboard V3.

.. contents:: :local:

/classifier/peer-info
---------------------

.. autofunction:: inventory_provider.routes.classifier.peer_info


/classifier/juniper-link-info
-----------------------------

.. autofunction::
   inventory_provider.routes.classifier.get_juniper_link_info


/classifier/infinera-fiberlink-info
------------------------------------

.. autofunction::
   inventory_provider.routes.classifier.get_fiberlink_trap_metadata


/classifier/coriant-info
------------------------

.. autofunction:: inventory_provider.routes.classifier.get_coriant_info


/classifier/mtc-interface-info
--------------------------------

.. autofunction::
   inventory_provider.routes.classifier.get_mtc_interface_info

"""
import ipaddress
import json
import logging
import re
from functools import lru_cache
from typing import Optional

from flask import Blueprint, Response, request
from redis import Redis

from inventory_provider.routes import common
from inventory_provider.routes.common import _ignore_cache_or_retrieve

routes = Blueprint("inventory-data-classifier-support-routes", __name__)

logger = logging.getLogger(__name__)


def _LOCATION(equipment, name, abbreviation):
    """
    Build a single location record.

    :param equipment: equipment name
    :param name: pop name
    :param abbreviation: pop abbreviation
    :return: dict with keys 'equipment', 'name' and 'abbreviation'
    """
    return {
        'equipment': equipment,
        'name': name,
        'abbreviation': abbreviation
    }


def build_locations(loc_a: Optional[dict], loc_b: Optional[dict] = None) \
        -> Optional[dict]:
    """
    Wrap one or two location records in an 'a'/'b' dict.

    :param loc_a: location of the 'a' end (may be None/empty)
    :param loc_b: optional location of the 'b' end
    :return: {'a': loc_a[, 'b': loc_b]}, or None if loc_a is falsy
    """
    locations = None
    if loc_a:
        locations = {'a': loc_a}
        # a 'b' end is only meaningful when there is an 'a' end
        if loc_b:
            locations['b'] = loc_b
    return locations


def _remove_duplicates_from_list(all_: list) -> list:
    """
    removes duplicates from the input list
    the list items must be encodable as json

    :param all_: list of json-encodable items
    :return: a new list with unique elements
    """
    # the canonical (sorted-keys) json encoding is used as the identity
    # of each item; for duplicates the last occurrence is the one kept
    unique = {json.dumps(item, sort_keys=True): item for item in all_}
    return list(unique.values())


def _location_from_equipment(equipment_name: str, r: Redis) -> Optional[dict]:
    """
    Look up the location record cached for an equipment name.

    :param equipment_name: equipment name, used in the
        'ims:location:<equipment_name>' redis key
    :param r: a redis db connection
    :return: a location dict (cf. _LOCATION) built from the first cached
        element, or None if the key is missing or holds an empty list
    """
    k = f'ims:location:{equipment_name}'
    result = r.get(k)
    if not result:
        logger.error(f'error looking up location for key: {k}')
        return None

    result = json.loads(result.decode('utf-8'))
    if not result:
        logger.error(
            f'sanity failure: empty list for location {equipment_name}')
        return None

    return _LOCATION(
        equipment=result[0]['equipment-name'],
        name=result[0]['pop']['name'],
        abbreviation=result[0]['pop']['abbreviation'])


class ClassifierRequestError(Exception):
    """
    Base class for errors converted to http responses by
    handle_request_error (default status 500).
    """
    status_code = 500

    def __init__(self):
        super().__init__()
        self.message = "Unclassified Internal Error"


class ClassifierProcessingError(ClassifierRequestError):
    """
    Error raised when a request can't be processed (default status 422).

    :param message: text returned as the response body
    :param status_code: optional override of the default status code
    """
    status_code = 422

    def __init__(self, message, status_code=None):
        super().__init__()
        self.message = str(message)
        if status_code is not None:
            self.status_code = status_code


@routes.errorhandler(ClassifierRequestError)
def handle_request_error(error):
    """
    Convert a ClassifierRequestError into an http response.

    :param error: the raised ClassifierRequestError instance
    :return: Response with the error message and status code
    """
    return Response(
        response=error.message,
        status=error.status_code)


@routes.after_request
def after_request(resp):
    """
    Delegate post-processing of every response to common.after_request.
    """
    return common.after_request(resp)


@lru_cache(256)
def get_ims_equipment_name(equipment_name: str, r: Redis = None) -> str:
    """
    Map an equipment/host name to the name used in the ims redis keys.

    The upper-cased name is tried in three forms, in this order: the
    part before '.GEANT.', the full name, the part before '.OFFICE.'.
    The first form for which an 'ims:location:<name>' key exists is
    returned; if none exists the first form is returned.

    Results are memoized with lru_cache (keyed on both arguments), so
    later changes to the redis data are not reflected for cached names.

    :param equipment_name: equipment or host name
    :param r: a redis db connection (defaults to the current one)
    :return: the ims equipment name
    """
    if not r:
        r = common.get_current_redis()
    ims_equipment_name = equipment_name.upper()
    candidates = [
        ims_equipment_name.split('.GEANT.')[0],
        ims_equipment_name,
        ims_equipment_name.split('.OFFICE.')[0]
    ]
    return_value = candidates[0]
    loc_key = 'ims:location:{}'
    for c in candidates:
        if r.exists(loc_key.format(c)):
            return_value = c
            break
    return return_value


def get_ims_interface(interface: str) -> str:
    """
    :param interface: interface name
    :return: the upper-cased interface name
    """
    return interface.upper()


def get_interface_services_and_loc(ims_source_equipment, ims_interface, redis):
    """
    Collect services, related services, contacts and location for
    an interface.

    Data is read from the redis key
    'ims:interface_services:<ims_source_equipment>:<ims_interface>'.

    :param ims_source_equipment: ims equipment name
    :param ims_interface: ims interface name
    :param redis: a redis db connection
    :return: dict that always contains 'locations' (possibly empty) and,
        when the redis key exists, 'contacts' plus optional 'services'
        and 'related-services' lists (each sorted by name)
    """

    def _format_service(_s):
        # strip (in place) every key that's not part of the payload
        keys = {
            'id', 'name', 'status', 'circuit_type', 'service_type',
            'project', 'pop_name', 'pop_abbreviation', 'equipment',
            'card_id', 'port', 'logical_unit', 'other_end_pop_name',
            'other_end_pop_abbreviation', 'other_end_equipment',
            'other_end_card_id', 'other_end_port',
            'other_end_logical_unit', 'sid'
        }
        keys_to_remove = set(_s.keys()) - keys
        for k in keys_to_remove:
            _s.pop(k)

    def _sorted_by_name(things_with_name):
        return sorted(things_with_name, key=lambda x: x['name'])

    result = {}

    raw_services = redis.get(
        f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
    if raw_services:
        # both dicts are keyed by id, so that duplicates collapse
        related_services = {}
        services = {}
        contacts = set()
        for s in json.loads(raw_services.decode('utf-8')):
            related_services.update(
                {rs['id']: rs for rs in s['related-services']})
            contacts.update(set(s.pop('contacts', set())))
            if s['circuit_type'] == 'service':
                _format_service(s)
                # key by the service id: a constant key here would
                # silently drop all but the last service
                services[s['id']] = s

        result['contacts'] = sorted(contacts)

        if services:
            result['services'] = _sorted_by_name(services.values())

        if related_services:
            result['related-services'] = \
                _sorted_by_name(related_services.values())
            # the id was only needed for de-duplication
            for rs in result['related-services']:
                rs.pop('id', None)

    loc = _location_from_equipment(ims_source_equipment, redis)
    result['locations'] = [build_locations(loc)] if loc else []

    return result


def _link_interface_info(r, hostname, interface):
    """
    Generates the 'interface' field for
    the juniper-link-info response payload.

    only called from get_juniper_link_info

    :param r: redis connection
    :param hostname: router hostname
    :param interface: interface name
    :return: payload dict
    """
    ifc_info = r.get(f'netconf-interfaces:{hostname}:{interface}')
    if not ifc_info:
        # warning: this should match the structure returned by
        #          juniper:list_interfaces:_ifc_info
        ifc_info = {
            'name': interface,
            'description': '',
            'bundle': [],
            'speed': '',
            'ipv4': [],
            'ipv6': []
        }
    else:
        ifc_info = json.loads(ifc_info.decode('utf-8'))

    bundle_members = r.get(
        f'netconf-interface-bundles:{hostname}:{interface}')
    if bundle_members:
        ifc_info['bundle_members'] \
            = json.loads(bundle_members.decode('utf-8'))
    else:
        ifc_info['bundle_members'] = []

    snmp_info = r.get(
        f'snmp-interfaces-single:{hostname}:{interface}')
    if snmp_info:
        snmp_info = json.loads(snmp_info.decode('utf-8'))
        ifc_info['snmp'] = {
            'index': int(snmp_info['index'])
        }
        # two record layouts are handled: a 'communities' dict
        # (the 'dashboard' entry is used) or a single 'community' value
        if 'communities' in snmp_info:
            ifc_info['snmp']['community'] = \
                snmp_info['communities']['dashboard']
        else:
            ifc_info['snmp']['community'] = snmp_info['community']

    return ifc_info


@routes.route(
    "/link-info/<source_equipment>/<path:interface>",
    methods=['GET'])
@routes.route(
    "/nokia-link-info/<source_equipment>/<path:interface>",
    methods=['GET'])
@routes.route(
    "/juniper-link-info/<source_equipment>/<path:interface>",
    methods=['GET'])
@common.require_accepts_json
def get_juniper_link_info(source_equipment: str, interface: str) -> Response:
    """
    Handler for /classifier/juniper-link-info that
    returns metadata about an IP interface.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.JUNIPER_LINK_RESPONSE_SCHEMA

    :param source_equipment: router hostname
    :param interface: link interface name
    :return:
    """

    r = common.get_current_redis()

    ims_source_equipment = get_ims_equipment_name(source_equipment, r)
    ims_interface = get_ims_interface(interface)

    cache_key = \
        f'classifier-cache:link:{ims_source_equipment}:{ims_interface}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        # _link_interface_info also fills in 'bundle_members'
        result = {
            'interface': _link_interface_info(r, source_equipment, interface)
        }

        result.update(
            get_interface_services_and_loc(
                ims_source_equipment,
                ims_interface,
                r
            )
        )

        # don't want to report related services for bundle members, just the
        # aggregate
        # if result['interface']['bundle']:
        #     result['related-services'] = []

        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")


def _remote_peerings(redis, address):
    """
    Canonicalize a peer address and load the peerings cached for it.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: tuple (exploded address string, list of peerings or None
        if the 'juniper-peerings:remote:<address>' key doesn't exist)
    :raises ClassifierProcessingError: if address isn't an ip address
    """
    try:
        address = ipaddress.ip_address(address).exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address} as an ip address')

    all_peerings = redis.get(f'juniper-peerings:remote:{address}')
    if not all_peerings:
        return address, None
    return address, json.loads(all_peerings.decode('utf-8'))


def _asn_group_info(redis, address):
    """
    Find all peerings that share the asn of the given remote peer.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: dict with 'asn' and 'peers' (list of router/address/group
        dicts), or None if no asn or no peerings are known
    :raises ClassifierProcessingError: if address isn't an ip address
    """
    address, all_peerings = _remote_peerings(redis, address)
    if not all_peerings:
        return None

    all_asn = {p['remote-asn'] for p in all_peerings if 'remote-asn' in p}
    if not all_asn:
        return None

    # only one asn is expected: pick any one, and complain about the rest
    peer_asn = all_asn.pop()
    if all_asn:
        logger.error(
            f"found multiple asn's for {address}, "
            f'using {peer_asn} and ignoring {all_asn}')

    peerings_this_asn = redis.get(f'juniper-peerings:peer-asn:{peer_asn}')
    if not peerings_this_asn:
        logger.error(
            f'internal data corruption, no peerings found for asn {peer_asn}')
        return None

    peerings_this_asn = json.loads(peerings_this_asn.decode('utf-8'))
    return {
        'asn': peer_asn,
        'peers': [
            {
                'router': p['hostname'],
                'address': p['address'],
                # for future use (in case isolation
                # should be computed per peering type)
                'group': p['group']
            }
            for p in peerings_this_asn
        ]
    }


def _vpn_rr_peering_info(redis, address):
    """
    Build the vpn rr peering info for the given remote peer.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: dict with 'name', 'description', 'router' and (if known)
        'peer-as', or None if there's no vpn rr peering for the address
    :raises ClassifierProcessingError: if address isn't an ip address
    """
    def _is_rr(peering_info):
        if peering_info.get('logical-system', '') != 'VRR':
            return False
        group = peering_info.get('group', '')
        if group not in ('VPN-RR', 'VPN-RR-INTERNAL'):
            return False
        if 'description' not in peering_info:
            logger.error('internal data error, looks like vpn rr peering '
                         f'but description is missing: {peering_info}')
            return False
        return True

    address, all_peerings = _remote_peerings(redis, address)
    if not all_peerings:
        return None

    rr_peerings = list(filter(_is_rr, all_peerings))
    if not rr_peerings:
        return None

    if len(rr_peerings) > 1:
        logger.warning(
            f'using the first of multiple vpn rr peer matches: {rr_peerings}')

    return_value = {
        'name': address,
        'description': rr_peerings[0]['description'],
        'router': rr_peerings[0]['hostname']
    }

    if 'remote-asn' in rr_peerings[0]:
        return_value['peer-as'] = rr_peerings[0]['remote-asn']

    return return_value


def _ix_peering_info(redis, address):
    """
    Build the ix public peering info for the given remote peer.

    :param redis: a redis db connection
    :param address: the remote peer address
    :return: dict with 'peer' (peering details), 'group' (sorted content
        of the peer's ix group) and 'router' (sorted addresses of all ix
        peers on the same router), or None if there's no ix peering
        for the address
    :raises ClassifierProcessingError: if address isn't an ip address
    """

    def _is_ix(peering_info):
        if peering_info.get('instance', '') != 'IAS':
            return False
        if not peering_info.get('group', '').startswith('GEANT-IX'):
            return False

        expected_keys = ('description', 'local-asn', 'remote-asn')
        if any(peering_info.get(x, None) is None for x in expected_keys):
            logger.error('internal data error, looks like ix peering but '
                         f'some expected keys are missing: {peering_info}')
            return False
        return True

    address, all_peerings = _remote_peerings(redis, address)
    if not all_peerings:
        return None

    ix_peerings = list(filter(_is_ix, all_peerings))
    if not ix_peerings:
        return None

    if len(ix_peerings) > 1:
        logger.warning(
            f'using the first of multiple ix peer matches: {ix_peerings}')

    peer_info = {
        'name': address,
        'description': ix_peerings[0]['description'],
        'as': {
            'local': ix_peerings[0]['local-asn'],
            'peer': ix_peerings[0]['remote-asn']
        },
        'router': ix_peerings[0]['hostname']
    }

    return_value = {
        'peer': peer_info,
        'group': [],
        'router': []
    }

    # load the other peers in the same group
    # regex needed??? (e.g. tabs???)
    peering_group_name = peer_info['description'].split(' ')[0]
    peering_group = redis.get(
        f'juniper-peerings:ix-groups:{peering_group_name}')
    if peering_group:
        peering_group = peering_group.decode('utf-8')
        return_value['group'] = sorted(json.loads(peering_group))

    # load the other ix peers from the same router
    router_peerings = redis.get(
        f'juniper-peerings:hosts:{peer_info["router"]}')
    # the key may be missing: then 'router' just stays empty
    if router_peerings:
        router_peerings = json.loads(router_peerings.decode('utf-8'))
        router_ix_peers = list(filter(_is_ix, router_peerings))
        if router_ix_peers:
            addresses = {p['address'] for p in router_ix_peers}
            return_value['router'] = sorted(addresses)

    return return_value


def find_interfaces(address):
    """
    Generate the interface records of all subnets containing an address.

    TODO: this is probably the least efficient way of doing this
          (if it's a problem, pre-compute these lists)

    :param address: an ipaddress object
    :return: generator of the elements stored under every
        'subnets:<interface address>' redis key whose network
        contains address
    """
    r = common.get_current_redis()
    for k in r.keys('subnets:*'):
        k = k.decode('utf-8')
        m = re.match(r'^subnets:(.*)$', k)
        assert m, 'sanity failure: redis returned an invalid key name'
        interface = ipaddress.ip_interface(m.group(1))
        if address in interface.network:
            info = r.get(k).decode('utf-8')
            info = json.loads(info)
            for ifc in info:
                yield ifc


@routes.route("/peer-info/<address_str>", methods=['GET'])
@common.require_accepts_json
def peer_info(address_str: str) -> Response:
    """
    Handler for /classifier/peer-info that returns bgp peering metadata.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.PEER_INFO_RESPONSE_SCHEMA

    :param address_str: string representation of a bgp peer address
    :return:
    """
    # canonicalize the input address first ...
    try:
        address = ipaddress.ip_address(address_str)
        address_str = address.exploded
    except ValueError:
        raise ClassifierProcessingError(
            f'unable to parse {address_str} as an ip address')

    r = common.get_current_redis()

    cache_key = f'classifier-cache:peer:{address_str}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:

        result = {
            'interfaces': [],
            'locations': [],
        }

        def _add_router_location(router):
            # a failed location lookup yields None,
            # which must not end up in the response
            loc = build_locations(
                _location_from_equipment(
                    get_ims_equipment_name(router, r), r))
            if loc:
                result['locations'].append(loc)

        ix_peering_info = _ix_peering_info(r, address_str)
        if ix_peering_info:
            result['ix-public-peer-info'] = ix_peering_info
            _add_router_location(ix_peering_info['peer']['router'])

        vpn_rr_peering_info = _vpn_rr_peering_info(r, address_str)
        if vpn_rr_peering_info:
            result['vpn-rr-peer-info'] = vpn_rr_peering_info
            _add_router_location(vpn_rr_peering_info['router'])

        asn_group_info = _asn_group_info(r, address_str)
        if asn_group_info:
            result['asn'] = asn_group_info

        contacts = set()
        for interface in find_interfaces(address):
            ims_equipment = get_ims_equipment_name(interface["router"], r)
            ims_interface = get_ims_interface(interface["interface name"])

            services_and_locs = get_interface_services_and_loc(
                ims_equipment,
                ims_interface,
                r
            )

            t = {'interface': interface}
            if services_and_locs.get('services', None):
                t['services'] = services_and_locs['services']
                contacts.update(services_and_locs['contacts'])
            result['interfaces'].append(t)
            result['locations'].extend(services_and_locs['locations'])

        snmp_info = r.get(
            f'snmp-peerings:remote:{address_str}')
        if snmp_info:
            snmp_info = json.loads(snmp_info.decode('utf-8'))
            result['snmp'] = [
                {
                    'hostname': h['hostname'],
                    'community': h['community'],
                    'oid': h['oid']
                } for h in snmp_info]

        result['locations'] = _remove_duplicates_from_list(result['locations'])
        result['contacts'] = sorted(contacts)
        if not result.get('interfaces', None):
            result.pop('interfaces', None)
        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")


@routes.route(
    "/mtc-interface-info/<node>/<interface>", methods=['GET'])
@common.require_accepts_json
def get_mtc_interface_info(node, interface):
    """
    Handler for /classifier/mtc-interface-info that
    returns metadata for as MTC port.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.MTC_INTERFACE_INFO_RESPONSE_SCHEMA

    :param node: equipment name, used as-is in the redis key
    :param interface: interface name, used as-is in the redis key
    :return: element info
    """  # noqa: E501
    r = common.get_current_redis()

    cache_key = f'classifier-cache:mtc:{node}:{interface}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        result = get_interface_services_and_loc(node, interface, r)

        # neither a location nor services: nothing is known
        if not result['locations'] and 'services' not in result:
            return Response(
                response="no available info for "
                         f"{node} {interface}",
                status=404,
                mimetype="text/html")

        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    # result is an already-serialized payload at this point
    # (either freshly dumped, or as retrieved from the cache)
    return Response(result, mimetype="application/json")


@routes.route("/infinera-lambda-info/"
              "<source_equipment>/<interface>/<circuit_id>",
              methods=['GET'])
@common.require_accepts_json
def get_trap_metadata(source_equipment: str, interface: str, circuit_id: str) \
        -> Response:
    """
    Handler for /classifier/infinera-lambda-info that
    returns metadata for a DTNX port.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.INFINERA_LAMBDA_INFO_RESPONSE_SCHEMA

    :param source_equipment: DTNX name
    :param interface: interface/port name
    :param circuit_id: infinera circuit id (currently unused)
    :return:
    """  # noqa: E501

    r = common.get_current_redis()

    ims_source_equipment = get_ims_equipment_name(source_equipment, r)
    ims_interface = get_ims_interface(interface)

    cache_key = f'classifier-cache:infinera:{source_equipment}:{interface}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        result = {}

        result.update(get_interface_services_and_loc(
            ims_source_equipment,
            ims_interface,
            r
        ))
        # rubbish hack to cope with Dashboard Classifier
        # removing piece of interface name. Once Dashboard
        # is fully driven by IMS we can remove that
        # part of the classifier and then this
        # As of 2021-04-21 the Dashboard line that will need removing is
        # https://gitlab.geant.net/live-projects/dashboardv3/dashboard-v3-python/-/blob/develop/dashboard/classification/classifier.py#L207  # noqa: E501
        #
        # i.e. as long as no services are found, retry with a 'T' prefixed
        # first to the second-to-last and then to the last '-' separated
        # part of the interface name
        interface_parts = interface.split('-')
        for idx in (-2, -1):
            if result.get('services', []):
                break
            if len(interface_parts) < -idx:
                # name is too short to have this part
                continue
            t_parts = list(interface_parts)
            t_parts[idx] = 'T' + t_parts[idx]
            t_interface = '-'.join(t_parts)
            result.update(get_interface_services_and_loc(
                ims_source_equipment,
                t_interface,
                r
            ))

        if result:
            order = ['operational', 'installed', 'planned']
            gl = list(filter(
                lambda x: x['service_type'] == 'GEANT LAMBDA'
                and x['status'] in order,
                result.get('services', [])))
            if gl:
                # prefer operational over installed over planned
                gl.sort(key=lambda x: order.index(x['status']))
                result['geant-lambda'] = {
                    'id': gl[0]['id'],
                    'name': gl[0]['name'],
                    'status': gl[0]['status'],
                    'project': gl[0]['project']
                }

        # NOTE(review): get_interface_services_and_loc always sets
        # 'locations', so this branch isn't expected to be reached
        if not result:
            return Response(
                response="no available info for {} {}".format(
                    source_equipment, interface),
                status=404,
                mimetype="text/html")

        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")


def _fibre_link_result(
        r, equipment_a, equipment_b,
        fibre_routes, top_level_services, contacts):
    """
    Build the response payload shared by the fibre link handlers.

    :param r: a redis db connection
    :param equipment_a: equipment name of the 'a' end
    :param equipment_b: equipment name of the 'b' end
    :param fibre_routes: non-empty list, the first element is reported
    :param top_level_services: list of related services
        (the 'id' key is removed from each element, in place)
    :param contacts: iterable of contact strings
    :return: payload dict
    """
    # fall back to a location with empty pop details if lookup fails
    loc_a = _location_from_equipment(equipment_a, r) \
        or _LOCATION(equipment_a, '', '')
    loc_b = _location_from_equipment(equipment_b, r) \
        or _LOCATION(equipment_b, '', '')

    # added locations in preparation for refactoring to be in-line
    # with other location data. Once Dashboard has been altered to
    # use this for fiberlink alarms the 'ends' attribute can be
    # removed
    result = {
        'locations': [
            build_locations(loc_a, loc_b)
        ],
        'ends': {
            'a': {
                'pop': loc_a['name'],
                'pop_abbreviation': loc_a['abbreviation'],
                'equipment': loc_a['equipment']
            },
            'b': {
                'pop': loc_b['name'],
                'pop_abbreviation': loc_b['abbreviation'],
                'equipment': loc_b['equipment']
            },
        },
        'df_route': fibre_routes[0],
        'related-services': top_level_services,
        'contacts': sorted(contacts)
    }
    for rs in result['related-services']:
        rs.pop('id', None)

    return result


@routes.route("/infinera-fiberlink-info/<ne_name_str>/<object_name_str>",
              methods=['GET'])
@common.require_accepts_json
def get_fiberlink_trap_metadata(ne_name_str: str, object_name_str: str) \
        -> Response:
    """
    Handler for /classifier/infinera-fiberlink-info that
    returns metadata for a particular opitical path segment.

    TODO: no schema is declared, and there are no validation tests

    :param ne_name_str: OLA or DTNX equipment name
    :param object_name_str: path name
    :return:
    """

    interfaces = object_name_str.split('_')
    p = r'([a-zA-Z\d]+?-(OLA|DTNX)\d+(-\d)?)'
    matches = re.findall(p, ne_name_str)
    if len(matches) != 2 or len(interfaces) != 2:
        raise ClassifierProcessingError(
            f'unable to parse {ne_name_str} {object_name_str} '
            'into two elements')

    r = common.get_current_redis()

    # double check that we only need to check the two nodes and not the objects
    cache_key = \
        f'classifier-cache:fiberlink:{ne_name_str}:{object_name_str}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        equipment_a = matches[0][0]
        equipment_b = matches[1][0]
        interface_a = interfaces[0]
        interface_b = interfaces[1]

        all_frs = {}
        all_tls = {}

        def _load_end(equipment, interface):
            # returns (fibre route ids, related service ids, contacts)
            # for one end, and records the objects in all_frs/all_tls
            key = f'ims:interface_services:{equipment}:{interface}'
            circuits = r.get(key)
            logger.debug(key)
            fr_ids = set()
            tls_ids = set()
            end_contacts = set()
            if circuits:
                for c in json.loads(circuits.decode('utf-8')):
                    end_contacts.update(c['contacts'])
                    for fr in c['fibre-routes']:
                        fr_ids.add(fr['id'])
                        all_frs[fr['id']] = fr
                    for rs in c['related-services']:
                        tls_ids.add(rs['id'])
                        all_tls[rs['id']] = rs
            return fr_ids, tls_ids, end_contacts

        fr_a_ids, tls_a_ids, contacts_a = _load_end(equipment_a, interface_a)
        fr_b_ids, tls_b_ids, contacts_b = _load_end(equipment_b, interface_b)

        # prefer what both ends have in common,
        # otherwise fall back to everything from either end
        fr_ids = fr_a_ids & fr_b_ids
        if not fr_ids:
            fr_ids = fr_a_ids | fr_b_ids
        fibre_routes = [all_frs[x] for x in fr_ids]

        tls_ids = tls_a_ids & tls_b_ids
        if not tls_ids:
            tls_ids = tls_a_ids | tls_b_ids
            contacts = contacts_a | contacts_b
        else:
            contacts = contacts_a & contacts_b
        top_level_services = [all_tls[x] for x in tls_ids]

        if fibre_routes:
            result = json.dumps(_fibre_link_result(
                r, equipment_a, equipment_b,
                fibre_routes, top_level_services, contacts))
            r.set(cache_key, result)

    if not result:
        return Response(
            response="no available info for "
                     f"{ne_name_str} {object_name_str}",
            status=404,
            mimetype="text/html")

    return Response(result, mimetype="application/json")


@routes.route("/tnms-fibre-info/<path:enms_pc_name>", methods=['GET'])
@common.require_accepts_json
def get_tnms_fibre_trap_metadata(enms_pc_name: str) -> Response:
    """
    Handler for /classifier/tnms-fibre-info that
    returns metadata for a particular opitical path segment.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.TNMS_FIBERLINK_INFO_RESPONSE_SCHEMA

    :param enms_pc_name: both node names separated by a forward slash
    :return:
    """  # noqa: E501

    r = common.get_current_redis()

    # double check that we only need to check the two nodes and not the objects
    cache_key = f'classifier-cache:fiberlink:{enms_pc_name}'
    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        try:
            equipment_a, equipment_b = enms_pc_name.split("/")
            logger.debug(f"{equipment_a} - {equipment_b}")
        except ValueError:
            raise ClassifierProcessingError(
                f'unable to parse {enms_pc_name} '
                'into two elements')

        circuits = r.get(f'ims:node_pair_services:{enms_pc_name}')

        # both dicts are keyed by id, so that duplicates collapse
        all_frs = {}
        all_tls = {}
        contacts = set()
        if circuits:
            circuits = json.loads(circuits.decode('utf-8'))
            for c in circuits:
                contacts.update(c['contacts'])
                for fr in c['fibre-routes']:
                    all_frs[fr['id']] = fr
                for rs in c['related-services']:
                    all_tls[rs['id']] = rs

        fibre_routes = list(all_frs.values())
        top_level_services = list(all_tls.values())

        if fibre_routes:
            result = json.dumps(_fibre_link_result(
                r, equipment_a, equipment_b,
                fibre_routes, top_level_services, contacts))
            r.set(cache_key, result)

    if not result:
        return Response(
            response=f"no available info for {enms_pc_name}",
            status=404,
            mimetype="text/html")

    return Response(result, mimetype="application/json")


@routes.route('/coriant-info/<equipment_name>/<path:entity_string>',
              methods=['GET'])
@common.require_accepts_json
def get_coriant_info(equipment_name: str, entity_string: str) -> Response:
    """
    Handler for /classifier/coriant-info that
    returns metadata for a coriant path.

    The response will be formatted according to the following schema:

    .. asjson::
       inventory_provider.routes.classifier_schema.CORIANT_INFO_RESPONSE_SCHEMA

    :param equipment_name: grv hostname
    :param entity_string: path name
    :return:
    """  # noqa: E501

    r = common.get_current_redis()

    ims_source_equipment = get_ims_equipment_name(equipment_name, r)
    ims_interface = get_ims_interface(entity_string)

    cache_key = 'classifier-cache:coriant:' \
                f'{ims_source_equipment}:{ims_interface}'

    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:

        # expected to start with '<n>-<n>.<n>', i.e. card id and port
        m = re.match(r'^(\d+\-\d+)\.(\d+)', ims_interface)
        if not m:
            logger.error(
                f'invalid coriant entity string format: {ims_interface}')
            return Response(
                response="no available info for "
                         f"'{ims_source_equipment}' '{ims_interface}'",
                status=404,
                mimetype="text/html")
        card_id = m.group(1).replace('-', '/')
        port = m.group(2)
        result = {
            'equipment name': ims_source_equipment,
            'card id': card_id,
            'port number': port
        }
        interface_name = f'{card_id}/{port}'

        result.update(get_interface_services_and_loc(
            ims_source_equipment,
            interface_name,
            r
        ))

        # the port details are already reported at the top level
        services = result.get('services', [])
        for s in services:
            s.pop('card_id', None)
            s.pop('port', None)
            s.pop('logical_unit', None)
            s.pop('other_end_card_id', None)
            s.pop('other_end_port', None)
            s.pop('other_end_logical_unit', None)

        p = {
            'category': '',
            'circuit_type': '',
            'service_type': '',
            'peering_type': '',
            'status': '',
            'name': '',
            'a': {
                'equipment name': '',
                'card id': '',
                'port number': '',
                'pop': {
                    'name': '',
                    'city': '',
                    'country': '',
                    'abbreviation': '',
                    'longitude': 0,
                    'latitude': 0,
                }
            }
        }
        if services:
            s = services[0]
            p['status'] = s['status']
            p['name'] = s['name']
            p['a']['equipment name'] = ims_source_equipment
            p['a']['card id'] = card_id
            p['a']['port number'] = port
            p['a']['pop']['name'] = s.get('pop_name', '')
            p['a']['pop']['city'] = s.get('pop_name', '')
            p['a']['pop']['abbreviation'] = s.get('pop_abbreviation', '')
        # the 'b' end is reported as a copy of the 'a' end
        p['b'] = p['a']
        result['path'] = p
        result['locations'] = _remove_duplicates_from_list(result['locations'])
        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")