diff --git a/inventory_provider/routes/ims_classifier.py b/inventory_provider/routes/ims_classifier.py index f5577fe45725650a23af48001090dc6fcc8e6f88..f2c5f4f1c1ae7f3522d462cf2065417c72b3a20a 100644 --- a/inventory_provider/routes/ims_classifier.py +++ b/inventory_provider/routes/ims_classifier.py @@ -4,8 +4,10 @@ import json import logging import re from functools import lru_cache +from typing import Optional, Iterator, List from flask import Blueprint, Response, current_app +from redis import Redis from inventory_provider.db.ims import IMS_SERVICE_NAMES from inventory_provider.routes import common @@ -15,7 +17,7 @@ routes = Blueprint("ims-inventory-data-classifier-support-routes", __name__) logger = logging.getLogger(__name__) -def _LOCATION(equipment, name, abbreviation): +def _LOCATION(equipment: str, name: str, abbreviation: str) -> dict: return { 'equipment': equipment, 'name': name, @@ -23,7 +25,8 @@ def _LOCATION(equipment, name, abbreviation): } -def build_locations(loc_a, loc_b=None): +def build_locations(loc_a: Optional[dict], loc_b: Optional[dict] = None) \ + -> Optional[dict]: locations = None if loc_a: locations = {'a': loc_a} @@ -32,18 +35,19 @@ def build_locations(loc_a, loc_b=None): return locations -def _remove_duplicates_from_list(all): +def _remove_duplicates_from_list(all_: list) -> list: """ removes duplicates from the input list the list items must be encodable as json :param l: :return: a new list with unique elements """ - tmp_dict = dict([(json.dumps(item, sort_keys=True), item) for item in all]) + tmp_dict = dict( + [(json.dumps(item, sort_keys=True), item) for item in all_]) return list(tmp_dict.values()) -def _location_from_equipment(equipment_name, r): +def _location_from_equipment(equipment_name: str, r: Redis) -> Optional[dict]: result = r.get(f'ims:location:{equipment_name}') if not result: logger.error(f'error looking up location for {equipment_name}') @@ -51,7 +55,8 @@ def _location_from_equipment(equipment_name, r): result = json.loads(result.decode('utf-8')) if not result: - logger.error(f'sanity failure: empty list for location {equipment_name}') + logger.error( + f'sanity failure: empty list for location {equipment_name}') return None return _LOCATION( @@ -60,7 +65,7 @@ def _location_from_equipment(equipment_name, r): abbreviation=result[0]['pop']['abbreviation']) -def _location_from_services(services, r): +def _location_from_services(services, r: Redis): for s in services: loc_a = _location_from_equipment(s['equipment'], r) loc_b = _location_from_equipment(s['other_end_equipment'], r) \ @@ -69,7 +74,7 @@ def _location_from_services(services, r): @lru_cache(256, typed=False) -def _location_from_port_id(port_id, r): +def _location_from_port_id(port_id: Optional[str], r: Redis) -> Optional[dict]: if not port_id: return None port_info = r.get(f'ims:port_id_interface:{port_id}') @@ -121,7 +126,7 @@ def get_ims_interface(interface: str) -> str: return interface.upper() -def related_interfaces(hostname, interface): +def related_interfaces(hostname:str, interface: str) -> Iterator[str]: r = common.get_current_redis() prefix = f'netconf-interfaces:{hostname}:' for k in r.keys(prefix + interface + '.*'): @@ -131,7 +136,7 @@ def related_interfaces(hostname, interface): yield k[len(prefix):] -def get_top_level_services(circuit_id, r): +def get_top_level_services(circuit_id: str, r: Redis) -> List[dict]: tls = {} key = f'ims:circuit_hierarchy:{circuit_id}' results = r.get(key) @@ -160,15 +165,15 @@ def get_top_level_services(circuit_id, r): return list(tls.values()) -def get_related_services(source_equipment, interface, r): +def get_related_services(source_equipment: str, interface: str, r) -> dict: """ - This not only finds the top-level-services for the given interface - but also gets the top-level-services for the related interfaces + Finds the top-level-services for the given interface + and also gets the top-level-services for the related interfaces e.g. ae20 will also find services on all logical units of ae20 (ae20.1 ...) - :param source_equipment: - :param interface: - :param r: - :return: + :param source_equipment: equipment name + :param interface: interface name + :param r: redis connection + :return: Dict """ ims_source_equipment = get_ims_equipment_name(source_equipment) ims_interface = get_ims_interface(interface) @@ -187,14 +192,14 @@ def get_related_services(source_equipment, interface, r): yield from get_top_level_services(s['id'], r) -def get_top_level_services_by_port_id(port_id, r): +def get_top_level_services_by_port_id(port_id: str, r: Redis) -> Iterator: services = r.get(f'ims:port_id_services:{port_id}') if services: for s in json.loads(services.decode('utf-8')): yield from get_top_level_services(s['id'], r) -def get_full_service_info(s, r): +def get_full_service_info(s: dict, r: Redis) -> dict: port_a_id = s.get('port_a_id', None) port_b_id = s.get('port_b_id', None) service = s.copy() @@ -224,7 +229,7 @@ def get_full_service_info(s, r): @routes.route("/juniper-link-info/<source_equipment>/<path:interface>", methods=['GET', 'POST']) @common.require_accepts_json -def get_juniper_link_info(source_equipment: str, interface: str): +def get_juniper_link_info(source_equipment: str, interface: str) -> Response: ims_source_equipment = get_ims_equipment_name(source_equipment) ims_interface = get_ims_interface(interface) @@ -312,7 +317,7 @@ def get_juniper_link_info(source_equipment: str, interface: str): return Response(result, mimetype="application/json") -def ix_peering_info(peer_info): +def ix_peering_info(peer_info: dict) -> dict: """ TODO: this is probably the least efficient way of doing this (if it's a problem, pre-compute these lists) @@ -354,7 +359,7 @@ def ix_peering_info(peer_info): return result -def find_interfaces(address): +def find_interfaces(address) -> Iterator: """ TODO: this is probably the least efficient way of doing this (if it's a problem, pre-compute these lists) @@ -375,7 +380,7 @@ def find_interfaces(address): yield ifc -def find_interfaces_and_services(address_str): +def find_interfaces_and_services(address_str: str) -> Iterator[dict]: """ :param address_str: an ipaddress object @@ -409,7 +414,7 @@ def find_interfaces_and_services(address_str): @routes.route("/peer-info/<address>", methods=['GET', 'POST']) @common.require_accepts_json -def peer_info(address): +def peer_info(address: str) -> Response: # canonicalize the input address first ... try: @@ -484,7 +489,8 @@ def peer_info(address): "<source_equipment>/<interface>/<circuit_id>", methods=['GET', 'POST']) @common.require_accepts_json -def get_trap_metadata(source_equipment, interface, circuit_id): +def get_trap_metadata(source_equipment: str, interface: str, circuit_id: str) \ + -> Response: # interface = interface.replace('-T', '-') cache_key = f'ims-classifier-cache:infinera:{source_equipment}:{interface}' @@ -553,7 +559,8 @@ def get_trap_metadata(source_equipment, interface, circuit_id): @routes.route("/infinera-fiberlink-info/<ne_name_str>/<object_name_str>", methods=['GET', 'POST']) @common.require_accepts_json -def get_fiberlink_trap_metadata(ne_name_str, object_name_str): +def get_fiberlink_trap_metadata(ne_name_str: str, object_name_str: str) \ + -> Response: objects = object_name_str.split('_') shelves = [x.split('-')[0] for x in objects] p = r'([a-zA-Z\d]+?-(OLA|DTNX)\d+(-\d)?)' @@ -566,7 +573,8 @@ def get_fiberlink_trap_metadata(ne_name_str, object_name_str): r = common.get_current_redis() # double check that we only need to check the two nodes and not the objects - cache_key = f'ims-classifier-cache:fiberlink:{ne_name_str}:{object_name_str}' + cache_key = \ + f'ims-classifier-cache:fiberlink:{ne_name_str}:{object_name_str}' # result = r.get(cache_key) result = False @@ -644,7 +652,7 @@ def get_fiberlink_trap_metadata(ne_name_str, object_name_str): @routes.route('/coriant-info/<equipment_name>/<path:entity_string>', methods=['GET', 'POST']) @common.require_accepts_json -def get_coriant_info(equipment_name, entity_string): +def get_coriant_info(equipment_name: str, entity_string: str) -> Response: r = common.get_current_redis() equipment_name = get_ims_equipment_name(equipment_name)