diff --git a/Changelog.md b/Changelog.md index 54fdd0062b3e388997a6ebcc83f736e3afbda265..4746dd1a83b03ab3828e6677e0473fa4aaab45b3 100644 --- a/Changelog.md +++ b/Changelog.md @@ -2,6 +2,10 @@ All notable changes to this project will be documented in this file. +## [0.78] - 2022-02-10 +- DBOARD3-513: Enabled Chorded update +- POL1-530: Added msr/services endpoint + ## [0.77] - 2021-12-03 - DBOARD3-493: added /neteng/location/equipment-name diff --git a/inventory_provider/db/ims_data.py b/inventory_provider/db/ims_data.py index e3a59750d9cb1b30336eb95494ebc8b14f3b4b51..e918835c2ee01fb560d92e84a2f9fa89d97ca174 100644 --- a/inventory_provider/db/ims_data.py +++ b/inventory_provider/db/ims_data.py @@ -84,6 +84,15 @@ def get_monitored_circuit_ids(ds: IMS): yield d['extrafieldvalueobjectinfo']['objectid'] +def get_ids_and_sids(ds: IMS): + for sid_circuit in ds.get_filtered_entities( + 'ExtraFieldValue', + 'extrafieldid == 3209 | value <> ""', + step_count=10000 + ): + yield sid_circuit['objectid'], sid_circuit['value'] + + def get_service_types(ds: IMS): for d in ds.get_filtered_entities( 'ComboBoxData', @@ -194,6 +203,7 @@ def get_port_id_services(ds: IMS): 'circuit_type': circuit['circuit_type'], 'service_type': products[circuit['productid']], 'project': customers[circuit['customerid']], + 'customer': customers[circuit['customerid']], 'customerid': circuit['customerid'] } ports = [] @@ -239,6 +249,7 @@ def get_port_id_services(ds: IMS): 'circuit_type': _get_circuit_type(circuit), 'service_type': products[circuit['productid']], 'project': customers[circuit['customerid']], + 'customer': customers[circuit['customerid']], 'customerid': circuit['customerid'], 'port_a_id': portrelate.get( 'portid', @@ -465,3 +476,11 @@ def lookup_lg_routers(ds: IMS): } } yield eq + + +def lookup_geant_nodes(ds: IMS): + + return (n["name"]for n in ds.get_filtered_entities( + 'Node', + 'customer.Name == "GEANT"', + ims.EQUIP_DEF_PROPERTIES['Nodes'])) diff --git a/inventory_provider/routes/classifier.py b/inventory_provider/routes/classifier.py index f66284e7e92e40a82199fea768a590a63282962e..bc5ffed3e78cb15dbcef5e7cb29af95e5793c569 100644 --- a/inventory_provider/routes/classifier.py +++ b/inventory_provider/routes/classifier.py @@ -17,12 +17,6 @@ These endpoints are intended for use by Dashboard V3. .. autofunction:: inventory_provider.routes.classifier.get_juniper_link_info -/classifier/infinera-lambda-info --------------------------------- - -.. autofunction:: inventory_provider.routes.classifier.get_infinera_lambda_info - - /classifier/infinera-fiberlink-info ------------------------------------ diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index b33476fb1d031b4133585eb204c2ca587b466b59..44e7e8ec9fb7eee700b6de2a4b1ed086bc5a7ee9 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -108,13 +108,12 @@ def after_request(resp): return common.after_request(resp) -@routes.route("/update", methods=['GET', 'POST']) -@common.require_accepts_json +@routes.route("update", methods=['GET', 'POST']) def update(): """ Handler for `/jobs/update`. - This resource updates the inventory network data for juniper devices. + This resource updates the inventory network data. The function completes asynchronously and a list of outstanding task id's is returned so the caller can use `/jobs/check-task-status` to determine when all jobs @@ -135,17 +134,15 @@ def update(): response='an update is already in progress', status=503, mimetype="text/html") - - phase2_task_id = worker.launch_refresh_cache_all(config) - - r.set('classifier-cache:update-task-id', phase2_task_id.encode('utf-8')) - return jsonify({'task id': phase2_task_id}) + update_task_id = worker.update_entry_point.delay().get() + r.set('classifier-cache:update-task-id', update_task_id.encode('utf-8')) + return jsonify({'task id': update_task_id}) @routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST']) @common.require_accepts_json def reload_router_config(equipment_name): - result = worker.reload_router_config.delay(equipment_name) + result = worker.reload_router_config_chorded.delay(equipment_name) return jsonify({'task id': result.id}) diff --git a/inventory_provider/routes/msr.py b/inventory_provider/routes/msr.py index 00cb88c7108ba2d80248265434614c7dfc8834f7..09c459a3391fe1fae9d39f69cd2dba2a233b15da 100644 --- a/inventory_provider/routes/msr.py +++ b/inventory_provider/routes/msr.py @@ -7,7 +7,7 @@ These endpoints are intended for use by MSR. .. contents:: :local: -/poller/access-services +/msr/access-services --------------------------------- .. autofunction::inventory_provider.routes.msr.get_access_services @@ -49,11 +49,17 @@ These endpoints are intended for use by MSR. .. autofunction:: inventory_provider.routes.msr.get_peering_routing_instances -/msr/bgp/routing-instance-peeringns</name> +/msr/bgp/routing-instance-peerings</name> -------------------------------------------- .. autofunction:: inventory_provider.routes.msr.bgp_routing_instance_peerings +/msr/services +-------------------------------------------- + +.. autofunction:: inventory_provider.routes.msr.get_system_correlation_services + + helpers ------------------------------------- @@ -71,6 +77,8 @@ import ipaddress import logging import re import threading +from collections import defaultdict +from typing import Dict from flask import Blueprint, Response, request, current_app import jsonschema @@ -78,7 +86,8 @@ import jsonschema from inventory_provider.routes import common from inventory_provider.routes.classifier import \ get_ims_equipment_name, get_ims_interface, get_interface_services_and_loc -from inventory_provider.routes.common import _ignore_cache_or_retrieve +from inventory_provider.routes.common import _ignore_cache_or_retrieve, \ + ims_equipment_to_hostname from inventory_provider.routes.poller import get_services from inventory_provider.tasks import common as tasks_common @@ -172,6 +181,74 @@ PEERING_ADDRESS_SERVICES_LIST = { 'items': {'$ref': '#/definitions/address-service-info'} } +SYSTEM_CORRELATION_SERVICES_LIST_SCHEMA = { + '$schema': 'http://json-schema.org/draft-07/schema#', + + 'definitions': { + 'v4-network': {'type': 'string'}, # TODO: can this be better? + 'v6-network': {'type': 'string'}, # TODO: can this be better? + 'ip-endpoint': { + 'type': 'object', + 'properties': { + 'hostname': {'type': 'string'}, + 'interface': {'type': 'string'}, + 'addresses': { + 'type': 'object', + 'properties': { + 'v4': {'$ref': '#/definitions/v4-network'}, + 'v6': {'$ref': '#/definitions/v6-network'} + }, + # 'required': ['v4', 'v6'], # TODO: always require both? + 'additionalProperties': False + } + }, + 'required': ['hostname', 'interface'], + 'additionalProperties': False + }, + 'optical-endpoint': { + 'type': 'object', + 'properties': { + 'equipment': {'type': 'string'}, + 'port': {'type': 'string'} + }, + 'required': ['equipment', 'port'], + 'additionalProperties': False + }, + 'endpoints': { + 'type': 'array', + 'items': { + 'oneOf': [ + {'$ref': '#/definitions/ip-endpoint'}, + {'$ref': '#/definitions/optical-endpoint'} + ] + }, + 'minItems': 1 + }, + 'service': { + 'type': 'object', + 'properties': { + 'circuit_id': {'type': 'integer'}, + 'sid': {'type': 'string'}, + 'name': {'type': 'string'}, + 'speed': {'type': 'integer'}, + 'circuit_type': {'type': 'string'}, # TODO: remove this? + 'service_type': {'type': 'string'}, # TODO: enum? + 'project': {'type': 'string'}, # TODO: remove this? + 'customer': {'type': 'string'}, + 'endpoints': {'$ref': '#/definitions/endpoints'} + }, + 'required': [ + 'circuit_id', 'sid', 'name', 'speed', + # 'circuit_type', 'project', # TODO: keeping these?!? + 'service_type', 'customer', 'endpoints'], + 'additionalProperties': False + } + }, + 'type': 'array', + 'items': {'$ref': '#/definitions/service'}, + 'minItems': 1 # otherwise the route should return 404 +} + @routes.after_request def after_request(resp): @@ -655,3 +732,119 @@ def get_peering_services(): mimetype="text/html") return Response(response, mimetype="application/json") + + +@routes.route('/services', methods=['GET', 'POST']) +@common.require_accepts_json +def get_system_correlation_services(): + """ + Handler for `/msr/services` + + This method returns all known services with with information required + by the reporting tool stack. + + cf. https://jira.software.geant.org/browse/POL1-530 + + The response will be formatted as follows: + + .. asjson:: + inventory_provider.routes.msr.SYSTEM_CORRELATION_SERVICES_LIST_SCHEMA + + :return: + """ + + cache_key = 'classifier-cache:msr:services' + + r = common.get_current_redis() + response = _ignore_cache_or_retrieve(request, cache_key, r) + if not response: + peering_info = defaultdict(defaultdict) + + key_pattern = 'netconf-interfaces:*' + + host_if_extraction_re = re.compile( + r'^netconf-interfaces:(.+?):') + for doc in common.load_json_docs( + config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + key_pattern=key_pattern, + num_threads=20): + matches = host_if_extraction_re.match(doc['key']) + if matches: + peering_info[matches[1]][doc['value']['name']] = doc['value'] + + logical_interface_re = re.compile(r'.*\.\d+$') + + def _ip_endpoint_extractor(endpoint_details: Dict): + if logical_interface_re.match(endpoint_details['port']): + hostname = ims_equipment_to_hostname( + endpoint_details['equipment']) + interface = endpoint_details['port'].lower() + + ip_endpoint = { + 'hostname': hostname, + 'interface': interface, + } + addresses = {} + host_info = peering_info.get(hostname, {}) + interface_info = host_info.get(interface, {}) + ipv4 = interface_info.get('ipv4') + ipv6 = interface_info.get('ipv6') + if ipv4: + addresses['v4'] = ipv4[0] + if ipv6: + addresses['v6'] = ipv6[0] + if ipv4 or ipv6: + ip_endpoint['addresses'] = addresses + + return ip_endpoint + + def _optical_endpoint_extractor(endpoint_details: Dict): + return { + 'equipment': endpoint_details['equipment'], + 'port': endpoint_details['port'] + } + + def _endpoint_extractor(endpoint_details: Dict): + if not endpoint_details['geant_equipment']: + return + potential_hostname = ims_equipment_to_hostname( + endpoint_details['equipment']) + if potential_hostname in peering_info.keys(): + return _ip_endpoint_extractor(endpoint_details) + else: + return _optical_endpoint_extractor(endpoint_details) + + sid_services = json.loads(r.get('ims:sid_services').decode('utf-8')) + + response = [] + for sid, details in sid_services.items(): + service_info = {'endpoints': []} + for d in details: + if not service_info.get('sid'): + service_info['circuit_id'] = d['circuit_id'] + service_info['sid'] = d['sid'] + service_info['name'] = d['name'] + service_info['speed'] = d['speed'] + service_info['service_type'] = d['service_type'] + service_info['customer'] = d['customer'] + + endpoint = _endpoint_extractor(d) + if endpoint: + service_info['endpoints'].append(endpoint) + if service_info.get('endpoints'): + response.append(service_info) + + jsonschema.validate(response, + SYSTEM_CORRELATION_SERVICES_LIST_SCHEMA) + + if response: + response = json.dumps(response, indent=2) + r.set(cache_key, response.encode('utf-8')) + + if not response: + return Response( + response='no services found', + status=404, + mimetype="text/html") + + return Response(response, mimetype="application/json") diff --git a/inventory_provider/routes/testing.py b/inventory_provider/routes/testing.py index 8240efe5e92438674a881339b140f398f25b5695..f81982283885785da1dfaad497ba8e8ed1899f1c 100644 --- a/inventory_provider/routes/testing.py +++ b/inventory_provider/routes/testing.py @@ -18,12 +18,6 @@ routes = Blueprint("inventory-data-testing-support-routes", __name__) logger = logging.getLogger(__name__) -@routes.route("chord-update", methods=['GET', 'POST']) -def chord_update(): - r = worker.update_entry_point.delay().get() - return jsonify(r) - - @routes.route("flushdb", methods=['GET', 'POST']) def flushdb(): common.get_current_redis().flushdb() diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index fa583a3ec3e68f595ced6035f5a04230b0a5d24d..cdcbdcbed2c18f9557645b62cd95e144ddf13b7b 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -4,19 +4,15 @@ import json import logging import os import re -import threading -import time from typing import List -from redis.exceptions import RedisError -from kombu.exceptions import KombuError - from celery import Task, states, chord from celery.result import AsyncResult from collections import defaultdict + +from kombu.exceptions import KombuError from lxml import etree -import jsonschema from inventory_provider.db import ims_data from inventory_provider.db.ims import IMS @@ -36,6 +32,7 @@ from inventory_provider import config from inventory_provider import environment from inventory_provider import snmp from inventory_provider import juniper +from redis import RedisError FINALIZER_POLLING_FREQUENCY_S = 2.5 FINALIZER_TIMEOUT_S = 300 @@ -100,109 +97,6 @@ class InventoryTask(Task): self.send_event('task-error', message=message) -@app.task(base=InventoryTask, bind=True, name='snmp_refresh_peerings') -@log_task_entry_and_exit -def snmp_refresh_peerings(self, hostname, community, logical_systems): - try: - peerings = list( - snmp.get_peer_state_info(hostname, community, logical_systems)) - except ConnectionError: - msg = f'error loading snmp peering data from {hostname}' - logger.exception(msg) - self.log_warning(msg) - r = get_current_redis(InventoryTask.config) - peerings = r.get(f'snmp-peerings:hosts:{hostname}') - if peerings is None: - raise InventoryTaskError( - f'snmp error with {peerings}' - f' and no cached peering data found') - # unnecessary json encode/decode here ... could be optimized - peerings = json.loads(peerings.decode('utf-8')) - self.log_warning(f'using cached snmp peering data for {hostname}') - - r = get_next_redis(InventoryTask.config) - r.set(f'snmp-peerings:hosts:{hostname}', json.dumps(peerings)) - - self.log_info(f'snmp peering info loaded from {hostname}') - - -@app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') -@log_task_entry_and_exit -def snmp_refresh_interfaces(self, hostname, community, logical_systems): - try: - interfaces = list( - snmp.get_router_snmp_indexes(hostname, community, logical_systems)) - except ConnectionError: - msg = f'error loading snmp interface data from {hostname}' - logger.exception(msg) - self.log_warning(msg) - r = get_current_redis(InventoryTask.config) - interfaces = r.get(f'snmp-interfaces:{hostname}') - if not interfaces: - raise InventoryTaskError( - f'snmp error with {hostname}' - f' and no cached snmp interface data found') - # unnecessary json encode/decode here ... could be optimized - interfaces = json.loads(interfaces.decode('utf-8')) - self.log_warning(f'using cached snmp interface data for {hostname}') - - r = get_next_redis(InventoryTask.config) - - rp = r.pipeline() - rp.set(f'snmp-interfaces:{hostname}', json.dumps(interfaces)) - - # optimization for DBOARD3-372 - # interfaces is a list of dicts like: {'name': str, 'index': int} - for ifc in interfaces: - ifc['hostname'] = hostname - rp.set( - f'snmp-interfaces-single:{hostname}:{ifc["name"]}', - json.dumps(ifc)) - - rp.execute() - - self.log_info(f'snmp interface info loaded from {hostname}') - - -@app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') -@log_task_entry_and_exit -def netconf_refresh_config(self, hostname, lab=False): - """ - load netconf and save - - save under 'lab:...' if lab is true - - :param self: - :param hostname: - :param lab: - :return: - """ - - redis_key = f'netconf:{hostname}' - if lab: - redis_key = f'lab:{redis_key}' - - try: - netconf_doc = juniper.load_config( - hostname, InventoryTask.config["ssh"]) - netconf_str = etree.tostring(netconf_doc, encoding='unicode') - except (ConnectionError, juniper.NetconfHandlingError): - msg = f'error loading netconf data from {hostname}' - logger.exception(msg) - self.log_warning(msg) - r = get_current_redis(InventoryTask.config) - netconf_str = r.get(redis_key) - if not netconf_str: - raise InventoryTaskError( - f'netconf error with {hostname}' - f' and no cached netconf data found') - self.log_warning(f'using cached netconf data for {hostname}') - - r = get_next_redis(InventoryTask.config) - r.set(redis_key, netconf_str) - self.log_info(f'netconf info loaded from {hostname}') - - def _unmanaged_interfaces(): def _convert(d): @@ -220,75 +114,6 @@ def _unmanaged_interfaces(): InventoryTask.config.get('unmanaged-interfaces', [])) -@app.task(base=InventoryTask, bind=True, - name='update_neteng_managed_device_list') -@log_task_entry_and_exit -def update_neteng_managed_device_list(self): - self.log_info('querying netdash for managed routers') - - routers = list(juniper.load_routers_from_netdash( - InventoryTask.config['managed-routers'])) - - self.log_info(f'found {len(routers)} routers, saving details') - - r = get_next_redis(InventoryTask.config) - r.set('netdash', json.dumps(routers).encode('utf-8')) - self.log_info(f'saved {len(routers)} managed routers') - - -def load_netconf_data(hostname, lab=False): - """ - this method should only be called from a task - - :param hostname: - :param lab: True to look up under 'lab:...' - :return: - """ - redis_key = f'netconf:{hostname}' - if lab: - redis_key = f'lab:{redis_key}' - - r = get_next_redis(InventoryTask.config) - netconf = r.get(redis_key) - if not netconf: - raise InventoryTaskError('no netconf data found for %r' % hostname) - return etree.fromstring(netconf.decode('utf-8')) - - -def clear_cached_classifier_responses(hostname=None): - if hostname: - logger.debug( - 'removing cached classifier responses for %r' % hostname) - else: - logger.debug('removing all cached classifier responses') - - r = get_next_redis(InventoryTask.config) - - def _hostname_keys(): - for k in r.keys('classifier-cache:juniper:%s:*' % hostname): - yield k - - # TODO: very inefficient ... but logically simplest at this point - for k in r.keys('classifier-cache:peer:*'): - value = r.get(k.decode('utf-8')) - if not value: - # deleted in another thread - continue - value = json.loads(value.decode('utf-8')) - interfaces = value.get('interfaces', []) - if hostname in [i['interface']['router'] for i in interfaces]: - yield k - - def _all_keys(): - return r.keys('classifier-cache:*') - - keys_to_delete = _hostname_keys() if hostname else _all_keys() - rp = r.pipeline() - for k in keys_to_delete: - rp.delete(k) - rp.execute() - - @log_task_entry_and_exit def refresh_juniper_bgp_peers(hostname, netconf): host_peerings = list(juniper.all_bgp_peers(netconf)) @@ -357,93 +182,6 @@ def refresh_juniper_interface_list(hostname, netconf, lab=False): rp.execute() -@app.task(base=InventoryTask, bind=True, name='reload_lab_router_config') -@log_task_entry_and_exit -def reload_lab_router_config(self, hostname): - self.log_info(f'loading netconf data for lab {hostname}') - - # load new netconf data, in this thread - netconf_refresh_config.apply(args=[hostname, True]) - - netconf_doc = load_netconf_data(hostname, lab=True) - - refresh_juniper_interface_list(hostname, netconf_doc, lab=True) - - # load snmp indexes - community = juniper.snmp_community_string(netconf_doc) - if not community: - raise InventoryTaskError( - f'error extracting community string for {hostname}') - else: - self.log_info(f'refreshing snmp interface indexes for {hostname}') - logical_systems = juniper.logical_systems(netconf_doc) - - # load snmp data, in this thread - snmp_refresh_interfaces.apply( - args=[hostname, community, logical_systems]) - - self.log_info(f'updated configuration for lab {hostname}') - - -@app.task(base=InventoryTask, bind=True, name='reload_router_config') -@log_task_entry_and_exit -def reload_router_config(self, hostname): - self.log_info(f'loading netconf data for {hostname}') - - # get the timestamp for the current netconf data - current_netconf_timestamp = None - try: - netconf_doc = load_netconf_data(hostname) - current_netconf_timestamp \ - = juniper.netconf_changed_timestamp(netconf_doc) - logger.debug( - 'current netconf timestamp: %r' % current_netconf_timestamp) - except InventoryTaskError: - # NOTE: should always reach here, - # since we always erase everything before starting - pass # ok at this point if not found - - # load new netconf data, in this thread - netconf_refresh_config.apply(args=[hostname, False]) - - netconf_doc = load_netconf_data(hostname) - - # return if new timestamp is the same as the original timestamp - new_netconf_timestamp = juniper.netconf_changed_timestamp(netconf_doc) - assert new_netconf_timestamp, \ - 'no timestamp available for new netconf data' - if new_netconf_timestamp == current_netconf_timestamp: - msg = f'no timestamp change for {hostname} netconf data' - logger.debug(msg) - self.log_info(msg) - return - - # clear cached classifier responses for this router, and - # refresh peering data - self.log_info(f'refreshing peers & clearing cache for {hostname}') - refresh_juniper_bgp_peers(hostname, netconf_doc) - refresh_juniper_interface_list(hostname, netconf_doc) - - # load snmp indexes - community = juniper.snmp_community_string(netconf_doc) - if not community: - raise InventoryTaskError( - f'error extracting community string for {hostname}') - else: - self.log_info(f'refreshing snmp interface indexes for {hostname}') - logical_systems = juniper.logical_systems(netconf_doc) - - # load snmp data, in this thread - snmp_refresh_interfaces.apply( - args=[hostname, community, logical_systems]) - snmp_refresh_peerings.apply( - args=[hostname, community, logical_systems]) - - clear_cached_classifier_responses(None) - self.log_info(f'updated configuration for {hostname}') - - -# updated with transaction def _erase_next_db(config): """ flush next db, but first save latch and then restore afterwards @@ -480,77 +218,6 @@ def _erase_next_db(config): update_timestamp=saved_latch.get('update-started', 0)) -@log_task_entry_and_exit -def launch_refresh_cache_all(config): - """ - utility function intended to be called outside of the worker process - :param config: config structure as defined in config.py - :return: - """ - - try: - _erase_next_db(config) - update_latch_status(config, pending=True) - - monitor.clear_joblog(get_current_redis(config)) - - # first batch of subtasks: refresh cached IMS location data - subtasks = [ - update_neteng_managed_device_list.apply_async(), - update_equipment_locations.apply_async(), - update_lg_routers.apply_async(), - ] - [x.get() for x in subtasks] - - # now launch the task whose only purpose is to - # act as a convenient parent for all of the remaining tasks - t = internal_refresh_phase_2.apply_async() - return t.id - - except (RedisError, KombuError): - update_latch_status(config, pending=False, failure=True) - logger.exception('error launching refresh subtasks') - raise - - -@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2') -@log_task_entry_and_exit -def internal_refresh_phase_2(self): - # second batch of subtasks: - # ims circuit information - try: - - subtasks = [ - update_circuit_hierarchy_and_port_id_services.apply_async() - ] - - r = get_next_redis(InventoryTask.config) - routers = r.get('netdash') - assert routers - netdash_equipment = json.loads(routers.decode('utf-8')) - for hostname in netdash_equipment: - logger.debug(f'queueing router refresh jobs for {hostname}') - subtasks.append(reload_router_config.apply_async(args=[hostname])) - - lab_routers = InventoryTask.config.get('lab-routers', []) - for hostname in lab_routers: - logger.debug('queueing router refresh jobs for lab %r' % hostname) - subtasks.append( - reload_lab_router_config.apply_async(args=[hostname])) - - pending_task_ids = [x.id for x in subtasks] - - refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) - - except KombuError: - # TODO: possible race condition here - # e.g. if one of these tasks takes a long time and another - # update is started, we could end up with strange data - update_latch_status(config, pending=False, failure=True) - logger.exception('error launching refresh phase 2 subtasks') - raise - - def populate_poller_cache(interface_services, r): host_services = defaultdict(dict) for v in interface_services.values(): @@ -575,420 +242,6 @@ def populate_poller_cache(interface_services, r): rp.execute() -@app.task( - base=InventoryTask, - bind=True, - name='update_circuit_hierarchy_and_port_id_services') -@log_task_entry_and_exit -def update_circuit_hierarchy_and_port_id_services(self, use_current=False): - c = InventoryTask.config["ims"] - ds1 = IMS(c['api'], c['username'], c['password']) - ds2 = IMS(c['api'], c['username'], c['password']) - ds3 = IMS(c['api'], c['username'], c['password']) - - locations = {k: v for k, v in ims_data.get_node_locations(ds1)} - tls_names = list(ims_data.get_service_types(ds1)) - customer_contacts = \ - {k: v for k, v in ims_data.get_customer_service_emails(ds1)} - circuit_ids_to_monitor = \ - list(ims_data.get_monitored_circuit_ids(ds1)) - additional_circuit_customer_ids = \ - ims_data.get_circuit_related_customer_ids(ds1) - - hierarchy = None - port_id_details = defaultdict(list) - port_id_services = defaultdict(list) - interface_services = defaultdict(list) - services_by_type = {} - - def _convert_to_bits(value, unit): - unit = unit.lower() - conversions = { - 'm': 1 << 20, - 'mb': 1 << 20, - 'g': 1 << 30, - 'gbe': 1 << 30, - } - return int(value) * conversions[unit] - - def _get_speed(circuit_id): - c = hierarchy[circuit_id] - if c['status'] != 'operational': - return 0 - pattern = re.compile(r'^(\d+)([a-zA-z]+)$') - m = pattern.match(c['speed']) - if m: - try: - return _convert_to_bits(m[1], m[2]) - except KeyError as e: - logger.debug(f'Could not find key: {e} ' - f'for circuit: {circuit_id}') - return 0 - else: - if c['circuit-type'] == 'service' \ - or c['product'].lower() == 'ethernet': - return sum( - (_get_speed(x) for x in c['carrier-circuits']) - ) - else: - return 0 - - def _get_circuit_contacts(c): - customer_ids = {c['customerid']} - customer_ids.update(additional_circuit_customer_ids.get(c['id'], [])) - return set().union( - *[customer_contacts.get(cid, []) for cid in customer_ids]) - - def _populate_hierarchy(): - nonlocal hierarchy - hierarchy = {} - for d in ims_data.get_circuit_hierarchy(ds1): - hierarchy[d['id']] = d - d['contacts'] = sorted(list(_get_circuit_contacts(d))) - logger.debug("hierarchy complete") - - def _populate_port_id_details(): - nonlocal port_id_details - for x in ims_data.get_port_details(ds2): - pd = port_id_details[x['port_id']] - pd.append(x) - logger.debug("Port details complete") - - def _populate_circuit_info(): - for x in ims_data.get_port_id_services(ds3): - port_id_services[x['port_a_id']].append(x) - logger.debug("port circuits complete") - - hierarchy_thread = threading.Thread(target=_populate_hierarchy) - hierarchy_thread.start() - - port_id_details_thread = threading.Thread(target=_populate_port_id_details) - port_id_details_thread.start() - - circuit_info_thread = threading.Thread(target=_populate_circuit_info) - circuit_info_thread.start() - - hierarchy_thread.join() - circuit_info_thread.join() - port_id_details_thread.join() - - def _get_fibre_routes(c_id): - _circ = hierarchy.get(c_id, None) - if _circ is None: - return - if _circ['speed'].lower() == 'fibre_route': - yield _circ['id'] - else: - for cc in _circ['carrier-circuits']: - yield from _get_fibre_routes(cc) - - # whilst this is a service type the top level for reporting - # are the BGP services on top of it - tls_names.remove('IP PEERING - NON R&E (PUBLIC)') - - def _get_related_services(circuit_id: str) -> List[dict]: - rs = {} - c = hierarchy.get(circuit_id, None) - if c: - - if c['circuit-type'] == 'service': - rs[c['id']] = { - 'id': c['id'], - 'name': c['name'], - 'circuit_type': c['circuit-type'], - 'service_type': c['product'], - 'project': c['project'], - 'contacts': c['contacts'] - } - if c['id'] in circuit_ids_to_monitor: - rs[c['id']]['status'] = c['status'] - else: - rs[c['id']]['status'] = 'non-monitored' - - if c['sub-circuits']: - for sub in c['sub-circuits']: - temp_parents = \ - _get_related_services(sub) - rs.update({t['id']: t for t in temp_parents}) - return list(rs.values()) - - def _format_service(s): - - if s['circuit_type'] == 'service' \ - and s['id'] not in circuit_ids_to_monitor: - s['status'] = 'non-monitored' - pd_a = port_id_details[s['port_a_id']][0] - location_a = locations.get(pd_a['equipment_name'], None) - if location_a: - loc_a = location_a['pop'] - else: - loc_a = locations['UNKNOWN_LOC']['pop'] - logger.warning( - f'Unable to find location for {pd_a["equipment_name"]} - ' - f'Service ID {s["id"]}') - s['pop_name'] = loc_a['name'] - s['pop_abbreviation'] = loc_a['abbreviation'] - s['equipment'] = pd_a['equipment_name'] - s['card_id'] = '' # this is redundant I believe - s['port'] = pd_a['interface_name'] - s['logical_unit'] = '' # this is redundant I believe - if 'port_b_id' in s: - pd_b = port_id_details[s['port_b_id']][0] - location_b = locations.get(pd_b['equipment_name'], None) - if location_b: - loc_b = location_b['pop'] - else: - loc_b = locations['UNKNOWN_LOC']['pop'] - logger.warning( - f'Unable to find location for {pd_b["equipment_name"]} - ' - f'Service ID {s["id"]}') - - s['other_end_pop_name'] = loc_b['name'] - s['other_end_pop_abbreviation'] = loc_b['abbreviation'] - s['other_end_equipment'] = pd_b['equipment_name'] - s['other_end_port'] = pd_b['interface_name'] - else: - s['other_end_pop_name'] = '' - s['other_end_pop_abbreviation'] = '' - s['other_end_equipment'] = '' - s['other_end_port'] = '' - - s.pop('port_a_id', None) - s.pop('port_b_id', None) - - # using a dict to ensure no duplicates - node_pair_services = defaultdict(dict) - - for key, value in port_id_details.items(): - for details in value: - k = f"{details['equipment_name']}:" \ - f"{details['interface_name']}" - circuits = port_id_services.get(details['port_id'], []) - - for circ in circuits: - contacts = _get_circuit_contacts(circ) - circ['fibre-routes'] = [] - for x in set(_get_fibre_routes(circ['id'])): - c = { - 'id': hierarchy[x]['id'], - 'name': hierarchy[x]['name'], - 'status': hierarchy[x]['status'] - } - circ['fibre-routes'].append(c) - - circ['related-services'] = \ - _get_related_services(circ['id']) - - for tlc in circ['related-services']: - contacts.update(tlc.pop('contacts')) - circ['contacts'] = sorted(list(contacts)) - - circ['calculated-speed'] = _get_speed(circ['id']) - _format_service(circ) - - type_services = services_by_type.setdefault( - ims_sorted_service_type_key(circ['service_type']), dict()) - type_services[circ['id']] = circ - if circ['other_end_equipment']: - node_pair_services[ - f"{circ['equipment']}/{circ['other_end_equipment']}" - ][circ['id']] = circ - - interface_services[k].extend(circuits) - - if use_current: - r = get_current_redis(InventoryTask.config) - else: - r = get_next_redis(InventoryTask.config) - rp = r.pipeline() - for k in r.scan_iter('ims:circuit_hierarchy:*', count=1000): - rp.delete(k) - rp.execute() - rp = r.pipeline() - for k in r.scan_iter('ims:interface_services:*', count=1000): - rp.delete(k) - rp.execute() - rp = r.pipeline() - for k in r.scan_iter('ims:node_pair_services:*', count=1000): - rp.delete(k) - rp.execute() - rp = r.pipeline() - for k in r.scan_iter('ims:access_services:*', count=1000): - rp.delete(k) - for k in r.scan_iter('ims:gws_indirect:*', count=1000): - rp.delete(k) - rp.execute() - rp = r.pipeline() - for circ in hierarchy.values(): - rp.set(f'ims:circuit_hierarchy:{circ["id"]}', json.dumps([circ])) - rp.execute() - rp = r.pipeline() - for k, v in interface_services.items(): - rp.set( - f'ims:interface_services:{k}', - json.dumps(v)) - rp.execute() - rp = r.pipeline() - for k, v in node_pair_services.items(): - rp.set( - f'ims:node_pair_services:{k}', - json.dumps(list(v.values()))) - rp.execute() - rp = r.pipeline() - - populate_poller_cache(interface_services, r) - - for service_type, services in services_by_type.items(): - for v in services.values(): - rp.set( - f'ims:services:{service_type}:{v["name"]}', - json.dumps({ - 'id': v['id'], - 'name': v['name'], - 'project': v['project'], - 'here': { - 'pop': { - 'name': v['pop_name'], - 'abbreviation': v['pop_abbreviation'] - }, - 'equipment': v['equipment'], - 'port': v['port'], - }, - 'there': { - 'pop': { - 'name': v['other_end_pop_name'], - 'abbreviation': v['other_end_pop_abbreviation'] - }, - 'equipment': v['other_end_equipment'], - 'port': v['other_end_port'], - }, - 'speed_value': v['calculated-speed'], - 'speed_unit': 'n/a', - 'status': v['status'], - 'type': v['service_type'] - })) - - rp.execute() - - -@app.task(base=InventoryTask, bind=True, name='update_equipment_locations') -@log_task_entry_and_exit -def update_equipment_locations(self, use_current=False): - if use_current: - r = get_current_redis(InventoryTask.config) - else: - r = get_next_redis(InventoryTask.config) - rp = r.pipeline() - for k in r.scan_iter('ims:location:*', count=1000): - rp.delete(k) - rp.execute() - - c = InventoryTask.config["ims"] - ds = IMS(c['api'], c['username'], c['password']) - - rp = r.pipeline() - hostnames_found = set() - for h, d in ims_data.get_node_locations(ds): - # put into a list to match non-IMS version - rp.set(f'ims:location:{h}', json.dumps([d])) - if h in hostnames_found: - logger.debug(f'Multiple entries for {h}') - hostnames_found.add(h) - rp.execute() - - -@app.task(base=InventoryTask, bind=True, name='update_lg_routers') -@log_task_entry_and_exit -def update_lg_routers(self, use_current=False): - if use_current: - r = get_current_redis(InventoryTask.config) - for k in r.scan_iter('ims:lg:*'): - r.delete(k) - else: - r = get_next_redis(InventoryTask.config) - - for k in r.scan_iter('ims:lg:*'): - r.delete(k) - c = InventoryTask.config["ims"] - ds = IMS(c['api'], c['username'], c['password']) - - for router in ims_data.lookup_lg_routers(ds): - r.set(f'ims:lg:{router["equipment name"]}', json.dumps(router)) - - -def _wait_for_tasks(task_ids, update_callback=lambda s: None): - - all_successful = True - - start_time = time.time() - while task_ids and time.time() - start_time < FINALIZER_TIMEOUT_S: - update_callback('waiting for tasks to complete: %r' % task_ids) - time.sleep(FINALIZER_POLLING_FREQUENCY_S) - - def _task_and_children_result(id): - tasks = list(check_task_status(id)) - return { - 'error': any([t['ready'] and not t['success'] for t in tasks]), - 'ready': all([t['ready'] for t in tasks]) - } - - results = dict([ - (id, _task_and_children_result(id)) - for id in task_ids]) - - if any([t['error'] for t in results.values()]): - all_successful = False - - task_ids = [ - id for id, status in results.items() - if not status['ready']] - - if task_ids: - raise InventoryTaskError( - 'timeout waiting for pending tasks to complete') - if not all_successful: - raise InventoryTaskError( - 'some tasks finished with an error') - - update_callback('pending tasks completed in {} seconds'.format( - time.time() - start_time)) - - -@app.task(base=InventoryTask, bind=True, name='refresh_finalizer') -@log_task_entry_and_exit -def refresh_finalizer(self, pending_task_ids_json): - - # TODO: if more types of errors appear, use a finally block - - input_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "array", - "items": {"type": "string"} - } - - try: - task_ids = json.loads(pending_task_ids_json) - logger.debug('task_ids: %r' % task_ids) - jsonschema.validate(task_ids, input_schema) - - _wait_for_tasks(task_ids, update_callback=self.log_info) - _build_subnet_db(update_callback=self.log_info) - _build_snmp_peering_db(update_callback=self.log_info) - _build_juniper_peering_db(update_callback=self.log_info) - populate_poller_interfaces_cache(warning_callback=self.log_warning) - - except (jsonschema.ValidationError, - json.JSONDecodeError, - InventoryTaskError, - RedisError, - KombuError) as e: - update_latch_status(InventoryTask.config, failure=True) - raise e - - latch_db(InventoryTask.config) - self.log_info('latched current/next dbs') - - def _build_subnet_db(update_callback=lambda s: None): r = get_next_redis(InventoryTask.config) @@ -1189,47 +442,52 @@ def check_task_status(task_id, parent=None, forget=False): yield result -# =================================== chorded - currently only here for testing - -# new @app.task(base=InventoryTask, bind=True, name='update_entry_point') @log_task_entry_and_exit def update_entry_point(self): - routers = retrieve_and_persist_neteng_managed_device_list( - info_callback=self.log_info, - warning_callback=self.log_warning - ) - lab_routers = InventoryTask.config.get('lab-routers', []) - - _erase_next_db(InventoryTask.config) - update_latch_status(InventoryTask.config, pending=True) - - tasks = chord( - ( - ims_task.s().on_error(task_error_handler.s()), - chord( - (reload_router_config_chorded.s(r) for r in routers), - empty_task.si('router tasks complete') + + try: + _erase_next_db(InventoryTask.config) + update_latch_status(InventoryTask.config, pending=True) + monitor.clear_joblog(get_current_redis(InventoryTask.config)) + self.log_info("Starting update") + + routers = retrieve_and_persist_neteng_managed_device_list( + info_callback=self.log_info, + warning_callback=self.log_warning + ) + lab_routers = InventoryTask.config.get('lab-routers', []) + + chord( + ( + ims_task.s().on_error(task_error_handler.s()), + chord( + (reload_router_config_chorded.s(r) for r in routers), + empty_task.si('router tasks complete') + ), + chord( + (reload_lab_router_config_chorded.s(r) + for r in lab_routers), + empty_task.si('lab router tasks complete') + ) ), - chord( - (reload_lab_router_config_chorded.s(r) for r in lab_routers), - empty_task.si('lab router tasks complete') - ) - ), - final_task.si().on_error(task_error_handler.s()) - )() - return tasks + final_task.si().on_error(task_error_handler.s()) + )() + + return self.request.id + except (RedisError, KombuError): + update_latch_status(InventoryTask.config, pending=False, failure=True) + logger.exception('error launching refresh subtasks') + raise -# new @app.task def task_error_handler(request, exc, traceback): update_latch_status(InventoryTask.config, pending=False, failure=True) logger.warning('Task {0!r} raised error: {1!r}'.format(request.id, exc)) -# new @app.task(base=InventoryTask, bind=True, name='empty_task') def empty_task(self, message): logger.warning(f'message from empty task: {message}') @@ -1273,12 +531,11 @@ def retrieve_and_persist_neteng_managed_device_list( return netdash_equipment -# updated @app.task(base=InventoryTask, bind=True, name='reload_lab_router_config') @log_task_entry_and_exit def reload_lab_router_config_chorded(self, hostname): try: - self.log_info(f'loading netconf data for lab {hostname} RL') + self.log_info(f'loading netconf data for lab {hostname}') # load new netconf data, in this thread netconf_str = retrieve_and_persist_netconf_config( @@ -1297,7 +554,8 @@ def reload_lab_router_config_chorded(self, hostname): logical_systems = juniper.logical_systems(netconf_doc) # load snmp data, in this thread - snmp_refresh_interfaces(hostname, community, logical_systems) + snmp_refresh_interfaces_chorded( + hostname, community, logical_systems, self.log_info) self.log_info(f'updated configuration for lab {hostname}') except Exception as e: @@ -1305,12 +563,11 @@ def reload_lab_router_config_chorded(self, hostname): update_latch_status(InventoryTask.config, pending=True, failure=True) -# updated @app.task(base=InventoryTask, bind=True, name='reload_router_config') @log_task_entry_and_exit def reload_router_config_chorded(self, hostname): try: - self.log_info(f'loading netconf data for {hostname} RL') + self.log_info(f'loading netconf data for {hostname}') netconf_str = retrieve_and_persist_netconf_config( hostname, update_callback=self.log_warning) @@ -1342,7 +599,6 @@ def reload_router_config_chorded(self, hostname): update_latch_status(InventoryTask.config, pending=True, failure=True) -# new def retrieve_and_persist_netconf_config( hostname, lab=False, update_callback=lambda s: None): redis_key = f'netconf:{hostname}' @@ -1374,7 +630,6 @@ def retrieve_and_persist_netconf_config( return netconf_str -# updated as is no longer a task @log_task_entry_and_exit def snmp_refresh_interfaces_chorded( hostname, community, logical_systems, update_callback=lambda s: None): @@ -1413,7 +668,6 @@ def snmp_refresh_interfaces_chorded( update_callback(f'snmp interface info loaded from {hostname}') -# updated as is no longer a task @log_task_entry_and_exit def snmp_refresh_peerings_chorded( hostname, community, logical_systems, update_callback=lambda S: None): @@ -1440,11 +694,9 @@ def snmp_refresh_peerings_chorded( update_callback(f'snmp peering info loaded from {hostname}') -# new @app.task(base=InventoryTask, bind=True, name='ims_task') @log_task_entry_and_exit def ims_task(self, use_current=False): - try: extracted_data = extract_ims_data() transformed_data = transform_ims_data(extracted_data) @@ -1456,20 +708,19 @@ def ims_task(self, use_current=False): update_latch_status(InventoryTask.config, pending=True, failure=True) -# new def extract_ims_data(): c = InventoryTask.config["ims"] - ds1 = IMS(c['api'], c['username'], c['password']) - ds2 = IMS(c['api'], c['username'], c['password']) - ds3 = IMS(c['api'], c['username'], c['password']) - ds4 = IMS(c['api'], c['username'], c['password']) - ds5 = IMS(c['api'], c['username'], c['password']) + + def _ds() -> IMS: + return IMS(c['api'], c['username'], c['password']) locations = {} lg_routers = [] + geant_nodes = [] customer_contacts = {} circuit_ids_to_monitor = [] + circuit_ids_and_sids = {} additional_circuit_customer_ids = {} hierarchy = {} @@ -1478,35 +729,46 @@ def extract_ims_data(): def _populate_locations(): nonlocal locations - locations = {k: v for k, v in ims_data.get_node_locations(ds1)} + locations = {k: v for k, v in ims_data.get_node_locations(ds=_ds())} def _populate_lg_routers(): nonlocal lg_routers - lg_routers = list(ims_data.lookup_lg_routers(ds5)) + lg_routers = list(ims_data.lookup_lg_routers(ds=_ds())) + + def _populate_geant_nodes(): + nonlocal geant_nodes + geant_nodes = list(ims_data.lookup_geant_nodes(ds=_ds())) def _populate_customer_contacts(): nonlocal customer_contacts customer_contacts = \ - {k: v for k, v in ims_data.get_customer_service_emails(ds2)} + {k: v for k, v in ims_data.get_customer_service_emails(ds=_ds())} def _populate_circuit_ids_to_monitor(): nonlocal circuit_ids_to_monitor circuit_ids_to_monitor = \ - list(ims_data.get_monitored_circuit_ids(ds3)) + list(ims_data.get_monitored_circuit_ids(ds=_ds())) + + def _populate_sids(): + nonlocal circuit_ids_and_sids + circuit_ids_and_sids = \ + {cid: sid for cid, sid in ims_data.get_ids_and_sids(ds=_ds())} def _populate_additional_circuit_customer_ids(): nonlocal additional_circuit_customer_ids additional_circuit_customer_ids = \ - ims_data.get_circuit_related_customer_ids(ds4) + ims_data.get_circuit_related_customer_ids(ds=_ds()) exceptions = {} with concurrent.futures.ThreadPoolExecutor() as executor: futures = { executor.submit(_populate_locations): 'locations', + executor.submit(_populate_geant_nodes): 'geant_nodes', executor.submit(_populate_lg_routers): 'lg_routers', executor.submit(_populate_customer_contacts): 'customer_contacts', executor.submit(_populate_circuit_ids_to_monitor): 'circuit_ids_to_monitor', + executor.submit(_populate_sids): 'sids', executor.submit(_populate_additional_circuit_customer_ids): 'additional_circuit_customer_ids' } @@ -1520,18 +782,19 @@ def extract_ims_data(): def _populate_hierarchy(): nonlocal hierarchy - hierarchy = {d['id']: d for d in ims_data.get_circuit_hierarchy(ds1)} + hierarchy = { + d['id']: d for d in ims_data.get_circuit_hierarchy(ds=_ds())} logger.debug("hierarchy complete") def _populate_port_id_details(): nonlocal port_id_details - for x in ims_data.get_port_details(ds2): + for x in ims_data.get_port_details(ds=_ds()): pd = port_id_details[x['port_id']] pd.append(x) logger.debug("Port details complete") def _populate_circuit_info(): - for x in ims_data.get_port_id_services(ds3): + for x in ims_data.get_port_id_services(ds=_ds()): port_id_services[x['port_a_id']].append(x) logger.debug("port circuits complete") @@ -1554,14 +817,15 @@ def extract_ims_data(): 'lg_routers': lg_routers, 'customer_contacts': customer_contacts, 'circuit_ids_to_monitor': circuit_ids_to_monitor, + 'circuit_ids_sids': circuit_ids_and_sids, 'additional_circuit_customer_ids': additional_circuit_customer_ids, 'hierarchy': hierarchy, 'port_id_details': port_id_details, - 'port_id_services': port_id_services + 'port_id_services': port_id_services, + 'geant_nodes': geant_nodes } -# new def transform_ims_data(data): locations = data['locations'] customer_contacts = data['customer_contacts'] @@ -1570,6 +834,10 @@ def transform_ims_data(data): hierarchy = data['hierarchy'] port_id_details = data['port_id_details'] port_id_services = data['port_id_services'] + circuit_ids_and_sids = data['circuit_ids_sids'] + geant_nodes = data['geant_nodes'] + + sid_services = defaultdict(list) def _get_circuit_contacts(c): customer_ids = {c['customerid']} @@ -1732,17 +1000,39 @@ def transform_ims_data(data): f"{circ['equipment']}/{circ['other_end_equipment']}" ][circ['id']] = circ + if circ['id'] in circuit_ids_and_sids \ + and circ['status'] == 'operational': + sid = circuit_ids_and_sids[circ['id']] + if circ['circuit_type'] == 'circuit': + logger.info(f'SID ({sid}) Circuit ({circ["id"]})' + f' Name ({circ["name"]}) not a service') + else: + sid_info = { + 'circuit_id': circ['id'], + 'sid': sid, + 'name': circ['name'], + 'speed': circ['calculated-speed'], + 'service_type': circ['service_type'], + 'project': circ['project'], + 'customer': circ['customer'], + 'equipment': circ['equipment'], + 'port': circ['port'], + 'geant_equipment': circ['equipment'] in geant_nodes + } + if sid_info not in sid_services[sid]: + sid_services[sid].append(sid_info) + interface_services[k].extend(circuits) return { 'hierarchy': hierarchy, 'interface_services': interface_services, 'services_by_type': services_by_type, - 'node_pair_services': node_pair_services + 'node_pair_services': node_pair_services, + 'sid_services': sid_services } -# new def persist_ims_data(data, use_current=False): hierarchy = data['hierarchy'] locations = data['locations'] @@ -1750,6 +1040,7 @@ def persist_ims_data(data, use_current=False): interface_services = data['interface_services'] services_by_type = data['services_by_type'] node_pair_services = data['node_pair_services'] + sid_services = data['sid_services'] if use_current: r = get_current_redis(InventoryTask.config) @@ -1765,12 +1056,15 @@ def persist_ims_data(data, use_current=False): 'ims:gws_indirect:*', 'ims:node_pair_services:*' ]: + + r.delete('ims:sid_services') rp = r.pipeline() for k in r.scan_iter(key_pattern, count=1000): rp.delete(k) else: r = get_next_redis(InventoryTask.config) + r.set('ims:sid_services', json.dumps(sid_services)) rp = r.pipeline() for h, d in locations.items(): rp.set(f'ims:location:{h}', json.dumps([d])) @@ -1833,7 +1127,6 @@ def persist_ims_data(data, use_current=False): rp.execute() -# new @app.task(base=InventoryTask, bind=True, name='final_task') @log_task_entry_and_exit def final_task(self): diff --git a/setup.py b/setup.py index 75ba85c2aa7c4b3927a063b5a22481cff1659730..8d543a9f43fa6ef124e66f726d42f0a28a3e43e2 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='inventory-provider', - version="0.77", + version="0.78", author='GEANT', author_email='swd@geant.org', description='Dashboard inventory provider', diff --git a/test/data/router-info.json b/test/data/router-info.json index 17cdc119495f8bb4ef7223417e194249164df0bc..a67ba47dd2433769769f39c5b96973b15b9bfe1a 100644 Binary files a/test/data/router-info.json and b/test/data/router-info.json differ diff --git a/test/per_router/test_celery_worker.py b/test/per_router/test_celery_worker.py index e071f5f8981466f87b2281144f0e3ab2e0b45e95..28776edbf5ea5233a47c2e3011bfd95258672df3 100644 --- a/test/per_router/test_celery_worker.py +++ b/test/per_router/test_celery_worker.py @@ -19,7 +19,7 @@ def backend_db(): def test_netconf_refresh_config(mocked_worker_module, router): del backend_db()['netconf:' + router] - worker.netconf_refresh_config(router) + worker.reload_router_config_chorded(router) assert backend_db()['netconf:' + router] @@ -35,7 +35,7 @@ def test_snmp_refresh_interfaces(mocked_worker_module, router): for k in list(_ifc_keys()): del backend_db()[k] - worker.snmp_refresh_interfaces(router, 'fake-community', []) + worker.snmp_refresh_interfaces_chorded(router, 'fake-community', []) assert backend_db()['snmp-interfaces:' + router] assert list(_ifc_keys()) @@ -51,7 +51,7 @@ def test_snmp_refresh_peerings(mocked_worker_module, router): for k in list(_ifc_keys()): del backend_db()[k] - worker.snmp_refresh_peerings(router, 'fake-community', []) + worker.snmp_refresh_peerings_chorded(router, 'fake-community', []) assert list(_ifc_keys()) @@ -76,28 +76,29 @@ def test_reload_router_config(mocked_worker_module, router, mocker): assert 'netconf:' + router not in backend_db() assert 'snmp-interfaces:' + router not in backend_db() - def _mocked_netconf_refresh_config_apply(args): + def _mocked_retrieve_and_persist_netconf_config(*args, **kwargs): key = 'netconf:' + args[0] backend_db()[key] = saved_data[key] + return saved_data[key] mocker.patch( - 'inventory_provider.tasks.worker.netconf_refresh_config.apply', - _mocked_netconf_refresh_config_apply) + 'inventory_provider.tasks.worker.retrieve_and_persist_netconf_config', + _mocked_retrieve_and_persist_netconf_config) - def _mocked_snmp_refresh_peerings_apply(args): - assert len(args) == 3 - backend_db().update(saved_peerings) - mocker.patch( - 'inventory_provider.tasks.worker.snmp_refresh_peerings.apply', - _mocked_snmp_refresh_peerings_apply) - - def _mocked_snmp_refresh_interfaces_apply(args): - assert len(args) == 3 + def _mocked_snmp_refresh_interfaces_chorded(*args, **kwargs): + assert len(args) == 4 key = 'snmp-interfaces:' + args[0] backend_db()[key] = saved_data[key] mocker.patch( - 'inventory_provider.tasks.worker.snmp_refresh_interfaces.apply', - _mocked_snmp_refresh_interfaces_apply) + 'inventory_provider.tasks.worker.snmp_refresh_interfaces_chorded', + _mocked_snmp_refresh_interfaces_chorded) + + def _mocked_snmp_refresh_peerings_chorded(*args, **kwargs): + assert len(args) == 4 + backend_db().update(saved_peerings) + mocker.patch( + 'inventory_provider.tasks.worker.snmp_refresh_peerings_chorded', + _mocked_snmp_refresh_peerings_chorded) def _mocked_update_status(self, **kwargs): pass @@ -105,6 +106,6 @@ def test_reload_router_config(mocked_worker_module, router, mocker): 'inventory_provider.tasks.worker.InventoryTask.update_state', _mocked_update_status) - worker.reload_router_config(router) + worker.reload_router_config_chorded(router) assert 'netconf:' + router in backend_db() assert 'snmp-interfaces:' + router in backend_db() diff --git a/test/per_router/test_classifier_data.py b/test/per_router/test_classifier_data.py deleted file mode 100644 index 37774b4469d86f8168de0ad7b08d8dfeb4d91a30..0000000000000000000000000000000000000000 --- a/test/per_router/test_classifier_data.py +++ /dev/null @@ -1,24 +0,0 @@ -from inventory_provider.tasks import worker -from inventory_provider.tasks.common import _get_redis - - -def backend_db(): - return _get_redis({ - 'redis': { - 'hostname': None, - 'port': None - }, - 'redis-databases': [0, 7] - }).db - - -def test_clear_classifier_cache( - router, - mocked_redis, - data_config, - classifier_cache_test_entries): - worker.InventoryTask.config = data_config - backend_db().update(classifier_cache_test_entries) - worker.clear_cached_classifier_responses(router) - for k in backend_db(): - assert not k.startswith('classifier-cache:%s:' % router) diff --git a/test/test_ims_data.py b/test/test_ims_data.py index 7d07aff8e7b0ee6ef3a13801beae0a4cd3cd71a2..ae2a2e8cbd713adc806e3bd0bfaf177ee8bdc850 100644 --- a/test/test_ims_data.py +++ b/test/test_ims_data.py @@ -8,7 +8,7 @@ from inventory_provider.db.ims import InventoryStatus from inventory_provider.db.ims_data import lookup_lg_routers, \ get_node_locations, IMS_OPSDB_STATUS_MAP, \ get_port_id_services, get_port_details, \ - get_circuit_hierarchy, NODE_LOCATION_SCHEMA + get_circuit_hierarchy, get_ids_and_sids, NODE_LOCATION_SCHEMA def _json_test_data(filename): @@ -162,6 +162,7 @@ def test_get_port_id_services(mocker): 'circuit_type': 'service', 'service_type': 'GEANT IP', 'project': 'ORG A', + 'customer': 'ORG A', 'port_a_id': 224507, 'customerid': 57658 }, @@ -172,6 +173,7 @@ def test_get_port_id_services(mocker): 'circuit_type': 'service', 'service_type': 'GEANT PEERING', 'project': 'ORG B', + 'customer': 'ORG B', 'port_a_id': 224464, 'customerid': 57664 }, @@ -182,6 +184,7 @@ def test_get_port_id_services(mocker): 'circuit_type': 'circuit', 'service_type': 'ETHERNET', 'project': 'ETH', + 'customer': 'ETH', 'port_a_id': 6423107, 'port_b_id': 6419340, 'customerid': 57744 @@ -193,6 +196,7 @@ def test_get_port_id_services(mocker): 'circuit_type': 'circuit', 'service_type': 'ETHERNET', 'project': 'ETH', + 'customer': 'ETH', 'port_a_id': 6419340, 'port_b_id': 6423107, 'customerid': 57744 @@ -204,6 +208,7 @@ def test_get_port_id_services(mocker): 'circuit_type': 'circuit', 'service_type': 'ETHERNET', 'project': 'ETH', + 'customer': 'ETH', 'port_a_id': 6423111, 'customerid': 57744 }, @@ -214,6 +219,7 @@ def test_get_port_id_services(mocker): 'circuit_type': 'service', 'service_type': 'PRODUCT A', 'project': 'ORG C', + 'customer': 'ORG C', 'port_a_id': 6419453, 'customerid': 57640 } @@ -298,3 +304,54 @@ def test_get_node_location(mocker): 'latitude': 51.5308142, } }) + + +def test_get_circuit_ids_and_sids(mocker): + ims = mocker.patch('inventory_provider.db.ims.IMS') + ims.return_value.get_filtered_entities.return_value = \ + [ + { + 'id': 999999, + 'extrafieldid': 3209, + 'objectid': 111111, + 'value': 'SID-01', + 'extrafield': None, + 'links': None, + 'extrafieldvalueobjectinfo': None, + 'rowversion': '2021-12-10T19:40:51', + 'errors': None, + 'haserrors': False + }, { + 'id': 999998, + 'extrafieldid': 3209, + 'objectid': 111112, + 'value': 'SID-02', + 'extrafield': None, + 'links': None, + 'extrafieldvalueobjectinfo': None, + 'rowversion': '2021-12-10T19:40:51', + 'errors': None, + 'haserrors': False + }, { + 'id': 999997, + 'extrafieldid': 3209, + 'objectid': 111113, + 'value': 'SID-03', + 'extrafield': None, + 'links': None, + 'extrafieldvalueobjectinfo': None, + 'rowversion': '2021-12-10T19:40:51', + 'errors': None, + 'haserrors': False + }, + ] + expected_response = [ + (111111, 'SID-01'), + (111112, 'SID-02'), + (111113, 'SID-03') + ] + + ds = inventory_provider.db.ims.IMS( + 'dummy_base', 'dummy_username', 'dummy_password') + res = list(get_ids_and_sids(ds)) + assert res == expected_response diff --git a/test/test_job_routes.py b/test/test_job_routes.py index f3a4df17d8541322bda4af2634d91ab59e90bb23..c280f57f882959df623e8f50e8fbd9f0dab71fa3 100644 --- a/test/test_job_routes.py +++ b/test/test_job_routes.py @@ -24,9 +24,9 @@ def backend_db(): def test_job_update_all(client, mocker): expected_task_id = 'xyz@123#456' - launch_refresh_cache_all = mocker.patch( - 'inventory_provider.tasks.worker.launch_refresh_cache_all') - launch_refresh_cache_all.return_value = expected_task_id + update_entry_point = mocker.patch( + 'inventory_provider.tasks.worker.update_entry_point.delay') + update_entry_point.return_value.get.return_value = expected_task_id rv = client.post( 'jobs/update', @@ -43,9 +43,9 @@ def test_job_update_all(client, mocker): def test_job_update_force_pending(client, mocker): expected_task_id = 'asf#asdf%111' - launch_refresh_cache_all = mocker.patch( - 'inventory_provider.tasks.worker.launch_refresh_cache_all') - launch_refresh_cache_all.return_value = expected_task_id + update_entry_point = mocker.patch( + 'inventory_provider.tasks.worker.update_entry_point.delay') + update_entry_point.return_value.get.return_value = expected_task_id mocked_get_latch = mocker.patch( 'inventory_provider.routes.jobs.get_latch') @@ -64,7 +64,7 @@ def test_job_update_pending_force_false(client, mocker): def _assert_if_called(*args, **kwargs): assert False mocker.patch( - 'inventory_provider.tasks.worker.launch_refresh_cache_all', + 'inventory_provider.tasks.worker.update_entry_point', _assert_if_called) mocked_get_latch = mocker.patch( @@ -81,7 +81,7 @@ def test_job_update_pending(client, mocker): def _assert_if_called(*args, **kwargs): assert False mocker.patch( - 'inventory_provider.tasks.worker.launch_refresh_cache_all', + 'inventory_provider.tasks.worker.update_entry_point', _assert_if_called) mocked_get_latch = mocker.patch( @@ -104,7 +104,7 @@ class MockedAsyncResult(object): def test_reload_router_config(client, mocker): delay_result = mocker.patch( - 'inventory_provider.tasks.worker.reload_router_config.delay') + 'inventory_provider.tasks.worker.reload_router_config_chorded.delay') delay_result.return_value = MockedAsyncResult('bogus task id') rv = client.post( diff --git a/test/test_msr_routes.py b/test/test_msr_routes.py index ba10ce0fbe404a78c29985baf96179dc986cdf92..198864e68ae6613815f037bd8b8673e64fc1166d 100644 --- a/test/test_msr_routes.py +++ b/test/test_msr_routes.py @@ -5,14 +5,11 @@ import pytest from inventory_provider.routes.msr import PEERING_LIST_SCHEMA, \ PEERING_GROUP_LIST_SCHEMA, PEERING_ADDRESS_SERVICES_LIST, \ - _get_services_for_address + SYSTEM_CORRELATION_SERVICES_LIST_SCHEMA, _get_services_for_address from inventory_provider.routes.poller import SERVICES_LIST_SCHEMA from inventory_provider.tasks.common import _get_redis - -DEFAULT_REQUEST_HEADERS = { - "Accept": ["application/json"] -} +DEFAULT_REQUEST_HEADERS = {'Accept': ['application/json']} def test_access_services(client): @@ -302,3 +299,14 @@ def test_peering_services_single_threaded(client): assert response_data # test data is non-empty jsonschema.validate(response_data, PEERING_ADDRESS_SERVICES_LIST) + + +def test_system_correlation_services(client): + rv = client.get( + '/msr/services', + headers=DEFAULT_REQUEST_HEADERS) + assert rv.status_code == 200 + assert rv.is_json + response_data = json.loads(rv.data.decode('utf-8')) + jsonschema.validate(response_data, SYSTEM_CORRELATION_SERVICES_LIST_SCHEMA) + assert response_data # test data is non-empty diff --git a/test/test_worker.py b/test/test_worker.py index 825c98909ffdd924a7100d95a245d023168e46ec..7b75b400c18418ae8d0d65f729ba5db3f9132cd9 100644 --- a/test/test_worker.py +++ b/test/test_worker.py @@ -56,6 +56,18 @@ def test_extract_ims_data(mocker): {'port_a_id': '2', 'value': '2A'} ] ) + mocker.patch( + 'inventory_provider.tasks.worker.ims_data.get_ids_and_sids', + return_value=(x for x in [ + (111111, 'SID-01'), + (111112, 'SID-02'), + (111113, 'SID-03') + ]) + ) + mocker.patch( + 'inventory_provider.tasks.worker.ims_data.lookup_geant_nodes', + return_value=[] + ) res = extract_ims_data() assert res['locations'] == {'loc_a': 'LOC A', 'loc_b': 'LOC B'} assert res['lg_routers'] == ['lg router 1', 'lg router 2'] @@ -81,6 +93,11 @@ def test_extract_ims_data(mocker): ], '2': [{'port_a_id': '2', 'value': '2A'}] } + assert res['circuit_ids_sids'] == { + 111111: 'SID-01', + 111112: 'SID-02', + 111113: 'SID-03' + } def test_transform_ims_data(): @@ -124,6 +141,16 @@ def test_transform_ims_data(): "equipment_name": "eq_b", "interface_name": "if_b", "port_id": "port_id_2" + }], + "port_id_3": [{ + "equipment_name": "eq_a", + "interface_name": "if_c", + "port_id": "port_id_3" + }], + "port_id_4": [{ + "equipment_name": "eq_b", + "interface_name": "if_c", + "port_id": "port_id_4" }] } @@ -131,7 +158,10 @@ def test_transform_ims_data(): "port_id_1": [ { "id": "circ_id_1", + "name": "circ_name_1", "customerid": "cu_1", + "customer": "customer_1", + "project": "customer_1", "circuit_type": "circuit", "service_type": "ETHERNET", "status": "operational", @@ -143,7 +173,10 @@ def test_transform_ims_data(): "port_id_2": [ { "id": "circ_id_1", + "name": "circ_name_1", "customerid": "cu_1", + "customer": "customer_1", + "project": "customer_1", "circuit_type": "circuit", "service_type": "ETHERNET", "status": "operational", @@ -151,6 +184,34 @@ def test_transform_ims_data(): "port_b_id": "port_id_1", } + ], + "port_id_3": [ + { + "id": "sub_circuit_2", + "name": "sub_circuit_2", + "customerid": "cu_1", + "customer": "customer_1", + "project": "customer_1", + "circuit_type": "service", + "service_type": "PEERING R & E", + "status": "operational", + "port_a_id": "port_id_3", + "port_b_id": "port_id_4", + } + ], + "port_id_4": [ + { + "id": "sub_circuit_2", + "name": "sub_circuit_2", + "customerid": "cu_1", + "customer": "customer_1", + "project": "customer_1", + "circuit_type": "service", + "service_type": "PEERING R & E", + "status": "operational", + "port_a_id": "port_id_4", + "port_b_id": "port_id_3", + }, ] } @@ -160,8 +221,10 @@ def test_transform_ims_data(): "name": "circ_name_1", "status": "operational", "circuit-type": "circuit", + "service_type": "ETHERNET", "product": "ethernet", "speed": "not fibre_route", + "project": "customer_1", "carrier-circuits": ["carrier_id_1"], "sub-circuits": ["sub_circuit_1"], "customerid": "cu_1", @@ -173,6 +236,7 @@ def test_transform_ims_data(): "circuit-type": "circuit", "product": "ethernet", "speed": "10G", + "project": "customer_1", "carrier-circuits": ["carrier_id_2"], "sub-circuits": ["circ_id_1"], "customerid": "cu_1", @@ -184,6 +248,7 @@ def test_transform_ims_data(): "circuit-type": "circuit", "product": "ethernet", "speed": "not fibre_route", + "project": "customer_1", "carrier-circuits": ["carrier_id_3"], "sub-circuits": ["carrier_id_1"], "customerid": "cu_1", @@ -195,6 +260,7 @@ def test_transform_ims_data(): "circuit-type": "circuit", "product": "OCG4", "speed": "fibre_route", + "project": "customer_1", "carrier-circuits": [], "sub-circuits": ["carrier_id_2"], "customerid": "cu_1", @@ -206,6 +272,7 @@ def test_transform_ims_data(): "circuit-type": "circuit", "product": "ethernet", "speed": "not fibre_route", + "project": "customer_1", "carrier-circuits": ["circ_id_1"], "sub-circuits": ["sub_circuit_2"], "customerid": "cu_1", @@ -217,24 +284,33 @@ def test_transform_ims_data(): "circuit-type": "service", "product": "PEERING R & E", "speed": "not fiber route", - "project": "Project A", + "project": "customer_1", "carrier-circuits": ["sub_circuit_1"], "sub-circuits": [], "customerid": "cu_1", } } + + circuit_ids_and_sids = { + "sub_circuit_2": 'SID-01', + "circ_id_2": 'SID-02', + "circ_id_3": 'SID-03' + } data = { "locations": locations, "customer_contacts": customer_contacts, - "circuit_ids_to_monitor": [], + "circuit_ids_to_monitor": ["sub_circuit_2"], "additional_circuit_customer_ids": additional_circuit_customer_ids, "hierarchy": hierarchy, "port_id_details": port_id_details, - "port_id_services": port_id_services + "port_id_services": port_id_services, + "circuit_ids_sids": circuit_ids_and_sids, + "geant_nodes": ["eq_b"] } res = transform_ims_data(data) ifs = res["interface_services"] - assert list(ifs.keys()) == ["eq_a:if_a", "eq_b:if_b"] + assert list(ifs.keys()) == [ + "eq_a:if_a", "eq_b:if_b", "eq_a:if_c", "eq_b:if_c"] for v in ifs.values(): assert len(v) == 1 assert len(v[0]["related-services"]) == 1 @@ -254,6 +330,38 @@ def test_transform_ims_data(): assert len(v[0]["fibre-routes"]) == 1 assert v[0]["fibre-routes"][0]["id"] == "carrier_id_3" + assert len(res["sid_services"]['SID-01']) == 2 + + for x in [ + { + 'circuit_id': "sub_circuit_2", + 'sid': "SID-01", + 'name': "sub_circuit_2", + 'speed': 10 << 30, + 'service_type': "PEERING R & E", + 'project': "customer_1", + 'customer': "customer_1", + 'equipment': "eq_a", + 'port': "if_c", + 'geant_equipment': False + }, + { + 'circuit_id': "sub_circuit_2", + 'sid': "SID-01", + 'name': "sub_circuit_2", + 'speed': 10 << 30, + 'service_type': "PEERING R & E", + 'project': "customer_1", + 'customer': "customer_1", + 'equipment': "eq_b", + 'port': "if_c", + 'geant_equipment': True + } + ]: + assert json.dumps(x, sort_keys=True) in [ + json.dumps( + y, sort_keys=True) for y in res["sid_services"]['SID-01']] + def test_persist_ims_data(mocker, data_config, mocked_redis): @@ -301,7 +409,9 @@ def test_persist_ims_data(mocker, data_config, mocked_redis): "np1": {"id_1": "data for np1"}, "np2": {"id_2": "data for np2"}, }, + "sid_services": {"SID-001": [{"k1": "data"}, {"k1": "data"}]}, "services_by_type": {}, + "geant_nodes": [] } for k in r.keys("ims:*"): r.delete(k) @@ -325,6 +435,9 @@ def test_persist_ims_data(mocker, data_config, mocked_redis): assert [k.decode("utf-8") for k in r.keys("poller_cache:*")] == \ ["poller_cache:eq1", "poller_cache:eq2"] + assert json.loads(r.get("ims:sid_services").decode("utf-8")) == \ + data["sid_services"] + def test_retrieve_and_persist_neteng_managed_device_list( mocker, data_config, mocked_redis): diff --git a/tox.ini b/tox.ini index 7d0d25df10b59e54c9a10b05c6cdee9fa24920da..1f32f60ae56f411df6f492156ae11ebf471692ef 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ deps = commands = coverage erase - coverage run --source inventory_provider --omit='inventory_provider/routes/ims*,inventory_provider/db/ims*,inventory_provider/tasks/ims*' -m py.test {posargs} + coverage run --source inventory_provider -m py.test {posargs} coverage xml coverage html coverage report --fail-under 75