diff --git a/inventory_provider/routes/testing.py b/inventory_provider/routes/testing.py
index f81982283885785da1dfaad497ba8e8ed1899f1c..8240efe5e92438674a881339b140f398f25b5695 100644
--- a/inventory_provider/routes/testing.py
+++ b/inventory_provider/routes/testing.py
@@ -18,6 +18,12 @@ routes = Blueprint("inventory-data-testing-support-routes", __name__)
 logger = logging.getLogger(__name__)
 
 
+@routes.route("chord-update", methods=['GET', 'POST'])
+def chord_update():
+    r = worker.update_entry_point.delay().get()
+    return jsonify(r)
+
+
 @routes.route("flushdb", methods=['GET', 'POST'])
 def flushdb():
     common.get_current_redis().flushdb()
diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index f496e544957e593a385be2c3d679d2663413ae4c..7dc7bee08af32713622f5c2f9d70a1f30dba15ba 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -1,3 +1,4 @@
+import concurrent.futures
 import functools
 import json
 import logging
@@ -10,7 +11,7 @@ from typing import List
 from redis.exceptions import RedisError
 from kombu.exceptions import KombuError
 
-from celery import Task, states
+from celery import Task, states, chord
 from celery.result import AsyncResult
 
 from collections import defaultdict
@@ -23,7 +24,7 @@ from inventory_provider.tasks.app import app
 from inventory_provider.tasks.common \
     import get_next_redis, get_current_redis, \
     latch_db, get_latch, set_latch, update_latch_status, \
-    ims_sorted_service_type_key
+    ims_sorted_service_type_key, set_single_latch
 from inventory_provider.tasks import monitor
 from inventory_provider import config
 from inventory_provider import environment
@@ -1145,3 +1146,637 @@ def check_task_status(task_id, parent=None, forget=False):
             r.forget()
 
     yield result
+
+# =================================== chorded - currently only here for testing
+
+
+# new
+@app.task(base=InventoryTask, bind=True, name='update_entry_point')
+@log_task_entry_and_exit
+def update_entry_point(self):
+
+    self.log_info('querying netdash for managed routers')
+    routers = list(juniper.load_routers_from_netdash(
+        InventoryTask.config['managed-routers']))
+    self.log_info(f'found {len(routers)} routers')
+
+    lab_routers = InventoryTask.config.get('lab-routers', [])
+
+    _erase_next_db_chorded(InventoryTask.config)
+    update_latch_status(InventoryTask.config, pending=True)
+
+    tasks = chord(
+        (
+            ims_task.s().on_error(task_error_handler.s()),
+            chord(
+                (reload_router_config_chorded.s(r).on_error(
+                    task_error_handler.s()) for r in routers),
+                empty_task.si('router tasks complete')
+            ),
+            chord(
+                (reload_lab_router_config_chorded.s(r).on_error(
+                    task_error_handler.s()) for r in lab_routers),
+                empty_task.si('lab router tasks complete')
+            )
+        ),
+        final_task.si().on_error(task_error_handler.s())
+    )()
+    return tasks
+
+
+# new
+@app.task
+def task_error_handler(request, exc, traceback):
+    update_latch_status(InventoryTask.config, pending=False, failure=True)
+    logger.warning(f'Task {request.id!r} raised error: {exc!r}')
+
+
+# new
+@app.task(base=InventoryTask, bind=True, name='empty_task')
+def empty_task(self, message):
+    logger.warning(f'message from empty task: {message}')
+
+
+# updated with transaction
+def _erase_next_db_chorded(config):
+    """
+    flush next db, but first save latch and then restore afterwards
+
+    TODO: handle the no latch scenario nicely
+    :param config:
+    :return:
+    """
+    r = get_next_redis(config)
+    saved_latch = get_latch(r)
+
+    if saved_latch:
+        # execute as transaction to ensure that latch is always available in
+        # db that is being flushed
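+        # multi/exec: the queued flushdb and latch writes are applied
+        # atomically when execute() is called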
+        rp = r.pipeline()
+        rp.multi()
+        rp.flushdb()
+        set_single_latch(
+            rp,
+            saved_latch['this'],
+            saved_latch['current'],
+            saved_latch['next'],
+            saved_latch.get('timestamp', 0)
+        )
+        rp.execute()
+
+        # ensure latch is consistent in all dbs
+        set_latch(
+            config,
+            new_current=saved_latch['current'],
+            new_next=saved_latch['next'],
+            timestamp=saved_latch.get('timestamp', 0))
+
+
+# updated
+@app.task(base=InventoryTask, bind=True, name='reload_lab_router_config')
+@log_task_entry_and_exit
+def reload_lab_router_config_chorded(self, hostname):
+    self.log_info(f'loading netconf data for lab {hostname} RL')
+
+    # load new netconf data, in this thread
+    netconf_str = retrieve_and_persist_netconf_config(hostname, lab=True)
+    netconf_doc = etree.fromstring(netconf_str)
+
+    refresh_juniper_interface_list(hostname, netconf_doc, lab=True)
+
+    # load snmp indexes
+    community = juniper.snmp_community_string(netconf_doc)
+    if not community:
+        raise InventoryTaskError(
+            f'error extracting community string for {hostname}')
+    else:
+        self.log_info(f'refreshing snmp interface indexes for {hostname}')
+        logical_systems = juniper.logical_systems(netconf_doc)
+
+        # load snmp data, in this thread
+        snmp_refresh_interfaces(hostname, community, logical_systems)
+
+    self.log_info(f'updated configuration for lab {hostname}')
+
+
+# updated
+@app.task(base=InventoryTask, bind=True, name='reload_router_config')
+@log_task_entry_and_exit
+def reload_router_config_chorded(self, hostname):
+    self.log_info(f'loading netconf data for {hostname} RL')
+    netconf_str = retrieve_and_persist_netconf_config(hostname)
+    netconf_doc = etree.fromstring(netconf_str)
+
+    # clear cached classifier responses for this router, and
+    # refresh peering data
+    logger.info(f'refreshing peers & clearing cache for {hostname}')
+    refresh_juniper_bgp_peers(hostname, netconf_doc)
+    refresh_juniper_interface_list(hostname, netconf_doc)
+
+    # load snmp indexes
+    community = juniper.snmp_community_string(netconf_doc)
+    if not community:
+        raise InventoryTaskError(
+            f'error extracting community string for {hostname}')
+    else:
+        self.log_info(f'refreshing snmp interface indexes for {hostname}')
+        logical_systems = juniper.logical_systems(netconf_doc)
+
+        # load snmp data, in this thread
+        snmp_refresh_interfaces_chorded(
+            hostname, community, logical_systems, self.log_info)
+        snmp_refresh_peerings_chorded(hostname, community, logical_systems)
+
+    logger.info(f'updated configuration for {hostname}')
+
+
+# new
+def retrieve_and_persist_netconf_config(hostname, lab=False):
+    redis_key = f'netconf:{hostname}'
+    if lab:
+        redis_key = f'lab:{redis_key}'
+
+    try:
+        netconf_doc = juniper.load_config(
+            hostname, InventoryTask.config["ssh"])
+        netconf_str = etree.tostring(netconf_doc, encoding='unicode')
+    except (ConnectionError, juniper.NetconfHandlingError):
+        msg = f'error loading netconf data from {hostname}'
+        logger.exception(msg)
+        r = get_current_redis(InventoryTask.config)
+
+        netconf_str = r.get(redis_key)
+        if not netconf_str:
+            raise InventoryTaskError(
+                f'netconf error with {hostname}'
+                f' and no cached netconf data found')
+        logger.info(f'Returning cached netconf data for {hostname}')
+
+    r = get_next_redis(InventoryTask.config)
+    r.set(redis_key, netconf_str)
+    logger.info(f'netconf info loaded from {hostname}')
+    return netconf_str
+
+
+# updated as it is no longer a task
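+# on snmp failure this falls back to data cached in the current db, so the
+# next db can still be populated; it raises only if there is no cache either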
+@log_task_entry_and_exit
+def snmp_refresh_interfaces_chorded(
+        hostname, community, logical_systems, update_callback=lambda s: None):
+    try:
+        interfaces = list(
+            snmp.get_router_snmp_indexes(hostname, community, logical_systems))
+    except ConnectionError:
+        msg = f'error loading snmp interface data from {hostname}'
+        logger.exception(msg)
+        update_callback(msg)
+        r = get_current_redis(InventoryTask.config)
+        interfaces = r.get(f'snmp-interfaces:{hostname}')
+        if not interfaces:
+            raise InventoryTaskError(
+                f'snmp error with {hostname}'
+                f' and no cached snmp interface data found')
+        # unnecessary json encode/decode here ... could be optimized
+        interfaces = json.loads(interfaces.decode('utf-8'))
+        update_callback(f'using cached snmp interface data for {hostname}')
+
+    r = get_next_redis(InventoryTask.config)
+
+    rp = r.pipeline()
+    rp.set(f'snmp-interfaces:{hostname}', json.dumps(interfaces))
+
+    # optimization for DBOARD3-372
+    # interfaces is a list of dicts like: {'name': str, 'index': int}
+    for ifc in interfaces:
+        ifc['hostname'] = hostname
+        rp.set(
+            f'snmp-interfaces-single:{hostname}:{ifc["name"]}',
+            json.dumps(ifc))
+
+    rp.execute()
+
+    update_callback(f'snmp interface info loaded from {hostname}')
+
+
+# updated as it is no longer a task
+@log_task_entry_and_exit
+def snmp_refresh_peerings_chorded(
+        hostname, community, logical_systems, update_callback=lambda s: None):
+    try:
+        peerings = list(
+            snmp.get_peer_state_info(hostname, community, logical_systems))
+    except ConnectionError:
+        msg = f'error loading snmp peering data from {hostname}'
+        logger.exception(msg)
+        update_callback(msg)
+        r = get_current_redis(InventoryTask.config)
+        peerings = r.get(f'snmp-peerings:hosts:{hostname}')
+        if peerings is None:
+            raise InventoryTaskError(
+                f'snmp error with {hostname}'
+                f' and no cached peering data found')
+        # unnecessary json encode/decode here ... could be optimized
+        peerings = json.loads(peerings.decode('utf-8'))
+        update_callback(f'using cached snmp peering data for {hostname}')
+
+    r = get_next_redis(InventoryTask.config)
+    r.set(f'snmp-peerings:hosts:{hostname}', json.dumps(peerings))
+
+    update_callback(f'snmp peering info loaded from {hostname}')
+
+
+# new
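+# ims_task runs in the chord header alongside the per-router tasks:
+# extract from IMS, transform, then persist into redis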
+@app.task(base=InventoryTask, bind=True, name='ims_task')
+@log_task_entry_and_exit
+def ims_task(self, use_current=False):
+
+    extracted_data = extract_ims_data()
+    transformed_data = transform_ims_data(extracted_data)
+    transformed_data['locations'] = extracted_data['locations']
+    transformed_data['lg_routers'] = extracted_data['lg_routers']
+    persist_ims_data(transformed_data, use_current)
+
+
+# new
+def extract_ims_data():
+
+    c = InventoryTask.config["ims"]
+    ds1 = IMS(c['api'], c['username'], c['password'])
+    ds2 = IMS(c['api'], c['username'], c['password'])
+    ds3 = IMS(c['api'], c['username'], c['password'])
+    ds4 = IMS(c['api'], c['username'], c['password'])
+    ds5 = IMS(c['api'], c['username'], c['password'])
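+    # five separate sessions: one per concurrent query below, assuming an
+    # IMS client instance is not safe to share between threads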
+
+    locations = {}
+    lg_routers = []
+    customer_contacts = {}
+    circuit_ids_to_monitor = []
+    additional_circuit_customer_ids = {}
+
+    hierarchy = {}
+    port_id_details = defaultdict(list)
+    port_id_services = defaultdict(list)
+
+    def _populate_locations():
+        nonlocal locations
+        locations = {k: v for k, v in ims_data.get_node_locations(ds1)}
+
+    def _populate_lg_routers():
+        nonlocal lg_routers
+        lg_routers = list(ims_data.lookup_lg_routers(ds5))
+
+    def _populate_customer_contacts():
+        nonlocal customer_contacts
+        customer_contacts = \
+            {k: v for k, v in ims_data.get_customer_service_emails(ds2)}
+
+    def _populate_circuit_ids_to_monitor():
+        nonlocal circuit_ids_to_monitor
+        circuit_ids_to_monitor = \
+            list(ims_data.get_monitored_circuit_ids(ds3))
+
+    def _populate_additional_circuit_customer_ids():
+        nonlocal additional_circuit_customer_ids
+        additional_circuit_customer_ids = \
+            ims_data.get_circuit_related_customer_ids(ds4)
+
+    exceptions = {}
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = {
+            executor.submit(_populate_locations): 'locations',
+            executor.submit(_populate_lg_routers): 'lg_routers',
+            executor.submit(_populate_customer_contacts): 'customer_contacts',
+            executor.submit(_populate_circuit_ids_to_monitor):
+                'circuit_ids_to_monitor',
+            executor.submit(_populate_additional_circuit_customer_ids):
+                'additional_circuit_customer_ids'
+        }
+
+        for future in concurrent.futures.as_completed(futures):
+            if future.exception():
+                exceptions[futures[future]] = str(future.exception())
+
+    if exceptions:
+        raise InventoryTaskError(json.dumps(exceptions, indent=2))
+
+    def _populate_hierarchy():
+        nonlocal hierarchy
+        hierarchy = {d['id']: d for d in ims_data.get_circuit_hierarchy(ds1)}
+        logger.debug("hierarchy complete")
+
+    def _populate_port_id_details():
+        nonlocal port_id_details
+        for x in ims_data.get_port_details(ds2):
+            pd = port_id_details[x['port_id']]
+            pd.append(x)
+        logger.debug("Port details complete")
+
+    def _populate_circuit_info():
+        for x in ims_data.get_port_id_services(ds3):
+            port_id_services[x['port_a_id']].append(x)
+        logger.debug("port circuits complete")
+
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = {
+            executor.submit(_populate_hierarchy): 'hierarchy',
+            executor.submit(_populate_port_id_details): 'port_id_details',
+            executor.submit(_populate_circuit_info): 'circuit_info'
+        }
+
+        for future in concurrent.futures.as_completed(futures):
+            if future.exception():
+                exceptions[futures[future]] = str(future.exception())
+
+    if exceptions:
+        raise InventoryTaskError(json.dumps(exceptions, indent=2))
+
+    return {
+        'locations': locations,
+        'lg_routers': lg_routers,
+        'customer_contacts': customer_contacts,
+        'circuit_ids_to_monitor': circuit_ids_to_monitor,
+        'additional_circuit_customer_ids': additional_circuit_customer_ids,
+        'hierarchy': hierarchy,
+        'port_id_details': port_id_details,
+        'port_id_services': port_id_services
+    }
+
+
+# new
+def transform_ims_data(data):
+    locations = data['locations']
+    customer_contacts = data['customer_contacts']
+    circuit_ids_to_monitor = data['circuit_ids_to_monitor']
+    additional_circuit_customer_ids = data['additional_circuit_customer_ids']
+    hierarchy = data['hierarchy']
+    port_id_details = data['port_id_details']
+    port_id_services = data['port_id_services']
+
+    def _get_circuit_contacts(c):
+        customer_ids = {c['customerid']}
+        customer_ids.update(additional_circuit_customer_ids.get(c['id'], []))
+        return set().union(
+            *[customer_contacts.get(cid, []) for cid in customer_ids])
+
+    for d in hierarchy.values():
+        d['contacts'] = sorted(list(_get_circuit_contacts(d)))
+
+    def _convert_to_bits(value, unit):
+        unit = unit.lower()
+        conversions = {
+            'm': 1 << 20,
+            'mb': 1 << 20,
+            'g': 1 << 30,
+            'gbe': 1 << 30,
+        }
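+        # e.g. _convert_to_bits('10', 'G') -> 10 * (1 << 30) = 10737418240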
+        return int(value) * conversions[unit]
+
+    def _get_speed(circuit_id):
+        c = hierarchy[circuit_id]
+        if c['status'] != 'operational':
+            return 0
+        pattern = re.compile(r'^(\d+)([a-zA-Z]+)$')
+        m = pattern.match(c['speed'])
+        if m:
+            try:
+                return _convert_to_bits(m[1], m[2])
+            except KeyError as e:
+                logger.debug(f'Could not find key: {e} '
+                             f'for circuit: {circuit_id}')
+                return 0
+        else:
+            if c['circuit-type'] == 'service' \
+                    or c['product'].lower() == 'ethernet':
+                return sum(
+                    (_get_speed(x) for x in c['carrier-circuits'])
+                )
+            else:
+                return 0
+
+    def _get_fibre_routes(c_id):
+        _circ = hierarchy.get(c_id, None)
+        if _circ is None:
+            return
+        if _circ['speed'].lower() == 'fibre_route':
+            yield _circ['id']
+        else:
+            for cc in _circ['carrier-circuits']:
+                yield from _get_fibre_routes(cc)
+
+    def _get_related_services(circuit_id: str) -> List[dict]:
+        rs = {}
+        c = hierarchy.get(circuit_id, None)
+        if c:
+
+            if c['circuit-type'] == 'service':
+                rs[c['id']] = {
+                    'id': c['id'],
+                    'name': c['name'],
+                    'circuit_type': c['circuit-type'],
+                    'service_type': c['product'],
+                    'project': c['project'],
+                    'contacts': c['contacts']
+                }
+                if c['id'] in circuit_ids_to_monitor:
+                    rs[c['id']]['status'] = c['status']
+                else:
+                    rs[c['id']]['status'] = 'non-monitored'
+
+            if c['sub-circuits']:
+                for sub in c['sub-circuits']:
+                    temp_parents = \
+                        _get_related_services(sub)
+                    rs.update({t['id']: t for t in temp_parents})
+        return list(rs.values())
+
+    def _format_service(s):
+
+        if s['circuit_type'] == 'service' \
+                and s['id'] not in circuit_ids_to_monitor:
+            s['status'] = 'non-monitored'
+        pd_a = port_id_details[s['port_a_id']][0]
+        location_a = locations.get(pd_a['equipment_name'], None)
+        if location_a:
+            loc_a = location_a['pop']
+        else:
+            loc_a = locations['UNKNOWN_LOC']['pop']
+            logger.warning(
+                f'Unable to find location for {pd_a["equipment_name"]} - '
+                f'Service ID {s["id"]}')
+        s['pop_name'] = loc_a['name']
+        s['pop_abbreviation'] = loc_a['abbreviation']
+        s['equipment'] = pd_a['equipment_name']
+        s['card_id'] = ''  # this is redundant I believe
+        s['port'] = pd_a['interface_name']
+        s['logical_unit'] = ''  # this is redundant I believe
+        if 'port_b_id' in s:
+            pd_b = port_id_details[s['port_b_id']][0]
+            location_b = locations.get(pd_b['equipment_name'], None)
+            if location_b:
+                loc_b = location_b['pop']
+            else:
+                loc_b = locations['UNKNOWN_LOC']['pop']
+                logger.warning(
+                    f'Unable to find location for {pd_b["equipment_name"]} - '
+                    f'Service ID {s["id"]}')
+
+            s['other_end_pop_name'] = loc_b['name']
+            s['other_end_pop_abbreviation'] = loc_b['abbreviation']
+            s['other_end_equipment'] = pd_b['equipment_name']
+            s['other_end_port'] = pd_b['interface_name']
+        else:
+            s['other_end_pop_name'] = ''
+            s['other_end_pop_abbreviation'] = ''
+            s['other_end_equipment'] = ''
+            s['other_end_port'] = ''
+
+        s.pop('port_a_id', None)
+        s.pop('port_b_id', None)
+
+    services_by_type = {}
+    interface_services = defaultdict(list)
+
+    for key, value in port_id_details.items():
+        for details in value:
+            k = f"{details['equipment_name']}:" \
+                f"{details['interface_name']}"
+            circuits = port_id_services.get(details['port_id'], [])
+
+            for circ in circuits:
+                contacts = _get_circuit_contacts(circ)
+                circ['fibre-routes'] = []
+                for x in set(_get_fibre_routes(circ['id'])):
+                    c = {
+                        'id': hierarchy[x]['id'],
+                        'name': hierarchy[x]['name'],
+                        'status': hierarchy[x]['status']
+                    }
+                    circ['fibre-routes'].append(c)
+
+                circ['related-services'] = \
+                    _get_related_services(circ['id'])
+
+                for tlc in circ['related-services']:
+                    contacts.update(tlc.pop('contacts'))
+                circ['contacts'] = sorted(list(contacts))
+
+                circ['calculated-speed'] = _get_speed(circ['id'])
+                _format_service(circ)
+
+                type_services = services_by_type.setdefault(
+                    ims_sorted_service_type_key(circ['service_type']), dict())
+                type_services[circ['id']] = circ
+
+            interface_services[k].extend(circuits)
+
+    return {
+        'hierarchy': hierarchy,
+        'interface_services': interface_services,
+        'services_by_type': services_by_type
+    }
+
+
+# new
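+# writes the ims:location:*, ims:lg:*, ims:circuit_hierarchy:*,
+# ims:interface_services:* and ims:services:* keys, plus the poller cache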
+def persist_ims_data(data, use_current=False):
+    hierarchy = data['hierarchy']
+    locations = data['locations']
+    lg_routers = data['lg_routers']
+    interface_services = data['interface_services']
+    services_by_type = data['services_by_type']
+
+    if use_current:
+        r = get_current_redis(InventoryTask.config)
+
+        # only need to delete the individual keys if it's just an IMS update
+        # rather than a complete update (the db will have been flushed)
+        for key_pattern in [
+            'ims:location:*',
+            'ims:lg:*',
+            'ims:circuit_hierarchy:*',
+            'ims:interface_services:*',
+            'ims:access_services:*',
+            'ims:gws_indirect:*'
+        ]:
+            rp = r.pipeline()
+            for k in r.scan_iter(key_pattern, count=1000):
+                rp.delete(k)
+            rp.execute()
+    else:
+        r = get_next_redis(InventoryTask.config)
+
+    rp = r.pipeline()
+    for h, d in locations.items():
+        rp.set(f'ims:location:{h}', json.dumps([d]))
+    rp.execute()
+    rp = r.pipeline()
+    for router in lg_routers:
+        rp.set(f'ims:lg:{router["equipment name"]}', json.dumps([router]))
+    rp.execute()
+    rp = r.pipeline()
+    for circ in hierarchy.values():
+        rp.set(f'ims:circuit_hierarchy:{circ["id"]}', json.dumps([circ]))
+    rp.execute()
+    rp = r.pipeline()
+    for k, v in interface_services.items():
+        rp.set(
+            f'ims:interface_services:{k}',
+            json.dumps(v))
+    rp.execute()
+    rp = r.pipeline()
+
+    populate_poller_cache(interface_services, r)
+
+    for service_type, services in services_by_type.items():
+        for v in services.values():
+            rp.set(
+                f'ims:services:{service_type}:{v["name"]}',
+                json.dumps({
+                    'id': v['id'],
+                    'name': v['name'],
+                    'project': v['project'],
+                    'here': {
+                        'pop': {
+                            'name': v['pop_name'],
+                            'abbreviation': v['pop_abbreviation']
+                        },
+                        'equipment': v['equipment'],
+                        'port': v['port'],
+                    },
+                    'there': {
+                        'pop': {
+                            'name': v['other_end_pop_name'],
+                            'abbreviation': v['other_end_pop_abbreviation']
+                        },
+                        'equipment': v['other_end_equipment'],
+                        'port': v['other_end_port'],
+                    },
+                    'speed_value': v['calculated-speed'],
+                    'speed_unit': 'n/a',
+                    'status': v['status'],
+                    'type': v['service_type']
+                }))
+
+    rp.execute()
+
+
+# new
+@app.task(base=InventoryTask, bind=True, name='final_task')
+@log_task_entry_and_exit
+def final_task(self):
+
+    _build_subnet_db(update_callback=self.log_info)
+    _build_snmp_peering_db(update_callback=self.log_info)
+    _build_juniper_peering_db(update_callback=self.log_info)
+
+    latch_db(InventoryTask.config)
+    self.log_info('latched current/next dbs')
diff --git a/test/test_worker.py b/test/test_worker.py
index 4ee32c2d74be5ddd274991c9a78fcbd6a5e68dd0..2952b9ccaf5248aa9e0835995d1db3585a4150a7 100644
--- a/test/test_worker.py
+++ b/test/test_worker.py
@@ -1,4 +1,307 @@
+from inventory_provider.tasks import common
+from inventory_provider.tasks.worker import transform_ims_data, \
+    extract_ims_data, persist_ims_data
 
-def test_place_holder():
-    assert True
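+# every ims_data getter used by extract_ims_data() is patched out, so the
+# test runs without an IMS connection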
+def test_extract_ims_data(mocker):
+
+    mocker.patch(
+        'inventory_provider.tasks.worker.InventoryTask.config'
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_node_locations',
+        return_value=[('loc_a', 'LOC A'), ('loc_b', 'LOC B')]
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.lookup_lg_routers',
+        return_value=['lg router 1', 'lg router 2']
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_customer_service_emails',
+        return_value=[('123', 'CON A'), ('456', 'CON B')]
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_monitored_circuit_ids',
+        return_value=[123, 456, 789]
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.'
+        'get_circuit_related_customer_ids',
+        return_value=[{'id a': ['A', 'A2']}, {'id b': ['B']}]
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_circuit_hierarchy',
+        return_value=[
+            {'id': '1', 'value': 'A'},
+            {'id': '2', 'value': 'B'}
+        ]
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_port_details',
+        return_value=[
+            {'port_id': 'A', 'value': 'a'},
+            {'port_id': 'B', 'value': 'b'},
+            {'port_id': 'B', 'value': 'c'}
+        ]
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_port_id_services',
+        return_value=[
+            {'port_a_id': '1', 'value': '1A'},
+            {'port_a_id': '1', 'value': '1B'},
+            {'port_a_id': '2', 'value': '2A'}
+        ]
+    )
+    res = extract_ims_data()
+    assert res['locations'] == {'loc_a': 'LOC A', 'loc_b': 'LOC B'}
+    assert res['lg_routers'] == ['lg router 1', 'lg router 2']
+    assert res['customer_contacts'] == {'123': 'CON A', '456': 'CON B'}
+    assert res['circuit_ids_to_monitor'] == [123, 456, 789]
+    assert res['additional_circuit_customer_ids'] == \
+        [{'id a': ['A', 'A2']}, {'id b': ['B']}]
+    assert res['hierarchy'] == {
+        '1': {'id': '1', 'value': 'A'},
+        '2': {'id': '2', 'value': 'B'}
+    }
+    assert res['port_id_details'] == {
+        'A': [{'port_id': 'A', 'value': 'a'}],
+        'B': [
+            {'port_id': 'B', 'value': 'b'},
+            {'port_id': 'B', 'value': 'c'}
+        ]
+    }
+    assert res['port_id_services'] == {
+        '1': [
+            {'port_a_id': '1', 'value': '1A'},
+            {'port_a_id': '1', 'value': '1B'}
+        ],
+        '2': [{'port_a_id': '2', 'value': '2A'}]
+    }
+
+
+def test_transform_ims_data():
+    locations = {
+        "eq_a": {
+            "pop": {
+                "name": "pop_loc_a",
+                "abbreviation": "pla",
+            }
+        },
+        "eq_b": {
+            "pop": {
+                "name": "pop_loc_b",
+                "abbreviation": "plb",
+            }
+        },
+        "UNKNOWN_LOC": {
+            "pop": {
+                "name": "UNKNOWN",
+                "abbreviation": "UNKNOWN",
+            }
+        }
+    }
+
+    additional_circuit_customer_ids = {
+        "circ_id_1": "cu_1_1"
+    }
+
+    customer_contacts = {
+        "cu_1": ["customer_1@a.org"],
+        "cu_1_1": ["customer_1_1@a.org"]
+    }
+
+    port_id_details = {
+        "port_id_1": [{
+            "equipment_name": "eq_a",
+            "interface_name": "if_a",
+            "port_id": "port_id_1"
+        }],
+        "port_id_2": [{
+            "equipment_name": "eq_b",
+            "interface_name": "if_b",
+            "port_id": "port_id_2"
+        }]
+    }
+
+    port_id_services = {
+        "port_id_1": [
+            {
+                "id": "circ_id_1",
+                "customerid": "cu_1",
+                "circuit_type": "circuit",
+                "service_type": "ETHERNET",
+                "status": "operational",
+                "port_a_id": "port_id_1",
+                "port_b_id": "port_id_2",
+
+            }
+        ],
+        "port_id_2": [
+            {
+                "id": "circ_id_1",
+                "customerid": "cu_1",
+                "circuit_type": "circuit",
+                "service_type": "ETHERNET",
+                "status": "operational",
+                "port_a_id": "port_id_2",
+                "port_b_id": "port_id_1",
+
+            }
+        ]
+    }
+
+    hierarchy = {
+        "circ_id_1": {
+            "id": "circ_id_1",
+            "name": "circ_name_1",
+            "status": "operational",
+            "circuit-type": "circuit",
+            "product": "ethernet",
+            "speed": "not fibre_route",
+            "carrier-circuits": ["carrier_id_1"],
+            "sub-circuits": ["sub_circuit_1"],
+            "customerid": "cu_1",
+        },
+        "carrier_id_1": {
+            "id": "carrier_id_1",
+            "name": "circ_carrier_name_1",
+            "status": "operational",
+            "circuit-type": "circuit",
+            "product": "ethernet",
+            "speed": "10G",
+            "carrier-circuits": ["carrier_id_2"],
+            "sub-circuits": ["circ_id_1"],
+            "customerid": "cu_1",
+        },
+        "carrier_id_2": {
+            "id": "carrier_id_2",
+            "name": "circ_carrier_name_3",
+            "status": "operational",
+            "circuit-type": "circuit",
+            "product": "ethernet",
+            "speed": "not fibre_route",
+            "carrier-circuits": ["carrier_id_3"],
+            "sub-circuits": ["carrier_id_1"],
+            "customerid": "cu_1",
+        },
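+        # carrier_id_3 below is the fibre-route leaf of the carrier chain:
+        # circ_id_1 -> carrier_id_1 -> carrier_id_2 -> carrier_id_3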
"carrier_id_3": { + "id": "carrier_id_3", + "name": "Fiber Route Circuit", + "status": "operational", + "circuit-type": "circuit", + "product": "OCG4", + "speed": "fibre_route", + "carrier-circuits": [], + "sub-circuits": ["carrier_id_2"], + "customerid": "cu_1", + }, + "sub_circuit_1": { + "id": "sub_circuit_1", + "name": "sub_circuit_name_1", + "status": "operational", + "circuit-type": "circuit", + "product": "ethernet", + "speed": "not fibre_route", + "carrier-circuits": ["circ_id_1"], + "sub-circuits": ["sub_circuit_2"], + "customerid": "cu_1", + }, + "sub_circuit_2": { + "id": "sub_circuit_2", + "name": "sub_circuit_name_2", + "status": "operational", + "circuit-type": "service", + "product": "PEERING R & E", + "speed": "not fiber route", + "project": "Project A", + "carrier-circuits": ["sub_circuit_1"], + "sub-circuits": [], + "customerid": "cu_1", + } + } + data = { + "locations": locations, + "customer_contacts": customer_contacts, + "circuit_ids_to_monitor": [], + "additional_circuit_customer_ids": additional_circuit_customer_ids, + "hierarchy": hierarchy, + "port_id_details": port_id_details, + "port_id_services": port_id_services + } + res = transform_ims_data(data) + ifs = res["interface_services"] + assert list(ifs.keys()) == ["eq_a:if_a", "eq_b:if_b"] + for v in ifs.values(): + assert len(v) == 1 + assert len(v[0]["related-services"]) == 1 + assert v[0]["related-services"][0]["id"] == "sub_circuit_2" + assert len(v[0]["fibre-routes"]) == 1 + assert v[0]["fibre-routes"][0]["id"] == "carrier_id_3" + + +def test_persist_ims_data(mocker, data_config, mocked_redis): + + r = common._get_redis(data_config) + mocker.patch('inventory_provider.tasks.worker.get_next_redis', + return_value=r) + + data = { + "locations": {"loc_a": "LOC A", "loc_b": "LOC B"}, + "lg_routers": [ + {"equipment name": "lg_eq1"}, {"equipment name": "lg_eq2"} + ], + "hierarchy": {"c1": {"id": "123"}, "c2": {"id": "456"}}, + "interface_services": { + "if1": [ + { + "equipment": "eq1", + "port": "port1", + "id": "id1", + "name": "name1", + "service_type": "type1", + "status": "operational" + }, + { + "equipment": "eq1", + "port": "port2", + "id": "id3", + "name": "name2", + "service_type": "type2", + "status": "operational" + } + ], + "if2": [ + { + "equipment": "eq2", + "port": "port1", + "id": "id3", + "name": "name3", + "service_type": "type1", + "status": "operational" + } + ] + }, + "services_by_type": {}, + } + for k in r.keys("ims:*"): + r.delete(k) + persist_ims_data(data) + + assert [k.decode("utf-8") for k in r.keys("ims:location:*")] == \ + ["ims:location:loc_a", "ims:location:loc_b"] + + assert [k.decode("utf-8") for k in r.keys("ims:lg:*")] == \ + ["ims:lg:lg_eq1", "ims:lg:lg_eq2"] + + assert [k.decode("utf-8") for k in r.keys("ims:circuit_hierarchy:*")] == \ + ["ims:circuit_hierarchy:123", "ims:circuit_hierarchy:456"] + + assert [k.decode("utf-8") for k in r.keys("ims:interface_services:*")] == \ + ["ims:interface_services:if1", "ims:interface_services:if2"] + + assert [k.decode("utf-8") for k in r.keys("poller_cache:*")] == \ + ["poller_cache:eq1", "poller_cache:eq2"]