Skip to content
Snippets Groups Projects
Commit 711cd409 authored by Robert Latta's avatar Robert Latta
Browse files

temporary file for parallel development of IMS based classification

parent 3aeae5cb
No related branches found
No related tags found
No related merge requests found
import ipaddress
import json
import logging
from flask import Blueprint, Response, current_app
from inventory_provider.routes import common
routes = Blueprint("ims-inventory-data-classifier-support-routes", __name__)
logger = logging.getLogger(__name__)
class ClassifierRequestError(Exception):
status_code = 500
def __init__(self):
super().__init__()
self.message = "Unclassified Internal Error"
class ClassifierProcessingError(ClassifierRequestError):
status_code = 422
def __init__(self, message, status_code=None):
super().__init__()
self.message = str(message)
if status_code is not None:
self.status_code = status_code
@routes.errorhandler(ClassifierRequestError)
def handle_request_error(error):
return Response(
response=error.message,
status=error.status_code)
@routes.after_request
def after_request(resp):
return common.after_request(resp)
def _LOCATION(equipment, name, abbreviation):
return {
'equipment': equipment,
'name': name,
'abbreviation': abbreviation
}
def _remove_duplicates_from_list(all):
"""
removes duplicates from the input list
the list items must be encodable as json
:param l:
:return: a new list with unique elements
"""
tmp_dict = dict([(json.dumps(item, sort_keys=True), item) for item in all])
return list(tmp_dict.values())
def _locations_from_router(router_name):
r = common.get_current_redis()
result = r.get(f'ims:location:{router_name}')
if not result:
logger.error(f'error looking up location for {router_name}')
return []
result = json.loads(result.decode('utf-8'))
if not result:
logger.error(f'sanity failure: empty list for location {router_name}')
return []
return [{
'a': _LOCATION(
equipment=result[0]['equipment-name'],
name=result[0]['pop']['name'],
abbreviation=result[0]['pop']['abbreviation'])
}]
def _location_from_service_dict(s):
location = {
'a': _LOCATION(
equipment=s['equipment'],
name=s['pop_name'],
abbreviation=s['pop_abbreviation'])
}
if all(s[n] for n in (
'other_end_equipment',
'other_end_pop_name',
'other_end_pop_abbreviation')):
location['b'] = _LOCATION(
equipment=s['other_end_equipment'],
name=s['other_end_pop_name'],
abbreviation=s['other_end_pop_abbreviation'])
return location
def related_interfaces(hostname, interface):
r = common.get_current_redis()
prefix = 'netconf-interfaces:%s:' % hostname
for k in r.keys(prefix + interface + '.*'):
k = k.decode('utf-8')
assert k.startswith(prefix) # sanity
assert len(k) > len(prefix) # sanity (contains at least an interface)
yield k[len(prefix):]
def get_top_level_services(circuit_id, r):
tls = []
results = r.get("ims:services:children:{}".format(circuit_id))
if results:
results = json.loads(results.decode('utf-8'))
for c in results:
temp_parents = \
get_top_level_services(c['parent_circuit_id'], r)
if not temp_parents:
tls.append(
{'name': c['parent_circuit'],
'status': c['parent_circuit_status'],
'circuit_type': c['parent_circuit_type'].lower(),
'project': c['parent_project']
})
tls.extend(temp_parents)
return tls
@routes.route("/juniper-link-info/<source_equipment>/<path:interface>",
methods=['GET', 'POST'])
@common.require_accepts_json
def get_juniper_link_info(source_equipment: str, interface: str):
ims_source_equipment = source_equipment.upper()
ims_interface = interface.upper()
if ims_source_equipment.startswith('MX'):
ims_source_equipment = ims_source_equipment.split('.GEANT.NET')[0]
r = common.get_current_redis()
cache_key = f'ims-classifier-cache:juniper:{ims_source_equipment}:{ims_interface}'
# result = r.get(cache_key)
result = False
if result:
result = result.decode('utf-8')
else:
result = {
'locations': []
}
top_level_services = []
services = r.get(
f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
if services:
result['services'] = json.loads(services.decode('utf=8'))
for s in result['services']:
top_level_services.extend(get_top_level_services(s['id'], r))
result['locations'] += [
_location_from_service_dict(s) for s in result['services']]
ifc_info = r.get(f'netconf-interfaces:{source_equipment}:{interface}')
if ifc_info:
result['interface'] = json.loads(ifc_info.decode('utf-8'))
else:
# warning: this should match the structure returned by
# juniper:list_interfaces:_ifc_info
result['interface'] = {
'name': interface,
'description': '',
'bundle': [],
'ipv4': [],
'ipv6': []
}
bundle_members = r.get(
f'netconf-interface-bundles:{source_equipment}:{interface}')
if bundle_members:
result['interface']['bundle_members'] = \
json.loads(bundle_members.decode('utf-8'))
else:
result['interface']['bundle_members'] = []
def _related_services():
for related in related_interfaces(source_equipment, interface):
logger.debug(f'Related Interface: {related}')
rs = r.get(f'ims:interface_services:{ims_source_equipment}:'
f'{related.upper()}')
if rs:
for s in json.loads(rs.decode('utf-8')):
top_level_services.extend(
get_top_level_services(s['id'], r))
yield {
'name': s['name'],
'status': s['status'],
'circuit_type': s['circuit_type'],
'project': s['project']
}
related_services = list(_related_services())
if related_services:
top_level_services.extend(related_services)
if top_level_services:
result['related-services'] = top_level_services
if not result['locations']:
result['locations'] = _locations_from_router(ims_source_equipment)
result['locations'] = _remove_duplicates_from_list(result['locations'])
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment