Skip to content
Snippets Groups Projects
Commit 90f4d7c4 authored by Robert Latta's avatar Robert Latta
Browse files

Non-functional changes

parent 73f868ae
No related branches found
No related tags found
No related merge requests found
......@@ -4,8 +4,10 @@ import json
import logging
import re
from functools import lru_cache
from typing import Optional, Iterator, List
from flask import Blueprint, Response, current_app
from redis import Redis
from inventory_provider.db.ims import IMS_SERVICE_NAMES
from inventory_provider.routes import common
......@@ -15,7 +17,7 @@ routes = Blueprint("ims-inventory-data-classifier-support-routes", __name__)
logger = logging.getLogger(__name__)
def _LOCATION(equipment, name, abbreviation):
def _LOCATION(equipment: str, name: str, abbreviation: str) -> dict:
return {
'equipment': equipment,
'name': name,
......@@ -23,7 +25,8 @@ def _LOCATION(equipment, name, abbreviation):
}
def build_locations(loc_a, loc_b=None):
def build_locations(loc_a: Optional[dict], loc_b: Optional[dict] = None) \
-> Optional[dict]:
locations = None
if loc_a:
locations = {'a': loc_a}
......@@ -32,18 +35,19 @@ def build_locations(loc_a, loc_b=None):
return locations
def _remove_duplicates_from_list(all):
def _remove_duplicates_from_list(all_: list) -> list:
"""
removes duplicates from the input list
the list items must be encodable as json
:param l:
:return: a new list with unique elements
"""
tmp_dict = dict([(json.dumps(item, sort_keys=True), item) for item in all])
tmp_dict = dict(
[(json.dumps(item, sort_keys=True), item) for item in all_])
return list(tmp_dict.values())
def _location_from_equipment(equipment_name, r):
def _location_from_equipment(equipment_name: str, r: Redis) -> Optional[dict]:
result = r.get(f'ims:location:{equipment_name}')
if not result:
logger.error(f'error looking up location for {equipment_name}')
......@@ -51,7 +55,8 @@ def _location_from_equipment(equipment_name, r):
result = json.loads(result.decode('utf-8'))
if not result:
logger.error(f'sanity failure: empty list for location {equipment_name}')
logger.error(
f'sanity failure: empty list for location {equipment_name}')
return None
return _LOCATION(
......@@ -60,7 +65,7 @@ def _location_from_equipment(equipment_name, r):
abbreviation=result[0]['pop']['abbreviation'])
def _location_from_services(services, r):
def _location_from_services(services, r: Redis):
for s in services:
loc_a = _location_from_equipment(s['equipment'], r)
loc_b = _location_from_equipment(s['other_end_equipment'], r) \
......@@ -69,7 +74,7 @@ def _location_from_services(services, r):
@lru_cache(256, typed=False)
def _location_from_port_id(port_id, r):
def _location_from_port_id(port_id: Optional[str], r: Redis) -> Optional[dict]:
if not port_id:
return None
port_info = r.get(f'ims:port_id_interface:{port_id}')
......@@ -121,7 +126,7 @@ def get_ims_interface(interface: str) -> str:
return interface.upper()
def related_interfaces(hostname, interface):
def related_interfaces(hostname:str, interface: str) -> Iterator[str]:
r = common.get_current_redis()
prefix = f'netconf-interfaces:{hostname}:'
for k in r.keys(prefix + interface + '.*'):
......@@ -131,7 +136,7 @@ def related_interfaces(hostname, interface):
yield k[len(prefix):]
def get_top_level_services(circuit_id, r):
def get_top_level_services(circuit_id: str, r: Redis) -> List[dict]:
tls = {}
key = f'ims:circuit_hierarchy:{circuit_id}'
results = r.get(key)
......@@ -160,15 +165,15 @@ def get_top_level_services(circuit_id, r):
return list(tls.values())
def get_related_services(source_equipment, interface, r):
def get_related_services(source_equipment: str, interface: str, r) -> dict:
"""
This not only finds the top-level-services for the given interface
but also gets the top-level-services for the related interfaces
Finds the top-level-services for the given interface
and also gets the top-level-services for the related interfaces
e.g. ae20 will also find services on all logical units of ae20 (ae20.1 ...)
:param source_equipment:
:param interface:
:param r:
:return:
:param source_equipment: equipment name
:param interface: interface name
:param r: redis connection
:return: Dict
"""
ims_source_equipment = get_ims_equipment_name(source_equipment)
ims_interface = get_ims_interface(interface)
......@@ -187,14 +192,14 @@ def get_related_services(source_equipment, interface, r):
yield from get_top_level_services(s['id'], r)
def get_top_level_services_by_port_id(port_id, r):
def get_top_level_services_by_port_id(port_id: str, r: Redis) -> Iterator:
services = r.get(f'ims:port_id_services:{port_id}')
if services:
for s in json.loads(services.decode('utf-8')):
yield from get_top_level_services(s['id'], r)
def get_full_service_info(s, r):
def get_full_service_info(s: dict, r: Redis) -> dict:
port_a_id = s.get('port_a_id', None)
port_b_id = s.get('port_b_id', None)
service = s.copy()
......@@ -224,7 +229,7 @@ def get_full_service_info(s, r):
@routes.route("/juniper-link-info/<source_equipment>/<path:interface>",
methods=['GET', 'POST'])
@common.require_accepts_json
def get_juniper_link_info(source_equipment: str, interface: str):
def get_juniper_link_info(source_equipment: str, interface: str) -> Response:
ims_source_equipment = get_ims_equipment_name(source_equipment)
ims_interface = get_ims_interface(interface)
......@@ -312,7 +317,7 @@ def get_juniper_link_info(source_equipment: str, interface: str):
return Response(result, mimetype="application/json")
def ix_peering_info(peer_info):
def ix_peering_info(peer_info: dict) -> dict:
"""
TODO: this is probably the least efficient way of doing this
(if it's a problem, pre-compute these lists)
......@@ -354,7 +359,7 @@ def ix_peering_info(peer_info):
return result
def find_interfaces(address):
def find_interfaces(address) -> Iterator:
"""
TODO: this is probably the least efficient way of doing this
(if it's a problem, pre-compute these lists)
......@@ -375,7 +380,7 @@ def find_interfaces(address):
yield ifc
def find_interfaces_and_services(address_str):
def find_interfaces_and_services(address_str: str) -> Iterator[dict]:
"""
:param address_str: an ipaddress object
......@@ -409,7 +414,7 @@ def find_interfaces_and_services(address_str):
@routes.route("/peer-info/<address>", methods=['GET', 'POST'])
@common.require_accepts_json
def peer_info(address):
def peer_info(address: str) -> Response:
# canonicalize the input address first ...
try:
......@@ -484,7 +489,8 @@ def peer_info(address):
"<source_equipment>/<interface>/<circuit_id>",
methods=['GET', 'POST'])
@common.require_accepts_json
def get_trap_metadata(source_equipment, interface, circuit_id):
def get_trap_metadata(source_equipment: str, interface: str, circuit_id: str) \
-> Response:
# interface = interface.replace('-T', '-')
cache_key = f'ims-classifier-cache:infinera:{source_equipment}:{interface}'
......@@ -553,7 +559,8 @@ def get_trap_metadata(source_equipment, interface, circuit_id):
@routes.route("/infinera-fiberlink-info/<ne_name_str>/<object_name_str>",
methods=['GET', 'POST'])
@common.require_accepts_json
def get_fiberlink_trap_metadata(ne_name_str, object_name_str):
def get_fiberlink_trap_metadata(ne_name_str: str, object_name_str: str) \
-> Response:
objects = object_name_str.split('_')
shelves = [x.split('-')[0] for x in objects]
p = r'([a-zA-Z\d]+?-(OLA|DTNX)\d+(-\d)?)'
......@@ -566,7 +573,8 @@ def get_fiberlink_trap_metadata(ne_name_str, object_name_str):
r = common.get_current_redis()
# double check that we only need to check the two nodes and not the objects
cache_key = f'ims-classifier-cache:fiberlink:{ne_name_str}:{object_name_str}'
cache_key = \
f'ims-classifier-cache:fiberlink:{ne_name_str}:{object_name_str}'
# result = r.get(cache_key)
result = False
......@@ -644,7 +652,7 @@ def get_fiberlink_trap_metadata(ne_name_str, object_name_str):
@routes.route('/coriant-info/<equipment_name>/<path:entity_string>',
methods=['GET', 'POST'])
@common.require_accepts_json
def get_coriant_info(equipment_name, entity_string):
def get_coriant_info(equipment_name: str, entity_string: str) -> Response:
r = common.get_current_redis()
equipment_name = get_ims_equipment_name(equipment_name)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment