Skip to content
Snippets Groups Projects
Commit 45555c53 authored by Robert Latta's avatar Robert Latta
Browse files

added first attempt at coriant route

parent a649e62d
No related branches found
No related tags found
No related merge requests found
......@@ -592,3 +592,84 @@ def get_fiberlink_trap_metadata(ne_name_str, object_name_str):
mimetype="text/html")
return Response(result, mimetype="application/json")
@routes.route('/coriant-info/<equipment_name>/<path:entity_string>',
methods=['GET', 'POST'])
@common.require_accepts_json
def get_coriant_info(equipment_name, entity_string):
r = common.get_current_redis()
equipment_name = get_ims_equipment_name(equipment_name)
cache_key = 'ims-classifier-cache:coriant:%s:%s' % (
equipment_name, entity_string)
# result = r.get(cache_key)
result = False
if result:
result = result.decode('utf-8')
else:
m = re.match(r'^(\d+\-\d+)\.(\d+)', entity_string)
if not m:
logger.error(
'invalid coriant entity string format: %r' % entity_string)
return Response(
response="no available info for '{}' '{}'".format(
equipment_name, entity_string),
status=404,
mimetype="text/html")
result = {
'equipment name': equipment_name,
'card id': m.group(1).replace('-', '/'),
'port number': m.group(2),
'locations': []
}
interface_name = f'{result["card id"]}/{result["port number"]}'
top_level_services = []
services = r.get(
f'ims:interface_services:{equipment_name}:{interface_name}')
if services:
result['services'] = json.loads(services.decode('utf=8'))
for s in result['services']:
top_level_services.extend(get_top_level_services(s['id'], r))
result['locations'] += \
_location_from_equipment(s['equipment'], r)
result['locations'] += \
_location_from_equipment(s['other_end_equipment'], r)
def _related_services():
for related in related_interfaces(equipment_name, interface_name):
logger.debug(f'Related Interface: {related}')
rs = r.get(f'ims:interface_services:{equipment_name}:'
f'{related.upper()}')
if rs:
for s in json.loads(rs.decode('utf-8')):
top_level_services.extend(
get_top_level_services(s['id'], r))
yield {
'name': s['name'],
'status': s['status'],
'circuit_type': s['circuit_type'],
'project': s['project']
}
related_services = list(_related_services())
if related_services:
top_level_services.extend(related_services)
if top_level_services:
result['related-services'] = top_level_services
if not result['locations']:
result['locations'] = _location_from_equipment(equipment_name, r)
result['locations'] = _remove_duplicates_from_list(result['locations'])
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment