# classifier.py -- inventory-provider classifier support routes
# (original author: Robert Latta)
import ipaddress
import json
import logging
import re
from typing import Optional
from flask import Blueprint, Response, request
from redis import Redis
from inventory_provider.routes import common
from inventory_provider.routes.common import _ignore_cache_or_retrieve
# blueprint under which all classifier endpoints below are registered
routes = Blueprint("inventory-data-classifier-support-routes", __name__)
# module-level logger, named after this module
logger = logging.getLogger(__name__)
def _LOCATION(equipment, name, abbreviation):
return {
'equipment': equipment,
'name': name,
'abbreviation': abbreviation
}
def build_locations(loc_a: Optional[dict], loc_b: Optional[dict] = None) \
        -> Optional[dict]:
    """Assemble an 'a'/'b' location pair.

    Returns None when no a-side location is given; the b side is
    only attached when the a side is present.
    """
    if not loc_a:
        return None
    pair = {'a': loc_a}
    if loc_b:
        pair['b'] = loc_b
    return pair
def _remove_duplicates_from_list(all_: list) -> list:
"""
removes duplicates from the input list
the list items must be encodable as json
:param all_:
:return: a new list with unique elements
"""
tmp_dict = dict(
[(json.dumps(item, sort_keys=True), item) for item in all_])
return list(tmp_dict.values())
def _location_from_equipment(equipment_name: str, r: Redis) -> Optional[dict]:
    """Look up the cached IMS location record for a piece of equipment.

    Returns None (and logs an error) when no usable location is cached.
    """
    cached = r.get(f'ims:location:{equipment_name}')
    if not cached:
        logger.error(f'error looking up location for {equipment_name}')
        return None
    locations = json.loads(cached.decode('utf-8'))
    if not locations:
        logger.error(
            f'sanity failure: empty list for location {equipment_name}')
        return None
    first = locations[0]
    return _LOCATION(
        equipment=first['equipment-name'],
        name=first['pop']['name'],
        abbreviation=first['pop']['abbreviation'])
def _location_from_services(services, r: Redis):
    """Yield a built 'a'/'b' location pair for each service."""
    for svc in services:
        side_a = _location_from_equipment(svc['equipment'], r)
        other_end = svc['other_end_equipment']
        side_b = _location_from_equipment(other_end, r) if other_end else None
        yield build_locations(side_a, side_b)
class ClassifierRequestError(Exception):
    """
    Base class for errors raised by the classifier routes; rendered
    as a plain http response by the blueprint error handler below.
    """
    # default http status for unclassified errors
    status_code = 500

    def __init__(self):
        super().__init__()
        # human-readable message used as the response body
        self.message = "Unclassified Internal Error"
class ClassifierProcessingError(ClassifierRequestError):
    """
    Raised when request input can't be processed (e.g. an address that
    doesn't parse); returned as 422 unless another status is given.
    """
    status_code = 422

    def __init__(self, message, status_code=None):
        super().__init__()
        self.message = str(message)
        if status_code is not None:
            self.status_code = status_code
@routes.errorhandler(ClassifierRequestError)
def handle_request_error(error):
    """Render a ClassifierRequestError as a plain-text error response."""
    return Response(response=error.message, status=error.status_code)
@routes.after_request
def after_request(resp):
    # delegate post-processing of every response from this blueprint
    # to the shared hook in inventory_provider.routes.common
    return common.after_request(resp)
def get_ims_equipment_name(equipment_name: str) -> str:
    """Normalize a router hostname to its IMS equipment name.

    The name is upper-cased; for 'MX...' hostnames the trailing
    '.GEANT.NET' domain suffix is stripped as well.
    """
    upper_name = equipment_name.upper()
    if not upper_name.startswith('MX'):
        return upper_name
    return upper_name.split('.GEANT.NET')[0]
def get_ims_interface(interface: str) -> str:
    """Normalize an interface name the way IMS stores it (upper-cased)."""
    upper = interface.upper()
    return upper
def related_interfaces(hostname, interface):
    """Yield the names of logical units of *interface* on *hostname*.

    e.g. for 'ae20' this generates 'ae20.1', 'ae20.100', ... based on
    the cached netconf interface keys.
    """
    r = common.get_current_redis()
    prefix = f'netconf-interfaces:{hostname}:'
    for key in r.keys(f'{prefix}{interface}.*'):
        key = key.decode('utf-8')
        assert key.startswith(prefix)  # sanity
        assert len(key) > len(prefix)  # sanity (contains at least an interface)
        yield key[len(prefix):]
def get_related_services(source_equipment: str, interface: str, r) -> dict:
    """
    Finds the top-level-services for the given interface
    and also gets the top-level-services for the related interfaces
    e.g. ae20 will also find services on all logical units of ae20 (ae20.1 ...)

    :param source_equipment: equipment name
    :param interface: interface name
    :param r: redis connection
    :return: generator of top-level-service dicts
    """
    def _top_level_services(equipment, ifc_name):
        # service records are cached per (equipment, interface) pair
        cached = r.get(f'ims:interface_services:{equipment}:{ifc_name}')
        if cached:
            for service in json.loads(cached.decode('utf-8')):
                yield from service['top-level-services']

    ims_source_equipment = get_ims_equipment_name(source_equipment)
    yield from _top_level_services(
        ims_source_equipment, get_ims_interface(interface))

    # also pick up services on the logical units (e.g. ae20.*)
    for related in related_interfaces(source_equipment, interface):
        yield from _top_level_services(
            ims_source_equipment, get_ims_interface(related))
