import logging import requests import time from enum import Enum # Navigation Properties # http://149.210.162.190:81/ImsVersions/4.19.9/html/86d07a57-fa45-835e-d4a2-a789c4acbc96.htm # noqa from requests import HTTPError logger = logging.getLogger(__name__) # http://149.210.162.190:81/ImsVersions/20.1/html/86d07a57-fa45-835e-d4a2-a789c4acbc96.htm # noqa CIRCUIT_PROPERTIES = { 'Site': 8, 'Speed': 16, 'Customer': 32, 'Product': 128, 'CalculatedNode': 256, 'Ports': 512, 'InternalPorts': 1024, 'CarrierCircuits': 65536, 'SubCircuits': 131072, 'PortsFullDetails': 262144, 'InternalPortsFullDetails': 524288, 'PortA': 34359738368, 'PortB': 68719476736 } # http://149.210.162.190:81/ImsVersions/4.19.9/html/dbc969d0-e735-132e-6281-f724c6d7da64.htm # noqa CONTACT_PROPERTIES = { 'SiteRelatedContacts': 8, 'CustomerRelatedContacts': 16, 'GroupRelatedContacts': 32, 'VendorRelatedContacts': 64 } # http://149.210.162.190:81/ImsVersions/4.19.9/html/5a40472e-48ee-c120-0a36-52a85d52127c.htm # noqa CUSTOMER_PROPERTIES = { 'CustomerRelatedContacts': 32768, 'CustomerType': 262144 } # http://149.210.162.190:81/ImsVersions/4.19.9/html/347cb410-8c05-47bd-ceb0-d1dd05bf98a4.htm # noqa CITY_PROPERTIES = { 'Country': 8 } # http://149.210.162.190:81/ImsVersions/4.19.9/html/a8dc6266-d934-8162-4a55-9e1648187f2c.htm # noqa EQUIP_DEF_PROPERTIES = { 'Nodes': 4096 } # http://149.210.162.190:81/ImsVersions/4.19.9/html/f18222e3-e353-0abe-b89c-820db87940ac.htm # noqa INTERNAL_PORT_PROPERTIES = { 'Node': 8, 'Shelf': 64, 'Circuit': 256, } # http://149.210.162.190:81/ImsVersions/4.19.9/html/2d27d509-77cb-537d-3ffa-796de7e82af8.htm # noqa NODE_PROPERTIES = { 'EquipmentDefinition': 16, 'ManagementSystem': 128, 'Site': 256, 'Shelves': 1024, 'Ports': 4096, 'InternalPorts': 8192, 'NodeCity': 32768, 'Order': 67108864, 'NodeCountry': 1073741824 } # http://149.210.162.190:81/ImsVersions/4.19.9/html/199f32b5-5104-fec7-8787-0d730113e902.htm # noqa PORT_PROPERTIES = { 'Node': 16, 'Shelf': 32, 'Circuit': 512, } # http://149.210.162.190:81/ImsVersions/4.19.9/html/9c8d50f0-842c-5959-0fa6-14e1720669ec.htm # noqa SITE_PROPERTIES = { 'City': 2, 'SiteAliases': 64, 'Country': 256, 'Nodes': 32768 } # http://149.210.162.190:81/ImsVersions/4.19.9/html/8ce06cb7-7707-46c4-f02f-86083310d81b.htm # noqa VENDOR_PROPERTIES = { 'VendorRelatedContacts': 64, 'VendorType': 1024 } # http://149.210.162.190:81/ImsVersions/4.19.9/html/62e7aa63-aff0-6992-7697-377ace239c4f.htm # noqa VENDOR_RELATED_CONTACT_PROPERTIES = { 'Vendor': 8, 'Contact': 16 } VM_PORT_RELATE_PROPERTIES = { 'Circuit': 32 } VM_INTERNAL_PORT_RELATE_PROPERTIES = { 'Circuit': 32 } NO_FILTERED_RESULTS_MESSAGE = 'no records found for entity:' # this will be obsolete as soon as Inventory Provider update is done, but is # here for between the time of the roll out and the Inventory Update IMS_SERVICE_NAMES = { 'EUMETSAT GRE', 'EUMETSAT INTERNATIONAL', 'EUMETSAT TERRESTRIAL', 'EXPRESS ROUTE', 'GEANT - GBS', 'GEANT CLOUD PEERING', 'GEANT IP', 'GEANT LAMBDA', 'GEANT OPEN CROSS CONNECT', 'GEANT OPEN PORT', 'GEANT PEERING', 'GEANT PLUS', 'GTS', 'GWS - BROKERED', 'GWS - DIRECT', 'GWS - INDIRECT', 'GWS - UPSTREAM', 'IP PEERING - NON R&E (PRIVATE)', 'IP PEERING - NON R&E (PUBLIC)', 'IP PEERING - R&E', 'IP TRUNK', 'L2SERVICES', 'L3-VPN', 'MD-VPN (INTERNAL)', 'MD-VPN (NATIVE)', 'MD-VPN (PROXY)', 'POP LAN LINK', 'SERVER LINK' } class InventoryStatus(Enum): PLANNED = 1 READY_FOR_SERVICE = 2 IN_SERVICE = 3 MIGRATION = 4 OUT_OF_SERVICE = 6 READY_FOR_CEASURE = 7 class RelateType(Enum): VENDOR = 1 CONTACT = 2 GROUP = 3 CONTRACT = 4 CUSTOMER = 5 SITE = 6 class IMSError(Exception): pass class IMS(object): TIMEOUT_THRESHOLD = 1200 PERMITTED_RECONNECT_ATTEMPTS = 3 LOGIN_PATH = '/login' IMS_PATH = '/ims' cache = {} base_url = None bearer_token = None bearer_token_init_time = 0 reconnect_attempts = 0 def __init__(self, base_url, username, password, bearer_token=None): IMS.base_url = base_url self.username = username self.password = password IMS.bearer_token = bearer_token @classmethod def _init_bearer_token(cls, username, password): re_init_time = time.time() if not cls.bearer_token or \ re_init_time - cls.bearer_token_init_time \ > cls.TIMEOUT_THRESHOLD: cls.reconnect_attempts = 0 else: cls.reconnect_attempts += 1 if cls.reconnect_attempts > cls.PERMITTED_RECONNECT_ATTEMPTS: raise IMSError('Too many reconnection attempts made') logger.debug(f'Logging in - Username: {username}' f' - URL: {cls.base_url + cls.LOGIN_PATH}') response = requests.post( cls.base_url + cls.LOGIN_PATH, auth=(username, password)) response.raise_for_status() cls.bearer_token_init_time = re_init_time cls.bearer_token = response.text def _get_entity( self, url, params=None, navigation_properties=None, use_cache=False): url = f'{self.base_url + IMS.IMS_PATH}/{url}' cache_key = url if navigation_properties: params = params if params else {} params['navigationproperty'] = navigation_properties if isinstance( navigation_properties, int) else sum(navigation_properties) if use_cache: if params: s = '-'.join( [f'{t}::{params[t]}' for t in sorted(params)]) cache_key = f'{cache_key}::{s}' entity = IMS.cache.get(cache_key, None) if entity: return entity if not IMS.bearer_token: IMS._init_bearer_token(self.username, self.password) def _is_invalid_login_state(response_): if response_.status_code == requests.codes.unauthorized: return True if response_.status_code in (requests.codes.ok, requests.codes.not_found): if NO_FILTERED_RESULTS_MESSAGE in response_.text.lower(): return False try: r = response_.json() except Exception as e: logger.debug(f"unexpected response: {response_.text}") raise e if r and 'HasErrors' in r and r['HasErrors']: for e in r['Errors']: if 'Guid expired' in e['ErrorMessage']: return True return False def _convert_keys(source): if isinstance(source, list): return [_convert_keys(x) for x in source] elif isinstance(source, dict): new = {} for k, v in source.items(): if isinstance(v, (dict, list)): v = _convert_keys(v) new[k.lower()] = v return new return source while True: response = requests.get( url, headers={'Authorization': f'Bearer {self.bearer_token}'}, params=params) if _is_invalid_login_state(response): IMS._init_bearer_token(self.username, self.password) else: response.raise_for_status() orig = response.json() return_value = _convert_keys(orig) if use_cache: IMS.cache[cache_key] = return_value return return_value def get_entity_by_id( self, entity_type, entity_id, navigation_properties=None, use_cache=False): url = f'{entity_type}/{entity_id}' return \ self._get_entity(url, None, navigation_properties, use_cache) def get_entity_by_name( self, entity, name, navigation_properties=None, use_cache=False): url = f'{entity}/byname/"{name}"' return self._get_entity(url, None, navigation_properties, use_cache) def get_filtered_entities( self, entity, filter_string, navigation_properties=None, use_cache=False, step_count=50): more_to_come = True start_entity = 0 while more_to_come: params = { 'paginatorStartElement': start_entity, 'paginatorNumberOfElements': step_count } url = f'{entity}/filtered/{filter_string}' try: more_to_come = False entities = self._get_entity( url, params, navigation_properties, use_cache) except HTTPError as e: r = e.response if r.status_code == requests.codes.not_found \ and NO_FILTERED_RESULTS_MESSAGE in r.text.lower(): entities = None else: raise e if entities: more_to_come = \ len(entities) >= step_count start_entity += step_count yield from entities def get_all_entities( self, entity, navigation_properties=None, use_cache=False, step_count=50 ): yield from self.get_filtered_entities( entity, 'Id <> 0', navigation_properties, use_cache, step_count )