def get_interface_services_and_locs(ims_source_equipment, ims_interface, r):
    """
    Build the services / related-services / locations payload
    for one interface.

    :param ims_source_equipment: IMS equipment name
        (cf. get_ims_equipment_name)
    :param ims_interface: IMS interface name (cf. get_ims_interface)
    :param r: redis connection
    :return: dict with 'locations', and optionally 'services'
        and 'related-services'
    """
    def _format_service(_s):
        # keep only the keys exposed by the classifier response schema
        keys = {
            'id', 'name', 'status', 'circuit_type', 'service_type',
            'project', 'pop_name', 'pop_abbreviation', 'equipment',
            'card_id', 'port', 'logical_unit', 'other_end_pop_name',
            'other_end_pop_abbreviation', 'other_end_equipment',
            'other_end_card_id', 'other_end_port', 'other_end_logical_unit'}
        keys_to_remove = set(_s.keys()) - keys
        for k in keys_to_remove:
            _s.pop(k)

    result = {
        'locations': []
    }
    raw_services = r.get(
        f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
    if raw_services:
        result['services'] = json.loads(raw_services.decode('utf-8'))
        for s in result['services']:
            _format_service(s)
        result['related-services'] = \
            list(get_related_services(ims_source_equipment, ims_interface, r))
        result['locations'] = \
            list(_location_from_services(result['services'], r))
        if not result['services']:
            result.pop('services', None)
        if result['related-services']:
            # BUGFIX: this loop variable used to be named 'r', shadowing
            # the redis connection and breaking the fallback location
            # lookup below when related-services were present
            for related in result['related-services']:
                related.pop('id', None)
        else:
            result.pop('related-services', None)
    if not result.get('locations', None):
        # no service-derived locations: fall back to the equipment location
        locations = build_locations(
            _location_from_equipment(ims_source_equipment, r))
        result['locations'] = [locations] if locations else []
    result['locations'] = _remove_duplicates_from_list(result['locations'])
    return result
def _link_interface_info(r, hostname, interface):
"""
Generates the 'interface' field for
the juniper-link-info response payload.
only called from get_juniper_link_info
:param r: redis connection
:param hostname: router hostname
:param interface: interface name
:return: payload dict
"""
ifc_info = r.get(f'netconf-interfaces:{hostname}:{interface}')
if not ifc_info:
# warning: this should match the structure returned by
# juniper:list_interfaces:_ifc_info
ifc_info = {
'name': interface,
'description': '',
'bundle': [],
'ipv4': [],
'ipv6': []
}
else:
ifc_info = json.loads(ifc_info.decode('utf-8'))
bundle_members = r.get(
f'netconf-interface-bundles:{hostname}:{interface}')
if bundle_members:
ifc_info['bundle_members'] \
= json.loads(bundle_members.decode('utf-8'))
else:
ifc_info['bundle_members'] = []
snmp_info = r.get(
f'snmp-interfaces-single:{hostname}:{interface}')
if snmp_info:
snmp_info = json.loads(snmp_info.decode('utf-8'))
ifc_info['snmp'] = {
'community': snmp_info['community'],
'index': snmp_info['index']
}
return ifc_info
@routes.route("/juniper-link-info/<source_equipment>/<path:interface>",
methods=['GET', 'POST'])
@common.require_accepts_json
def get_juniper_link_info(source_equipment: str, interface: str) -> Response:
"""
Handler for /classifier/juniper-link-info that
returns metadata about an IP interface.
The response will be formatted according to the following schema:
.. asjson::
inventory_provider.routes.classifier_schema.JUNIPER_LINK_RESPONSE_SCHEMA
:param source_equipment: router hostname
:param interface: link interface name
:return:
"""
ims_source_equipment = get_ims_equipment_name(source_equipment)
ims_interface = get_ims_interface(interface)
r = common.get_current_redis()
cache_key = \
f'classifier-cache:juniper:{ims_source_equipment}:{ims_interface}'
result = _ignore_cache_or_retrieve(request, cache_key, r)
if not result:
result = {
'interface': _link_interface_info(r, source_equipment, interface)
}
bundle_members = r.get(
f'netconf-interface-bundles:{source_equipment}:{interface}')
if bundle_members:
result['interface']['bundle_members'] = \
json.loads(bundle_members.decode('utf-8'))
else:
result['interface']['bundle_members'] = []
result.update(
get_interface_services_and_locs(
ims_source_equipment,
ims_interface,
r
)
)
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
def _asn_group_info(redis, address):
"""
:param redis: a redis db connection
:param address: the remote peer address
:return:
"""
try:
address = ipaddress.ip_address(address).exploded
except ValueError:
raise ClassifierProcessingError(
f'unable to parse {address} as an ip address')
all_peerings = redis.get(f'juniper-peerings:remote:{address}')
if not all_peerings:
return None
all_peerings = json.loads(all_peerings.decode('utf-8'))
all_asn = {p['remote-asn'] for p in all_peerings if 'remote-asn' in p}
if not all_asn:
return None
peer_asn = all_asn.pop()
if all_asn:
logger.error(
f'found multiple asn''s for {address}, '
f'using {peer_asn} and ignoring {all_asn}')
peerings_this_asn = redis.get(f'juniper-peerings:peer-asn:{peer_asn}')
if not peerings_this_asn:
logger.error(
f'internal data corruption, no peerings found for asn {peer_asn}')
return None
peerings_this_asn = json.loads(peerings_this_asn.decode('utf-8'))
return {
'asn': peer_asn,
'peers': [
{
'router': p['hostname'],
'address': p['address'],
# for future use (in case isolation
# should be computed per peering type)
'group': p['group']
}
for p in peerings_this_asn
]
}
def _vpn_rr_peering_info(redis, address):
"""
:param redis: a redis db connection
:param address: the remote peer address
:return:
"""
def _is_rr(peering_info):
if peering_info.get('logical-system', '') != 'VRR':
return False
group = peering_info.get('group', '')
if group not in ('VPN-RR', 'VPN-RR-INTERNAL'):
return False
if 'description' not in peering_info:
logger.error('internal data error, looks like vpn rr peering'
f'but description is missing: {peering_info}')
return False
return True
try:
address = ipaddress.ip_address(address).exploded
except ValueError:
raise ClassifierProcessingError(
f'unable to parse {address} as an ip address')
all_peerings = redis.get(f'juniper-peerings:remote:{address}')
if not all_peerings:
return None
all_peerings = json.loads(all_peerings.decode('utf-8'))
rr_peerings = list(filter(_is_rr, all_peerings))
if not rr_peerings:
return None
if len(rr_peerings) > 1:
logger.warning(
f'using the first of multiple vpn rr peer matches: {rr_peerings}')
return_value = {
'name': address,
'description': rr_peerings[0]['description'],
'router': rr_peerings[0]['hostname']
}
if 'remote-asn' in rr_peerings[0]:
return_value['peer-as'] = rr_peerings[0]['remote-asn']
return return_value
def _ix_peering_info(redis, address):
"""
:param redis: a redis db connection
:param address: the remote peer address
:return:
"""
def _is_ix(peering_info):
if peering_info.get('instance', '') != 'IAS':
return False
if not peering_info.get('group', '').startswith('GEANT-IX'):
return False
expected_keys = ('description', 'local-asn', 'remote-asn')
if any(peering_info.get(x, None) is None for x in expected_keys):
logger.error('internal data error, looks like ix peering but'
f'some expected keys are missing: {peering_info}')
return False
return True
try:
address = ipaddress.ip_address(address).exploded
except ValueError:
raise ClassifierProcessingError(
f'unable to parse {address} as an ip address')
all_peerings = redis.get(f'juniper-peerings:remote:{address}')
if not all_peerings:
return None
all_peerings = json.loads(all_peerings.decode('utf-8'))
ix_peerings = list(filter(_is_ix, all_peerings))
if not ix_peerings:
return None
if len(ix_peerings) > 1:
logger.warning(
f'using the first of multiple ix peer matches: {ix_peerings}')
peer_info = {
'name': address,
'description': ix_peerings[0]['description'],
'as': {
'local': ix_peerings[0]['local-asn'],
'peer': ix_peerings[0]['remote-asn']
},
'router': ix_peerings[0]['hostname']
}
return_value = {
'peer': peer_info,
'group': [],
'router': []
}
# load the other peers in the same group
# regex needed??? (e.g. tabs???)
peering_group_name = peer_info['description'].split(' ')[0]
peering_group = redis.get(
f'juniper-peerings:ix-groups:{peering_group_name}')
if peering_group:
peering_group = peering_group.decode('utf-8')
return_value['group'] = sorted(json.loads(peering_group))
# load the other ix peers from the same router
router_peerings = redis.get(
f'juniper-peerings:hosts:{peer_info["router"]}')
router_peerings = json.loads(router_peerings.decode('utf-8'))
router_ix_peers = list(filter(_is_ix, router_peerings))
if router_ix_peers:
addresses = {p['address'] for p in router_ix_peers}
return_value['router'] = sorted(list(addresses))
return return_value
def find_interfaces(address):
    """
    Yield the cached interface records whose subnet contains *address*.

    TODO: this is probably the least efficient way of doing this
    (if it's a problem, pre-compute these lists)

    :param address: an ipaddress object
    :return: generator of interface dicts
    """
    r = common.get_current_redis()
    key_format = re.compile(r'^subnets:(.*)$')
    for key in r.keys('subnets:*'):
        key = key.decode('utf-8')
        m = key_format.match(key)
        assert m, 'sanity failure: redis returned an invalid key name'
        network = ipaddress.ip_interface(m.group(1)).network
        if address not in network:
            continue
        yield from json.loads(r.get(key).decode('utf-8'))
@routes.route("/peer-info/<address_str>", methods=['GET', 'POST'])
@common.require_accepts_json
def peer_info(address_str: str) -> Response:
"""
Handler for /classifier/peer-info that returns bgp peering metadata.
The response will be formatted according to the following schema:
.. asjson::
inventory_provider.routes.classifier_schema.PEER_INFO_RESPONSE_SCHEMA
:param address: string representation of a bgp peer address
:return:
"""
# canonicalize the input address first ...
try:
address = ipaddress.ip_address(address_str)
address_str = address.exploded
except ValueError:
raise ClassifierProcessingError(
f'unable to parse {address_str} as an ip address')
r = common.get_current_redis()
cache_key = f'classifier-cache:peer:{address_str}'
result = _ignore_cache_or_retrieve(request, cache_key, r)
if not result:
result = {
'interfaces': [],
'locations': [],
}
ix_peering_info = _ix_peering_info(r, address_str)
if ix_peering_info:
result['ix-public-peer-info'] = ix_peering_info
result['locations'].append(build_locations(
_location_from_equipment(
get_ims_equipment_name(
ix_peering_info['peer']['router']), r)))
vpn_rr_peering_info = _vpn_rr_peering_info(r, address_str)
if vpn_rr_peering_info:
result['vpn-rr-peer-info'] = vpn_rr_peering_info
result['locations'].append(build_locations(
_location_from_equipment(
get_ims_equipment_name(vpn_rr_peering_info['router']), r)))
asn_group_info = _asn_group_info(r, address_str)
if asn_group_info:
result['asn'] = asn_group_info
for interface in find_interfaces(address):
ims_equipment = get_ims_equipment_name(interface["router"])
ims_interface = get_ims_interface(interface["interface name"])
services_and_locs = get_interface_services_and_locs(
ims_equipment,
ims_interface,
r
)
t = {'interface': interface}
if services_and_locs.get('services', None):
t['services'] = services_and_locs['services']
result['interfaces'].append(t)
result['locations'].extend(services_and_locs['locations'])
snmp_info = r.get(
f'snmp-peerings:remote:{address_str}')
if snmp_info:
snmp_info = json.loads(snmp_info.decode('utf-8'))
result['snmp'] = [
{
'hostname': h['hostname'],
'community': h['community'],
'oid': h['oid']
} for h in snmp_info]
result['locations'] = _remove_duplicates_from_list(result['locations'])
if not result.get('interfaces', None):
result.pop('interfaces', None)
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
@routes.route("/infinera-lambda-info/"
"<source_equipment>/<interface>/<circuit_id>",
methods=['GET', 'POST'])
@common.require_accepts_json
def get_trap_metadata(source_equipment: str, interface: str, circuit_id: str) \
-> Response:
"""
Handler for /classifier/infinera-lambda-info that
returns metadata for as DTNX port.
The response will be formatted according to the following schema:
.. asjson::
inventory_provider.routes.classifier_schema.INFINERA_LAMBDA_INFO_RESPONSE_SCHEMA
:param source_equipment: DTNX name
:param address: interface/port name
:param circuit_id: infinera circuit id
:return:
"""
ims_source_equipment = get_ims_equipment_name(source_equipment)
ims_interface = get_ims_interface(interface)
cache_key = f'classifier-cache:infinera:{source_equipment}:{interface}'
r = common.get_current_redis()
result = _ignore_cache_or_retrieve(request, cache_key, r)
if not result:
result = {
'locations': []
}
result.update(get_interface_services_and_locs(
ims_source_equipment,
ims_interface,
r
))
if not result:
return Response(
response="no available info for {} {}".format(
source_equipment, interface),
status=404,
mimetype="text/html")
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
@routes.route("/infinera-fiberlink-info/<ne_name_str>/<object_name_str>",
methods=['GET', 'POST'])
@common.require_accepts_json
def get_fiberlink_trap_metadata(ne_name_str: str, object_name_str: str) \
-> Response:
"""
Handler for /classifier/infinera-fiberlink-info that
returns metadata for a particular opitical path segment.
TODO: no schema is declared, and there are no validation tests
:param ne_name_str: OLA or DTNX equipment name
:param object_name_str: path name
:return:
"""
interfaces = object_name_str.split('_')
p = r'([a-zA-Z\d]+?-(OLA|DTNX)\d+(-\d)?)'
matches = re.findall(p, ne_name_str)
if len(matches) != 2 or len(interfaces) != 2:
raise ClassifierProcessingError(
f'unable to parse {ne_name_str} {object_name_str} '
'into two elements')
r = common.get_current_redis()
# double check that we only need to check the two nodes and not the objects
cache_key = \
f'classifier-cache:fiberlink:{ne_name_str}:{object_name_str}'
result = _ignore_cache_or_retrieve(request, cache_key, r)
if not result:
equipment_a = matches[0][0]
equipment_b = matches[1][0]
interface_a = interfaces[0]
interface_b = interfaces[1]
circuits_a = \
r.get(f'ims:interface_services:{equipment_a}:{interface_a}')
logger.debug(f'ims:interface_services:{equipment_a}:{interface_a}')
circuits_b = \
r.get(f'ims:interface_services:{equipment_b}:{interface_b}')
logger.debug(f'ims:interface_services:{equipment_b}:{interface_b}')
fr_a_ids = set()
fr_b_ids = set()
all_frs = {}
tls_a_ids = set()
tls_b_ids = set()
all_tls = {}
if circuits_a:
circuits_a = json.loads(circuits_a.decode('utf-8'))
for c in circuits_a:
for fr in c['fibre-routes']:
fr_a_ids.add(fr['id'])
all_frs[fr['id']] = fr
for fr in c['top-level-services']:
tls_a_ids.add(fr['id'])
all_tls[fr['id']] = fr
if circuits_b:
circuits_b = json.loads(circuits_b.decode('utf-8'))
for c in circuits_b:
for fr in c['fibre-routes']:
fr_b_ids.add(fr['id'])
all_frs[fr['id']] = fr
for fr in c['top-level-services']:
tls_b_ids.add(fr['id'])
all_tls[fr['id']] = fr
fr_ids = fr_a_ids & fr_b_ids
if not fr_ids:
fr_ids = fr_a_ids | fr_b_ids
fibre_routes = [all_frs[x] for x in fr_ids]
tls_ids = tls_a_ids & tls_b_ids
if not tls_ids:
tls_ids = tls_a_ids | tls_b_ids
top_level_services = [all_tls[x] for x in tls_ids]
if fibre_routes:
location_a = _location_from_equipment(equipment_a, r)
location_b = _location_from_equipment(equipment_b, r)
if location_a:
loc_a = location_a
else:
loc_a = _LOCATION(equipment_a, '', '')
if location_b:
loc_b = location_b
else:
loc_b = _LOCATION(equipment_b, '', '')
# added locations in preparation for refactoring to be in-line
# with other location data. Once Dashboard has been altered to
# use this for fiberlink alarms the 'ends' attribute can be
# removed
result = {
'locations': [
build_locations(loc_a, loc_b)
],
'ends': {
'a': {
'pop': loc_a['name'],
'pop_abbreviation': loc_a['abbreviation'],
'equipment': loc_a['equipment']
},
'b': {
'pop': loc_b['name'],
'pop_abbreviation': loc_b['abbreviation'],
'equipment': loc_b['equipment']
},
},
'df_route': fibre_routes[0],
'related-services': top_level_services
}
for rs in result['related-services']:
rs.pop('id', None)
result = json.dumps(result)
r.set(cache_key, result)
if not result:
return Response(
response="no available info for "
f"{ne_name_str} {object_name_str}",
status=404,
mimetype="text/html")
return Response(result, mimetype="application/json")
@routes.route('/coriant-info/<equipment_name>/<path:entity_string>',
              methods=['GET', 'POST'])
@common.require_accepts_json
def get_coriant_info(equipment_name: str, entity_string: str) -> Response:
    """
    Handler for /classifier/coriant-info that
    returns metadata for a coriant path.

    The response will be formatted according to the following schema:

    .. asjson::
        inventory_provider.routes.classifier_schema.CORIANT_INFO_RESPONSE_SCHEMA

    :param equipment_name: grv hostname
    :param entity_string: path name
    :return:
    """
    r = common.get_current_redis()

    ims_source_equipment = get_ims_equipment_name(equipment_name)
    ims_interface = get_ims_interface(entity_string)

    cache_key = 'classifier-cache:coriant:' \
        f'{ims_source_equipment}:{ims_interface}'
    result = _ignore_cache_or_retrieve(request, cache_key, r)

    if not result:
        # entity strings like '1-2.3' encode a card id and port number
        m = re.match(r'^(\d+\-\d+)\.(\d+)', ims_interface)
        if not m:
            logger.error(
                f'invalid coriant entity string format: {ims_interface}')
            return Response(
                response="no available info for "
                         f"'{ims_source_equipment}' '{ims_interface}'",
                status=404,
                mimetype="text/html")
        card_id = m.group(1).replace('-', '/')
        port = m.group(2)

        result = {
            'equipment name': ims_source_equipment,
            'card id': card_id,
            'port number': port
        }

        interface_name = f'{card_id}/{port}'
        result.update(get_interface_services_and_locs(
            ims_source_equipment,
            interface_name,
            r
        ))

        # port-level details aren't exposed in coriant responses
        for s in result.get('services', []):
            s.pop('card_id', None)
            s.pop('port', None)
            s.pop('logical_unit', None)
            s.pop('other_end_card_id', None)
            s.pop('other_end_port', None)
            s.pop('other_end_logical_unit', None)

        result['locations'] = _remove_duplicates_from_list(result['locations'])

        result = json.dumps(result)
        # cache this data for the next call
        r.set(cache_key, result.encode('utf-8'))

    return Response(result, mimetype="application/json")