diff --git a/.gitignore b/.gitignore index d8cc93fdfee743e40090463be17a739ede4b1354..c4d57182c25bbbfee7229472caaae2ed3c02fa60 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ coverage.xml .tox htmlcov dist +venv +.vscode +docs/ \ No newline at end of file diff --git a/Changelog.md b/Changelog.md index 7e614ee0056142d09da702ef28cef99ca9f32f9c..8fe43d9a2135906a53c36474c210b328d567d566 100644 --- a/Changelog.md +++ b/Changelog.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. +## [0.70] - 2021-08-20 +- DBOARD3-459: fix performance issue with /poller/interfaces +- POL1-483: add dashboard mappings to /poller/interfaces response +- POL1-479: add LAG/SRV_L3VPN to RE_PEER dashboards +- updated GWS direct data to support snmp v3 creds + ## [0.69] - 2021-06-30 - DBOARD3-444: Added related services to bundle member response diff --git a/inventory_provider/__init__.py b/inventory_provider/__init__.py index 877944d598d7f3731b2022174a03f7424eadcb1c..d350c9ebbcd98a251485f562b3259f9980ae8721 100644 --- a/inventory_provider/__init__.py +++ b/inventory_provider/__init__.py @@ -62,16 +62,6 @@ def create_app(): # end of IMS based routes - # OTRS routes - - from inventory_provider.routes import ims_otrs - app.register_blueprint(ims_otrs.routes, url_prefix='/otrs') - - # from inventory_provider.routes import otrs_jobs - # app.register_blueprint(otrs_jobs.routes, url_prefix='/otrs') - - # end of OTRS routes - from inventory_provider.routes import default app.register_blueprint(default.routes, url_prefix='/') diff --git a/inventory_provider/config.py b/inventory_provider/config.py index 733877cc12a078b0579ae783dc9585d490471e63..8b9d0a6ddf94da9f1ebc79db75d828a232a6ed6d 100644 --- a/inventory_provider/config.py +++ b/inventory_provider/config.py @@ -117,7 +117,7 @@ CONFIG_SCHEMA = { 'required': ['tag', 'counters'], 'additionalProperties': False }, - 'gws-direct-host': { + 'gws-direct-host-v2': { 'type': 'object', 'properties': { 'hostname': {'type': 'string'}, @@ -131,6 +131,31 @@ 'required': ['hostname', 'community', 'interfaces'], 'additionalProperties': False }, + 'snmp-v3-cred': { + 'type': 'object', + 'properties': { + 'protocol': {'enum': ['MD5', 'DES']}, + 'password': {'type': 'string'} + }, + 'required': ['protocol', 'password'], + 'additionalProperties': False + }, + 'gws-direct-host-v3': { + 'type': 'object', + 'properties': { + 'hostname': {'type': 'string'}, + 'sec-name': {'type': 'string'}, + 'auth': {'$ref': '#/definitions/snmp-v3-cred'}, + 'priv': {'$ref': '#/definitions/snmp-v3-cred'}, + 'interfaces': { + 'type': 'array', + 'items': {'$ref': '#/definitions/gws-direct-interface'}, + 'minItems': 1 + } + }, + 'required': ['hostname', 'sec-name', 'interfaces'], + 'additionalProperties': False + }, 'gws-direct-nren-isp': { 'type': 'object', 'properties': { @@ -141,7 +166,12 @@ }, 'hosts': { 'type': 'array', - 'items': {'$ref': '#/definitions/gws-direct-host'}, + 'items': { + 'oneOf': [ + {'$ref': '#/definitions/gws-direct-host-v2'}, + {'$ref': '#/definitions/gws-direct-host-v3'}, + ] + }, 'minItems': 1 } }, @@ -186,7 +216,6 @@ 'ssh', 'redis', 'redis-databases', - 'otrs-export', 'ims', 'managed-routers', 'gws-direct'] @@ -197,7 +226,6 @@ 'ssh', 'sentinel', 'redis-databases', - 'otrs-export', 'ims', 'managed-routers', 'gws-direct'] diff --git a/inventory_provider/db/ims_data.py b/inventory_provider/db/ims_data.py index 
0283754bf0f3367fdb6a1bf9911dcf3ecb88970d..f0f9ddb516115a1e0435b9b390a933832122c340 100644 --- a/inventory_provider/db/ims_data.py +++ b/inventory_provider/db/ims_data.py @@ -1,6 +1,6 @@ import logging import re -from collections import OrderedDict, defaultdict +from collections import defaultdict from copy import copy from itertools import chain, groupby from operator import itemgetter @@ -420,174 +420,3 @@ def lookup_lg_routers(ds: IMS): } } yield eq - - -def otrs_get_customer_company_rows(ds: IMS): - yield ['customer_id', 'name', 'street', 'zip', 'city', 'country', 'url', - 'comments'] - all_cus_comps = set() - for customer in ds.get_all_entities('Customer'): - all_cus_comps.add(customer['name']) - yield [customer['name'].replace(' ', ''), customer['name'], - '', '', '', '', '', ''] - for vendor in ds.get_all_entities('Vendor'): - if vendor['name'] not in all_cus_comps: - all_cus_comps.add(vendor['name']) - yield [vendor['name'].replace(' ', ''), vendor['name'], - '', '', '', '', '', ''] - - -# TODO - check the rules for validation once model has been confirmed -def _is_valid_customer(cus): - if not cus['email']: - return False - return True - - -def otrs_get_customer_contacts(ds: IMS): - - def _get_customer_id2(t): - if t['id'] == 3 or t['name'] == 'EU NREN': - return 'OTRS-GEANT-NREN' - return '' - - for customer in ds.get_all_entities( - 'Customer', - [ - ims.CUSTOMER_PROPERTIES['CustomerRelatedContacts'], - ims.CUSTOMER_PROPERTIES['CustomerType'] - ]): - - if customer['customerrelatedcontacts']: - - for contact in customer['customerrelatedcontacts']: - t_customer_user = OrderedDict({ - 'email': contact['contact']['mail'], - 'username': contact['contact']['mail'], # TODO if tal_id is going to be present use that # noqa - 'customer_id': customer['name'].replace(' ', ''), - 'customer_id_2': - _get_customer_id2(customer['customertype']), - 'title': '', - 'firstname': contact['contact']['name'], - 'lastname': - ' '.join(filter(None, [ - contact['contact']['infix'], - contact['contact']['lastname'] - ])), - 'phone': '', - 'fax': '', - 'mobile': '', - 'street': '', - 'zip': '', - 'city': '', - 'country': '', - 'comments': '', - }) - if not _is_valid_customer(t_customer_user): - continue - - yield t_customer_user - - -def otrs_get_vendor_contacts(ds: IMS): - - for vrc in ds.get_all_entities( - 'VendorRelatedContact', - [ - ims.VENDOR_RELATED_CONTACT_PROPERTIES['Vendor'], - ims.VENDOR_RELATED_CONTACT_PROPERTIES['Contact'] - ]): - t_customer_user = OrderedDict({ - 'email': vrc['contact']['mail'], - 'username': vrc['contact']['mail'], # TODO if tal_id is going to be present use that # noqa - 'customer_id': vrc['vendor']['name'].replace(' ', ''), - 'customer_id_2': '', - 'title': '', - 'firstname': vrc['contact']['name'], - 'lastname': - ' '.join(filter(None, [ - vrc['contact']['infix'], - vrc['contact']['lastname'] - ])), - 'phone': '', - 'fax': '', - 'mobile': '', - 'street': '', - 'zip': '', - 'city': '', - 'country': '', - 'comments': '', - }) - if not _is_valid_customer(t_customer_user): - continue - - yield t_customer_user - - -def otrs_get_customer_users_rows(ds: IMS, return_duplicates: bool = False): - yield ['email', 'username', 'customer_id', 'customer_id_2', 'title', - 'firstname', 'lastname', 'phone', 'fax', 'mobile', 'street', 'zip', - 'city', 'country', 'comments'] - - def get_all_cus_user_rows(): - yielded_customer_emails = set() - for c in otrs_get_customer_contacts(ds): - yielded_customer_emails.add(c['email']) - yield c - for c in otrs_get_vendor_contacts(ds): - if c['email'] 
not in yielded_customer_emails: - yield c - - def weed_duplicates(duplicates): - # this is here to allow for additional rules - id_rank = { - 'geant': 1 - } - top_rank = -1 - top_ranked = None - for d in duplicates: - rank = id_rank.get(d['customer_id'].lower(), 0) - if rank > top_rank: - top_rank = rank - top_ranked = [] - if rank == top_rank: - top_ranked.append(d) - return top_ranked - - cus_by_email = {} - duplicate_emails = set() - for cu in get_all_cus_user_rows(): - email = cu['email'] - cus_for_email = cus_by_email.get(email, []) - if cus_for_email: - duplicate_emails.add(email) - cus_for_email.append(cu) - cus_by_email[email] = cus_for_email - - remaining_duplicates = [] - if duplicate_emails: - logger.info('Duplicate emails found in OTRS customer-user export: ' - f'{duplicate_emails} - attempting to weed') - for email in duplicate_emails: - weeded = weed_duplicates(cus_by_email.pop(email)) - if len(weeded) == 1: - cus_by_email[email] = weeded - else: - remaining_duplicates.extend( - sorted( - [list(w.values()) for w in weeded], key=lambda x: x[2]) - ) - - if remaining_duplicates: - logger.error('Duplicate emails remain after weeding, ' - f'{"including" if return_duplicates else "excluding"}' - ' duplicates in returned data: ') - - for rd in remaining_duplicates: - logger.debug(rd) - - for email, details in sorted(cus_by_email.items()): - yield list(details[0].values()) - - if return_duplicates: - yield from remaining_duplicates diff --git a/inventory_provider/routes/common.py b/inventory_provider/routes/common.py index 6f9f072a8e931c60ac901720335abbfcefbebd39..79d7c481ece2353eacecf284403a132b776281ee 100644 --- a/inventory_provider/routes/common.py +++ b/inventory_provider/routes/common.py @@ -192,7 +192,8 @@ def _load_redis_docs( config_params, key_pattern, num_threads=10, - doc_type=_DECODE_TYPE_JSON): + doc_type=_DECODE_TYPE_JSON, + use_next_redis=False): """ load all docs from redis and decode as `doc_type` @@ -218,7 +219,10 @@ def _load_redis_docs( t.start() threads.append({'thread': t, 'queue': q}) - r = tasks_common.get_current_redis(config_params) + if use_next_redis: + r = tasks_common.get_next_redis(config_params) + else: + r = tasks_common.get_current_redis(config_params) if isinstance(key_pattern, str): # scan with bigger batches, to mitigate network latency effects @@ -250,25 +254,36 @@ def _load_redis_docs( t['thread'].join(timeout=0.5) # timeout, for sanity -def load_json_docs(config_params, key_pattern, num_threads=10): +def load_json_docs( + config_params, key_pattern, num_threads=10, use_next_redis=False): yield from _load_redis_docs( - config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_JSON) + config_params, + key_pattern, + num_threads, + doc_type=_DECODE_TYPE_JSON, + use_next_redis=use_next_redis + ) -def load_xml_docs(config_params, key_pattern, num_threads=10): +def load_xml_docs( + config_params, key_pattern, num_threads=10, use_next_redis=False): yield from _load_redis_docs( - config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_XML) + config_params, + key_pattern, + num_threads, + doc_type=_DECODE_TYPE_XML, + use_next_redis=use_next_redis) -@functools.lru_cache(maxsize=None) -def load_snmp_indexes(hostname=None): +def load_snmp_indexes(config, hostname=None, use_next_redis=False): result = dict() key_pattern = f'snmp-interfaces:{hostname}*' \ if hostname else 'snmp-interfaces:*' for doc in load_json_docs( - config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], - key_pattern=key_pattern): + config_params=config, + 
key_pattern=key_pattern, + use_next_redis=use_next_redis): router = doc['key'][len('snmp-interfaces:'):] result[router] = {e['name']: e for e in doc['value']} diff --git a/inventory_provider/routes/ims_otrs.py b/inventory_provider/routes/ims_otrs.py deleted file mode 100644 index b5143b657327bcb63100205fb4a8a7f3dc96dd02..0000000000000000000000000000000000000000 --- a/inventory_provider/routes/ims_otrs.py +++ /dev/null @@ -1,71 +0,0 @@ -import csv -import html -from io import StringIO - -import requests -from flask import Blueprint, request, Response, current_app - -from inventory_provider.db import ims_data -from inventory_provider.db.ims import IMS -from inventory_provider.routes import common -from inventory_provider.tasks.worker import OTRSFiles, export_data_for_otrs - -routes = Blueprint("otrs", __name__) - - -@routes.after_request -def after_request(resp): - return common.after_request(resp) - - -def get_otrs_output(result): - - with StringIO() as sio: - writer = csv.writer(sio, delimiter='^') - writer.writerows(result) - data = sio.getvalue() - return Response( - response=data, - status=requests.codes.ok, - mimetype="text/html") - - -@routes.route('customer-companies') -def get_customer_companies_data(): - ims_config = current_app.config['INVENTORY_PROVIDER_CONFIG']["ims"] - ds = IMS(ims_config['api'], ims_config['username'], ims_config['password']) - return get_otrs_output(ims_data.otrs_get_customer_company_rows(ds)) - - -@routes.route('customer-users') -def get_customer_users_data(): - ims_config = current_app.config['INVENTORY_PROVIDER_CONFIG']["ims"] - ds = IMS(ims_config['api'], ims_config['username'], ims_config['password']) - return_duplicates = request.args.get('duplicates', 'f').lower() == 'true' - return get_otrs_output(ims_data.otrs_get_customer_users_rows( - ds, return_duplicates=return_duplicates)) - - -@routes.route('export') -def send_exports(): - files_value = request.args.get('files', None) - duplicates = request.args.get('duplicates', 'f').lower() == 'true' - if files_value: - try: - files_value = int(files_value) - except ValueError: - return Response( - response=html.escape('<files> should be an Integer'), - status=requests.codes.bad_request, - mimetype="text/html") - if files_value < 0 or files_value > sum(OTRSFiles): - return Response( - response=html.escape(f'Bad value for <files> {files_value}'), - status=requests.codes.bad_request, - mimetype="text/html") - debug_uuid = export_data_for_otrs( - files_to_export=files_value, export_duplicates=duplicates) - return Response( - response=f'Exports sent, search logs for {debug_uuid} for details', - status=requests.codes.ok, - mimetype="text/html") diff --git a/inventory_provider/routes/lnetd.py b/inventory_provider/routes/lnetd.py index 346f857ed16eecb22537aa443e618371e25a3639..8402a0e061bcaa334e6e4391b08b88ffebb4f374 100644 --- a/inventory_provider/routes/lnetd.py +++ b/inventory_provider/routes/lnetd.py @@ -74,7 +74,8 @@ def _add_snmp_indexes(interfaces, hostname=None): :param hostname: hostname or None for all :return: generator that yields interfaces with 'ifIndex' added """ - snmp_indexes = common.load_snmp_indexes(hostname) + snmp_indexes = common.load_snmp_indexes( + current_app.config['INVENTORY_PROVIDER_CONFIG'], hostname) for ifc in interfaces: hostname = ifc['hostname'] diff --git a/inventory_provider/routes/poller.py b/inventory_provider/routes/poller.py index 9561ce47a0c4a0de06a5f62fc97e00d6fb104065..f4fa2897c37f676b7d89c049703998745b348ba7 100644 --- a/inventory_provider/routes/poller.py +++ 
b/inventory_provider/routes/poller.py @@ -30,6 +30,12 @@ These endpoints are intended for use by BRIAN. .. autofunction:: inventory_provider.routes.poller.gws_direct +/poller/gws/direct-config +--------------------------------- + +.. autofunction:: inventory_provider.routes.poller.gws_direct_config + + /poller/gws/indirect --------------------------------- @@ -48,12 +54,19 @@ These endpoints are intended for use by BRIAN. .. autofunction:: inventory_provider.routes.poller.get_service_types +support method: _get_dashboards +--------------------------------- + +.. autofunction:: inventory_provider.routes.poller._get_dashboards + + """ +from enum import Enum, auto import json import logging import re -from flask import Blueprint, Response, current_app, request +from flask import Blueprint, Response, current_app, request, jsonify from lxml import etree from inventory_provider import juniper @@ -68,6 +81,47 @@ routes = Blueprint('poller-support-routes', __name__) Gb = 1 << 30 + +class INTERFACE_TYPES(Enum): + UNKNOWN = auto() + LOGICAL = auto() + PHYSICAL = auto() + AGGREGATE = auto() + + +class BRIAN_DASHBOARDS(Enum): + CLS = auto() + RE_PEER = auto() + RE_CUST = auto() + GEANTOPEN = auto() + GCS = auto() + L2_CIRCUIT = auto() + LHCONE_PEER = auto() + LHCONE_CUST = auto() + MDVPN_CUSTOMERS = auto() + INFRASTRUCTURE_BACKBONE = auto() + IAS_PRIVATE = auto() + IAS_PUBLIC = auto() + IAS_CUSTOMER = auto() + IAS_UPSTREAM = auto() + GWS_PHY_UPSTREAM = auto() + + # aggregate dashboards + CLS_PEERS = auto() + IAS_PEERS = auto() + GWS_UPSTREAMS = auto() + LHCONE = auto() + CAE1 = auto() + + # NREN customer + NREN = auto() + + +# only used in INTERFACE_LIST_SCHEMA and sphinx docs +_DASHBOARD_IDS = [d.name for d in list(BRIAN_DASHBOARDS)] + +_INTERFACE_TYPES = [i.name for i in list(INTERFACE_TYPES)] + INTERFACE_LIST_SCHEMA = { '$schema': 'http://json-schema.org/draft-07/schema#', @@ -104,12 +158,25 @@ INTERFACE_LIST_SCHEMA = { 'circuits': { 'type': 'array', 'items': {'$ref': '#/definitions/service'} + }, + 'dashboards': { + 'type': 'array', + 'items': {'enum': _DASHBOARD_IDS} + }, + 'dashboard_info': { + 'type': 'object', + 'properties': { + 'name': {'type': 'string'}, + 'interface_type': {'enum': _INTERFACE_TYPES} + }, + 'required': ['name', 'interface_type'], + 'additionalProperties': False } }, 'required': [ 'router', 'name', 'description', 'snmp-index', 'bundle', 'bundle-parents', - 'circuits'], + 'circuits', 'dashboards'], 'additionalProperties': False }, }, @@ -177,6 +244,33 @@ GWS_DIRECT_DATA_SCHEMA = { 'type': 'string', 'pattern': r'^(\d+\.)*\d+$' }, + 'snmp-v2': { + 'type': 'object', + 'properties': { + 'community': {'type': 'string'} + }, + 'required': ['community'], + 'additionalProperties': False + }, + 'snmp-v3-cred': { + 'type': 'object', + 'properties': { + 'protocol': {'enum': ['MD5', 'DES']}, + 'password': {'type': 'string'} + }, + 'required': ['protocol', 'password'], + 'additionalProperties': False + }, + 'snmp-v3': { + 'type': 'object', + 'properties': { + 'sec-name': {'type': 'string'}, + 'auth': {'$ref': '#/definitions/snmp-v3-cred'}, + 'priv': {'$ref': '#/definitions/snmp-v3-cred'} + }, + 'required': ['sec-name'], + 'additionalProperties': False + }, 'counter': { 'type': 'object', 'properties': { @@ -191,9 +285,14 @@ GWS_DIRECT_DATA_SCHEMA = { ] }, 'oid': {'$ref': '#/definitions/oid'}, - 'community': {'type': 'string'} + 'snmp': { + 'oneOf': [ + {'$ref': '#/definitions/snmp-v2'}, + {'$ref': '#/definitions/snmp-v3'} + ] + } }, - 'required': ['field', 'oid', 'community'], + 
'required': ['field', 'oid', 'snmp'], 'additionalProperties': False }, 'interface-counters': { @@ -296,14 +395,137 @@ def after_request(resp): return common.after_request(resp) -def _load_interface_bundles(hostname=None): +def _get_dashboards(interface): + """ + Yield enums from BRIAN_DASHBOARDS to indicate which dashboards + this interface should be included in. + + cf. POL1-482 + + Possible dashboard IDs are: + + .. asjson:: + inventory_provider.routes.poller._DASHBOARD_IDS + + :param interface: a dict with keys like router, name, description + :return: generator that yields enums from BRIAN_DASHBOARDS + """ + + router = interface.get('router', '').lower() + ifc_name = interface.get('name', '') + description = interface.get('description', '').strip() + + if 'SRV_CLS' in description: + yield BRIAN_DASHBOARDS.CLS + if 'SRV_CLS PRIVATE' in description: + yield BRIAN_DASHBOARDS.CLS_PEERS + if 'SRV_IAS PUBLIC' in description: + yield BRIAN_DASHBOARDS.IAS_PUBLIC + yield BRIAN_DASHBOARDS.IAS_PEERS + if 'SRV_IAS PRIVATE' in description: + yield BRIAN_DASHBOARDS.IAS_PRIVATE + yield BRIAN_DASHBOARDS.IAS_PEERS + if 'SRV_IAS CUSTOMER' in description: + yield BRIAN_DASHBOARDS.IAS_CUSTOMER + if 'SRV_IAS UPSTREAM' in description: + yield BRIAN_DASHBOARDS.IAS_UPSTREAM + if re.match('(SRV_GLOBAL|SRV_L3VPN|LAG) RE_INTERCONNECT', description): + yield BRIAN_DASHBOARDS.RE_PEER + if re.match(r'(PHY|LAG|SRV_GLOBAL) CUSTOMER', description): + yield BRIAN_DASHBOARDS.RE_CUST + if re.match('^SRV_GCS', description): + yield BRIAN_DASHBOARDS.GCS + if 'GEANTOPEN' in description: + yield BRIAN_DASHBOARDS.GEANTOPEN + if 'SRV_L2CIRCUIT' in description: + yield BRIAN_DASHBOARDS.L2_CIRCUIT + if 'LHCONE' in description: + if 'SRV_L3VPN RE' in description: + yield BRIAN_DASHBOARDS.LHCONE_PEER + if 'SRV_L3VPN CUSTOMER' in description: + yield BRIAN_DASHBOARDS.LHCONE_CUST + if re.match('SRV_L3VPN (CUSTOMER|RE_INTERCONNECT)', description): + yield BRIAN_DASHBOARDS.LHCONE + if re.match('^SRV_MDVPN CUSTOMER', description): + yield BRIAN_DASHBOARDS.MDVPN_CUSTOMERS + if 'LAG' in description and \ + re.match('(SRV_GLOBAL|LAG|PHY) INFRASTRUCTURE BACKBONE', + description): + yield BRIAN_DASHBOARDS.INFRASTRUCTURE_BACKBONE + if router == 'mx1.lon.uk.geant.net' \ + and re.match(r'^ae12(\.\d+|$)$', ifc_name): + yield BRIAN_DASHBOARDS.CAE1 + if re.match('^PHY UPSTREAM', description): + yield BRIAN_DASHBOARDS.GWS_PHY_UPSTREAM + regex = r'(PHY|LAG|(SRV_(GLOBAL|LHCONE|MDVPN|IAS|CLS|L3VPN))) CUSTOMER' + if re.match(regex, description): + yield BRIAN_DASHBOARDS.NREN + + +def _get_dashboard_data(ifc): + + def _get_interface_type(description): + if re.match(r'^PHY', description): + return INTERFACE_TYPES.PHYSICAL + if re.match(r'^SRV_', description): + return INTERFACE_TYPES.LOGICAL + if re.match(r'^LAG', description): + return INTERFACE_TYPES.AGGREGATE + + return INTERFACE_TYPES.UNKNOWN + + description = ifc.get('description', '').strip() + dashboards = ifc.get('dashboards', []) + + interface_type = _get_interface_type(description) + + if len(dashboards) == 0: + return ifc + + if BRIAN_DASHBOARDS.INFRASTRUCTURE_BACKBONE.name in dashboards: + name = description.split('|') + name = name[1].strip() + name = name.replace('( ', '(') + elif BRIAN_DASHBOARDS.GWS_PHY_UPSTREAM.name in dashboards: + name = description.split(' ')[2].strip().upper() + host = ifc['router'] + location = host.split('.')[1].upper() + name = f'{name} - {location}' + else: + name = description.split(' ')[2].upper() + + return { + **ifc, + 'dashboard_info': { + 'name': name, 
'interface_type': interface_type.name + } + } + + +def _add_dashboards(interfaces): + """ + generator that adds dashboards to each interface. + + :param interfaces: result of _load_interfaces + :return: generator with `dashboards` populated in each element + """ + + for ifc in interfaces: + dashboards = _get_dashboards(ifc) + ifc['dashboards'] = sorted([d.name for d in dashboards]) + yield _get_dashboard_data(ifc) + + +def _load_interface_bundles(config, hostname=None, use_next_redis=False): result = dict() def _load_docs(key_pattern): for doc in common.load_json_docs( - config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + config_params=config, key_pattern=key_pattern, - num_threads=20): + num_threads=20, + use_next_redis=use_next_redis): m = re.match( r'.*netconf-interface-bundles:([^:]+):(.+)', doc['key']) @@ -324,7 +546,7 @@ def _load_interface_bundles(hostname=None): return result -def _load_services(hostname=None): +def _load_services(config, hostname=None, use_next_redis=False): # if hostname: # hostname = get_ims_equipment_name(hostname) @@ -347,9 +569,10 @@ } for doc in common.load_json_docs( - config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + config_params=config, key_pattern=key_pattern, - num_threads=20): + num_threads=20, + use_next_redis=use_next_redis): m = re.match(r'^ims:interface_services:([^:]+):(.+)', doc['key']) if not m: @@ -366,11 +589,14 @@ return result -def _load_interfaces(hostname): +def _load_interfaces( + config, hostname=None, no_lab=False, use_next_redis=False): """ loads basic interface data for production & lab routers + :param config: :param hostname: + :param use_next_redis: :return: """ def _load_docs(key_pattern): @@ -381,9 +607,10 @@ assert key_prefix_len >= len('netconf:') # sanity for doc in common.load_xml_docs( - config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + config_params=config, key_pattern=key_pattern, - num_threads=10): + num_threads=10, + use_next_redis=use_next_redis): router = doc['key'][key_prefix_len:] @@ -402,7 +629,6 @@ base_key_pattern = f'netconf:{hostname}*' if hostname else 'netconf:*' yield from _load_docs(base_key_pattern) - no_lab = common.get_bool_request_arg('no-lab', False) if not no_lab: yield from _load_docs(f'lab:{base_key_pattern}') @@ -415,7 +641,8 @@ :param hostname: hostname or None for all :return: generator with bundle-parents populated in each element """ - bundles = _load_interface_bundles(hostname) + bundles = _load_interface_bundles( + current_app.config['INVENTORY_PROVIDER_CONFIG'], hostname) for ifc in interfaces: router_bundle = bundles.get(ifc['router'], None) @@ -436,7 +663,8 @@ if hostname: hostname = get_ims_equipment_name(hostname) - services = _load_services(hostname) + services = _load_services( + current_app.config['INVENTORY_PROVIDER_CONFIG'], hostname=hostname) for ifc in interfaces: router_services = services.get( get_ims_equipment_name(ifc['router']), None) @@ -456,7 +684,8 @@ :param hostname: hostname or None for all :return: generator with 'snmp-index' optionally added to each element """ - snmp_indexes = common.load_snmp_indexes(hostname) + snmp_indexes = common.load_snmp_indexes( + current_app.config['INVENTORY_PROVIDER_CONFIG'], hostname) for ifc in interfaces: router_snmp = 
snmp_indexes.get(ifc['router'], None) if router_snmp and ifc['name'] in router_snmp: @@ -475,16 +704,26 @@ :param hostname: hostname or None for all :return: generator yielding interface elements """ - basic_interfaces = _load_interfaces(hostname) + + no_lab = common.get_bool_request_arg('no-lab', False) + basic_interfaces = _load_interfaces( + current_app.config['INVENTORY_PROVIDER_CONFIG'], + hostname, + no_lab=no_lab) + # basic_interfaces = list(basic_interfaces) with_bundles = _add_bundle_parents(basic_interfaces, hostname) with_circuits = _add_circuits(with_bundles, hostname) + # with_circuits = list(with_circuits) with_snmp = _add_snmp_indexes(with_circuits, hostname) + # with_snmp = list(with_snmp) + + # only return interfaces that can be polled def _has_snmp_index(ifc): return 'snmp-index' in ifc - # only return interfaces that can be polled - return filter(_has_snmp_index, with_snmp) + to_poll = filter(_has_snmp_index, with_snmp) + + return _add_dashboards(to_poll) @@ -507,6 +746,9 @@ def interfaces(hostname=None): .. asjson:: inventory_provider.routes.poller.INTERFACE_LIST_SCHEMA + :meth:`inventory_provider.routes.poller._get_dashboards` + is where the dashboard mapping is handled. + :param hostname: optional, if present should be a router hostname :return: """ @@ -575,7 +817,11 @@ def _load_interfaces_and_speeds(hostname=None): :param hostname: hostname or None for all :return: generator yielding interface elements """ - basic_interfaces = _load_interfaces(hostname) + no_lab = common.get_bool_request_arg('no-lab', False) + basic_interfaces = _load_interfaces( + current_app.config['INVENTORY_PROVIDER_CONFIG'], + hostname, + no_lab=no_lab) with_bundles = _add_bundle_parents(basic_interfaces, hostname) def _result_ifc(ifc): @@ -653,9 +899,9 @@ def eumetsat_multicast(hostname=None): MX1_FRA = 'mx1.fra.de.geant.net' SUBSCRIPTIONS = [{ - 'subscription': f'232.223.222.{idx}', - 'endpoint': '193.17.9.3', - } for idx in range(1, 73)] + 'subscription': f'232.223.222.{idx}', + 'endpoint': '193.17.9.3', + } for idx in range(1, 73)] SUBSCRIPTIONS.append( {'subscription': '232.223.223.1', 'endpoint': '193.17.9.7'}) @@ -739,7 +985,24 @@ def gws_direct(): config_params = current_app.config['INVENTORY_PROVIDER_CONFIG'] for nren_isp in config_params['gws-direct']: for host in nren_isp['hosts']: + + snmp_params = {} + if 'community' in host: + # (snmp v2) + # sanity (already guaranteed by schema check) + assert 'sec-name' not in host + snmp_params['community'] = host['community'] + else: + # (snmp v3) + # sanity (already guaranteed by schema check) + assert 'sec-name' in host + snmp_params['sec-name'] = host['sec-name'] + if 'auth' in host: + snmp_params['auth'] = host['auth'] + if 'priv' in host: + snmp_params['priv'] = host['priv'] for ifc in host['interfaces']: + yield { 'nren': nren_isp['nren'], 'isp': nren_isp['isp'], @@ -749,8 +1012,9 @@ { 'field': k, 'oid': v, - 'community': host['community'] - } for k, v in ifc['counters'].items()] + 'snmp': snmp_params + } + for k, v in ifc['counters'].items()] } result = json.dumps(list(_interfaces())) @@ -855,7 +1119,8 @@ def _get_services_internal(service_type=None): yield doc['value'] def _add_snmp(s): - all_snmp_info = common.load_snmp_indexes() + all_snmp_info = common.load_snmp_indexes( + current_app.config['INVENTORY_PROVIDER_CONFIG'], ) snmp_interfaces = all_snmp_info.get(s['hostname'], {}) interface_info = 
snmp_interfaces.get(s['interface'], None) if interface_info: @@ -1034,3 +1299,76 @@ def get_service_types(): redis.set(cache_key, service_types) return Response(service_types, mimetype='application/json') + + +@routes.route('/gws/direct-config', methods=['GET', 'POST']) +def gws_direct_config(): + """ + Handler for `/poller/gws/direct-config` which returns + the basic gws-direct config. + + This API is only intended for config validation. + :return: + """ + + format = request.args.get('format', default='json', type=str) + format = format.lower() + if format not in ('html', 'json'): + return Response( + response='format must be one of: html, json', + status=400, + mimetype="text/html") + + def _counters(): + config_params = current_app.config['INVENTORY_PROVIDER_CONFIG'] + for nren_isp in config_params['gws-direct']: + for host in nren_isp['hosts']: + snmp_version = '2' if 'community' in host.keys() else '3' + for ifc in host['interfaces']: + for field, oid in ifc['counters'].items(): + yield { + 'nren': nren_isp['nren'], + 'isp': nren_isp['isp'], + 'hostname': host['hostname'], + 'snmp': snmp_version, + 'interface': ifc['tag'], + 'field': field, + 'oid': oid + } + + def _to_row(counter, header=False): + _columns = ( + 'nren', 'isp', 'hostname', 'snmp', 'interface', 'field', 'oid') + elems = ['<tr>'] + for name in _columns: + if header: + elems.append(f'<th>{name}</th>') + else: + elems.append(f'<td>{counter[name]}</td>') + elems.append('</tr>') + return ''.join(elems) + + if format == 'json': + if not request.accept_mimetypes.accept_json: + return Response( + response="response will be json", + status=406, + mimetype="text/html") + else: + return jsonify(list(_counters())) + + if not request.accept_mimetypes.accept_html: + return Response( + response="response will be html", + status=406, + mimetype="text/html") + + elems = ['<html>', '<body>', '<table>'] + elems.append(_to_row(None, header=True)) + elems += map(_to_row, _counters()) + elems += ['</table>', '</body>', '</html>'] + + return Response( + response=''.join(elems), + status=200, + mimetype="text/html") diff --git a/inventory_provider/routes/testing.py b/inventory_provider/routes/testing.py index f81982283885785da1dfaad497ba8e8ed1899f1c..8240efe5e92438674a881339b140f398f25b5695 100644 --- a/inventory_provider/routes/testing.py +++ b/inventory_provider/routes/testing.py @@ -18,6 +18,12 @@ routes = Blueprint("inventory-data-testing-support-routes", __name__) logger = logging.getLogger(__name__) +@routes.route("chord-update", methods=['GET', 'POST']) +def chord_update(): + r = worker.update_entry_point.delay().get() + return jsonify(r) + + @routes.route("flushdb", methods=['GET', 'POST']) def flushdb(): common.get_current_redis().flushdb() diff --git a/inventory_provider/tasks/common.py b/inventory_provider/tasks/common.py index ad929ed4677cee9750f0b9b52c2846ecc26584d1..9cfa8437a839502bcd9831adf32fce3ce64fa680 100644 --- a/inventory_provider/tasks/common.py +++ b/inventory_provider/tasks/common.py @@ -98,7 +98,7 @@ def update_latch_status(config, pending=False, failure=False): if not pending and not failure: if not latch['pending'] and not latch['failure']: logger.error( - 'updating latch for db {db} with pending=failure=True, ' + f'updating latch for db {db} with pending=failure=False, ' f'but latch is already {latch}') latch['timestamp'] = now latch['pending'] = pending @@ -112,17 +112,21 @@ def set_latch(config, new_current, new_next, timestamp): new_current, new_next)) for db in config['redis-databases']: - latch = { - 'current': 
new_current, - 'next': new_next, - 'this': db, - 'pending': False, - 'failure': False, - 'timestamp': timestamp - } - r = _get_redis(config, dbid=db) - r.set('db:latch', json.dumps(latch)) + set_single_latch(r, db, new_current, new_next, timestamp) + + +def set_single_latch(r, db, new_current, new_next, timestamp): + + latch = { + 'current': new_current, + 'next': new_next, + 'this': db, + 'pending': False, + 'failure': False, + 'timestamp': timestamp + } + r.set('db:latch', json.dumps(latch)) def latch_db(config): diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 544e10c098db1343ae249718c6011d50102b629b..e0e9bc2cf422562831d349b553d984150dd90970 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -1,23 +1,17 @@ -import csv +import concurrent.futures import functools import json import logging import os import re -import subprocess -import tempfile import threading import time -from datetime import datetime -from enum import IntFlag -from pathlib import Path from typing import List -from uuid import uuid4 from redis.exceptions import RedisError from kombu.exceptions import KombuError -from celery import Task, states +from celery import Task, states, chord from celery.result import AsyncResult from collections import defaultdict @@ -26,11 +20,17 @@ import jsonschema from inventory_provider.db import ims_data from inventory_provider.db.ims import IMS +from inventory_provider.routes.classifier import get_ims_interface, \ + get_ims_equipment_name +from inventory_provider.routes.common import load_snmp_indexes +from inventory_provider.routes.poller import _load_interfaces, \ + _load_interface_bundles, _get_dashboard_data, _get_dashboards, \ + _load_services from inventory_provider.tasks.app import app from inventory_provider.tasks.common \ import get_next_redis, get_current_redis, \ latch_db, get_latch, set_latch, update_latch_status, \ - ims_sorted_service_type_key + ims_sorted_service_type_key, set_single_latch from inventory_provider.tasks import monitor from inventory_provider import config from inventory_provider import environment @@ -881,59 +881,6 @@ def update_lg_routers(self, use_current=False): r.set(f'ims:lg:{router["equipment name"]}', json.dumps(router)) -class OTRSFiles(IntFlag): - CUSTOMER_COMPANIES = 1 - CUSTOMER_USERS = 2 - - -@app.task(base=InventoryTask, bind=True, name='export_data_for_otrs') -@log_task_entry_and_exit -def export_data_for_otrs(self, files_to_export=None, export_duplicates=False): - debug_uuid = uuid4() - logger.debug(f'debug uuid: {debug_uuid}') - if files_to_export: - files_to_export = OTRSFiles(files_to_export) - else: - files_to_export = set(OTRSFiles) - - ims_config = InventoryTask.config["ims"] - otrs_config = InventoryTask.config["otrs-export"] - - command_template = 'rsync -aPq --no-perms --rsh="ssh -l {user} -p 22 -i {key_file} -o \'UserKnownHostsFile {known_hosts}\'" {source_dir}/* {destination}' # noqa - - with tempfile.TemporaryDirectory() as temp_dir: - temp_path = Path(temp_dir) - ds = IMS( - ims_config['api'], - ims_config['username'], - ims_config['password']) - - prefix = datetime.now().strftime('%Y%m%d') + '_' - - if OTRSFiles.CUSTOMER_COMPANIES in files_to_export: - cus_co_path = temp_path.joinpath(f'{prefix}customer_company.csv') - with open(cus_co_path, 'w+') as f: - writer = csv.writer(f, delimiter='^') - writer.writerows(ims_data.otrs_get_customer_company_rows(ds)) - - if OTRSFiles.CUSTOMER_USERS in files_to_export: - cus_usr_path = 
temp_path.joinpath(f'{prefix}customer_user.csv') - with open(cus_usr_path, 'w+') as f: - writer = csv.writer(f, delimiter='^') - writer.writerows(ims_data.otrs_get_customer_users_rows( - ds, return_duplicates=export_duplicates)) - - command = command_template.format( - user=otrs_config['username'], - key_file=otrs_config['private-key'], - known_hosts=otrs_config['known-hosts'], - source_dir=temp_dir, - destination=otrs_config['destination'] - ) - subprocess.run(command, shell=True, check=True) - return debug_uuid - - def _wait_for_tasks(task_ids, update_callback=lambda s: None): all_successful = True @@ -993,6 +940,7 @@ def refresh_finalizer(self, pending_task_ids_json): _build_subnet_db(update_callback=self.log_info) _build_snmp_peering_db(update_callback=self.log_info) _build_juniper_peering_db(update_callback=self.log_info) + populate_poller_interfaces_cache(warning_callback=self.log_warning) except (jsonschema.ValidationError, json.JSONDecodeError, @@ -1205,3 +1153,765 @@ def check_task_status(task_id, parent=None, forget=False): r.forget() yield result + +# =================================== chorded - currently only here for testing + + +# new +@app.task(base=InventoryTask, bind=True, name='update_entry_point') +@log_task_entry_and_exit +def update_entry_point(self): + routers = retrieve_and_persist_neteng_managed_device_list( + info_callback=self.log_info, + warning_callback=self.log_warning + ) + lab_routers = InventoryTask.config.get('lab-routers', []) + + _erase_next_db_chorded(InventoryTask.config) + update_latch_status(InventoryTask.config, pending=True) + + tasks = chord( + ( + ims_task.s().on_error(task_error_handler.s()), + chord( + (reload_router_config_chorded.s(r) for r in routers), + empty_task.si('router tasks complete') + ), + chord( + (reload_lab_router_config_chorded.s(r) for r in lab_routers), + empty_task.si('lab router tasks complete') + ) + ), + final_task.si().on_error(task_error_handler.s()) + )() + return tasks + + +# new +@app.task +def task_error_handler(request, exc, traceback): + update_latch_status(InventoryTask.config, pending=False, failure=True) + logger.warning('Task {0!r} raised error: {1!r}'.format(request.id, exc)) + + +# new +@app.task(base=InventoryTask, bind=True, name='empty_task') +def empty_task(self, message): + logger.warning(f'message from empty task: {message}') + + +def retrieve_and_persist_neteng_managed_device_list( + info_callback=lambda s: None, + warning_callback=lambda s: None): + netdash_equipment = None + try: + info_callback('querying netdash for managed routers') + netdash_equipment = list(juniper.load_routers_from_netdash( + InventoryTask.config['managed-routers'])) + except Exception as e: + warning_callback(f'Error retrieving device list: {e}') + + if netdash_equipment: + info_callback(f'found {len(netdash_equipment)} routers') + else: + warning_callback('No devices retrieved, using previous list') + try: + current_r = get_current_redis(InventoryTask.config) + netdash_equipment = current_r.get('netdash') + netdash_equipment = json.loads(netdash_equipment.decode('utf-8')) + if not netdash_equipment: + raise InventoryTaskError( + 'No equipment retrieved from previous list') + except Exception as e: + warning_callback(str(e)) + update_latch_status( + InventoryTask.config, pending=False, failure=True) + raise e + + try: + next_r = get_next_redis(InventoryTask.config) + next_r.set('netdash', json.dumps(netdash_equipment)) + info_callback(f'saved {len(netdash_equipment)} managed routers') + except Exception as e: + warning_callback(str(e)) + 
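+ # mark the latch as failed before re-raising, so the failure is visible to anything reading db:latch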
update_latch_status( + InventoryTask.config, pending=False, failure=True) + raise e + return netdash_equipment + + +# updated with transaction +def _erase_next_db_chorded(config): + """ + flush next db, but first save latch and then restore afterwards + + TODO: handle the no latch scenario nicely + :param config: + :return: + """ + r = get_next_redis(config) + saved_latch = get_latch(r) + + if saved_latch: + # execute as transaction to ensure that latch is always available in + # db that is being flushed + rp = r.pipeline() + rp.multi() + rp.flushdb() + set_single_latch( + rp, + saved_latch['this'], + saved_latch['current'], + saved_latch['next'], + saved_latch.get('timestamp', 0) + ) + rp.execute() + + # ensure latch is consistent in all dbs + set_latch( + config, + new_current=saved_latch['current'], + new_next=saved_latch['next'], + timestamp=saved_latch.get('timestamp', 0)) + + +# updated +@app.task(base=InventoryTask, bind=True, name='reload_lab_router_config') +@log_task_entry_and_exit +def reload_lab_router_config_chorded(self, hostname): + try: + self.log_info(f'loading netconf data for lab {hostname} RL') + + # load new netconf data, in this thread + netconf_str = retrieve_and_persist_netconf_config( + hostname, lab=True, update_callback=self.log_warning) + netconf_doc = etree.fromstring(netconf_str) + + refresh_juniper_interface_list(hostname, netconf_doc, lab=True) + + # load snmp indexes + community = juniper.snmp_community_string(netconf_doc) + if not community: + raise InventoryTaskError( + f'error extracting community string for {hostname}') + else: + self.log_info(f'refreshing snmp interface indexes for {hostname}') + logical_systems = juniper.logical_systems(netconf_doc) + + # load snmp data, in this thread + snmp_refresh_interfaces(hostname, community, logical_systems) + + self.log_info(f'updated configuration for lab {hostname}') + except Exception as e: + logger.error(e) + update_latch_status(InventoryTask.config, pending=True, failure=True) + + +# updated +@app.task(base=InventoryTask, bind=True, name='reload_router_config') +@log_task_entry_and_exit +def reload_router_config_chorded(self, hostname): + try: + self.log_info(f'loading netconf data for {hostname} RL') + netconf_str = retrieve_and_persist_netconf_config( + hostname, update_callback=self.log_warning) + + netconf_doc = etree.fromstring(netconf_str) + + # clear cached classifier responses for this router, and + # refresh peering data + logger.info(f'refreshing peers & clearing cache for {hostname}') + refresh_juniper_bgp_peers(hostname, netconf_doc) + refresh_juniper_interface_list(hostname, netconf_doc) + + # load snmp indexes + community = juniper.snmp_community_string(netconf_doc) + if not community: + raise InventoryTaskError( + f'error extracting community string for {hostname}') + else: + self.log_info(f'refreshing snmp interface indexes for {hostname}') + logical_systems = juniper.logical_systems(netconf_doc) + + # load snmp data, in this thread + snmp_refresh_interfaces_chorded( + hostname, community, logical_systems, self.log_info) + snmp_refresh_peerings_chorded(hostname, community, logical_systems) + + logger.info(f'updated configuration for {hostname}') + except Exception as e: + logger.error(e) + update_latch_status(InventoryTask.config, pending=True, failure=True) + + +# new +def retrieve_and_persist_netconf_config( + hostname, lab=False, update_callback=lambda s: None): + redis_key = f'netconf:{hostname}' + if lab: + redis_key = f'lab:{redis_key}' + + try: + netconf_doc = juniper.load_config( + hostname, 
InventoryTask.config["ssh"]) + netconf_str = etree.tostring(netconf_doc, encoding='unicode') + except (ConnectionError, juniper.NetconfHandlingError, InventoryTaskError): + msg = f'error loading netconf data from {hostname}' + logger.exception(msg) + update_callback(msg) + r = get_current_redis(InventoryTask.config) + + netconf_str = r.get(redis_key) + if not netconf_str: + update_callback(f'no cached netconf for {redis_key}') + raise InventoryTaskError( + f'netconf error with {hostname}' + f' and no cached netconf data found') + logger.info(f'Returning cached netconf data for {hostname}') + update_callback(f'Returning cached netconf data for {hostname}') + + r = get_next_redis(InventoryTask.config) + r.set(redis_key, netconf_str) + logger.info(f'netconf info loaded from {hostname}') + return netconf_str + + +# updated as is no longer a task +@log_task_entry_and_exit +def snmp_refresh_interfaces_chorded( + hostname, community, logical_systems, update_callback=lambda s: None): + try: + interfaces = list( + snmp.get_router_snmp_indexes(hostname, community, logical_systems)) + except ConnectionError: + msg = f'error loading snmp interface data from {hostname}' + logger.exception(msg) + update_callback(msg) + r = get_current_redis(InventoryTask.config) + interfaces = r.get(f'snmp-interfaces:{hostname}') + if not interfaces: + raise InventoryTaskError( + f'snmp error with {hostname}' + f' and no cached snmp interface data found') + # unnecessary json encode/decode here ... could be optimized + interfaces = json.loads(interfaces.decode('utf-8')) + update_callback(f'using cached snmp interface data for {hostname}') + + r = get_next_redis(InventoryTask.config) + + rp = r.pipeline() + rp.set(f'snmp-interfaces:{hostname}', json.dumps(interfaces)) + + # optimization for DBOARD3-372 + # interfaces is a list of dicts like: {'name': str, 'index': int} + for ifc in interfaces: + ifc['hostname'] = hostname + rp.set( + f'snmp-interfaces-single:{hostname}:{ifc["name"]}', + json.dumps(ifc)) + + rp.execute() + + update_callback(f'snmp interface info loaded from {hostname}') + + +# updated as is no longer a task +@log_task_entry_and_exit +def snmp_refresh_peerings_chorded( + hostname, community, logical_systems, update_callback=lambda S: None): + try: + peerings = list( + snmp.get_peer_state_info(hostname, community, logical_systems)) + except ConnectionError: + msg = f'error loading snmp peering data from {hostname}' + logger.exception(msg) + update_callback(msg) + r = get_current_redis(InventoryTask.config) + peerings = r.get(f'snmp-peerings:hosts:{hostname}') + if peerings is None: + raise InventoryTaskError( + f'snmp error with {peerings}' + f' and no cached peering data found') + # unnecessary json encode/decode here ... 
could be optimized + peerings = json.loads(peerings.decode('utf-8')) + update_callback(f'using cached snmp peering data for {hostname}') + + r = get_next_redis(InventoryTask.config) + r.set(f'snmp-peerings:hosts:{hostname}', json.dumps(peerings)) + + update_callback(f'snmp peering info loaded from {hostname}') + + +# new +@app.task(base=InventoryTask, bind=True, name='ims_task') +@log_task_entry_and_exit +def ims_task(self, use_current=False): + + try: + extracted_data = extract_ims_data() + transformed_data = transform_ims_data(extracted_data) + transformed_data['locations'] = extracted_data['locations'] + transformed_data['lg_routers'] = extracted_data['lg_routers'] + persist_ims_data(transformed_data, use_current) + except Exception as e: + logger.error(e) + update_latch_status(InventoryTask.config, pending=True, failure=True) + + +# new +def extract_ims_data(): + + c = InventoryTask.config["ims"] + ds1 = IMS(c['api'], c['username'], c['password']) + ds2 = IMS(c['api'], c['username'], c['password']) + ds3 = IMS(c['api'], c['username'], c['password']) + ds4 = IMS(c['api'], c['username'], c['password']) + ds5 = IMS(c['api'], c['username'], c['password']) + + locations = {} + lg_routers = [] + customer_contacts = {} + circuit_ids_to_monitor = [] + additional_circuit_customer_ids = {} + + hierarchy = {} + port_id_details = defaultdict(list) + port_id_services = defaultdict(list) + + def _populate_locations(): + nonlocal locations + locations = {k: v for k, v in ims_data.get_node_locations(ds1)} + + def _populate_lg_routers(): + nonlocal lg_routers + lg_routers = list(ims_data.lookup_lg_routers(ds5)) + + def _populate_customer_contacts(): + nonlocal customer_contacts + customer_contacts = \ + {k: v for k, v in ims_data.get_customer_service_emails(ds2)} + + def _populate_circuit_ids_to_monitor(): + nonlocal circuit_ids_to_monitor + circuit_ids_to_monitor = \ + list(ims_data.get_monitored_circuit_ids(ds3)) + + def _populate_additional_circuit_customer_ids(): + nonlocal additional_circuit_customer_ids + additional_circuit_customer_ids = \ + ims_data.get_circuit_related_customer_ids(ds4) + + exceptions = {} + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = { + executor.submit(_populate_locations): 'locations', + executor.submit(_populate_lg_routers): 'lg_routers', + executor.submit(_populate_customer_contacts): 'customer_contacts', + executor.submit(_populate_circuit_ids_to_monitor): + 'circuit_ids_to_monitor', + executor.submit(_populate_additional_circuit_customer_ids): + 'additional_circuit_customer_ids' + } + + for future in concurrent.futures.as_completed(futures): + if future.exception(): + exceptions[futures[future]] = str(future.exception()) + + if exceptions: + raise InventoryTaskError(json.dumps(exceptions, indent=2)) + + def _populate_hierarchy(): + nonlocal hierarchy + hierarchy = {d['id']: d for d in ims_data.get_circuit_hierarchy(ds1)} + logger.debug("hierarchy complete") + + def _populate_port_id_details(): + nonlocal port_id_details + for x in ims_data.get_port_details(ds2): + pd = port_id_details[x['port_id']] + pd.append(x) + logger.debug("Port details complete") + + def _populate_circuit_info(): + for x in ims_data.get_port_id_services(ds3): + port_id_services[x['port_a_id']].append(x) + logger.debug("port circuits complete") + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = { + executor.submit(_populate_hierarchy): 'hierarchy', + executor.submit(_populate_port_id_details): 'port_id_details', + 
executor.submit(_populate_circuit_info): 'circuit_info' + } + + for future in concurrent.futures.as_completed(futures): + if future.exception(): + exceptions[futures[future]] = str(future.exception()) + + if exceptions: + raise InventoryTaskError(json.dumps(exceptions, indent=2)) + + return { + 'locations': locations, + 'lg_routers': lg_routers, + 'customer_contacts': customer_contacts, + 'circuit_ids_to_monitor': circuit_ids_to_monitor, + 'additional_circuit_customer_ids': additional_circuit_customer_ids, + 'hierarchy': hierarchy, + 'port_id_details': port_id_details, + 'port_id_services': port_id_services + } + + +# new +def transform_ims_data(data): + locations = data['locations'] + customer_contacts = data['customer_contacts'] + circuit_ids_to_monitor = data['circuit_ids_to_monitor'] + additional_circuit_customer_ids = data['additional_circuit_customer_ids'] + hierarchy = data['hierarchy'] + port_id_details = data['port_id_details'] + port_id_services = data['port_id_services'] + + def _get_circuit_contacts(c): + customer_ids = {c['customerid']} + customer_ids.update(additional_circuit_customer_ids.get(c['id'], [])) + return set().union( + *[customer_contacts.get(cid, []) for cid in customer_ids]) + + for d in hierarchy.values(): + d['contacts'] = sorted(list(_get_circuit_contacts(d))) + + def _convert_to_bits(value, unit): + unit = unit.lower() + conversions = { + 'm': 1 << 20, + 'mb': 1 << 20, + 'g': 1 << 30, + 'gbe': 1 << 30, + } + return int(value) * conversions[unit] + + def _get_speed(circuit_id): + c = hierarchy[circuit_id] + if c['status'] != 'operational': + return 0 + pattern = re.compile(r'^(\d+)([a-zA-Z]+)$') + m = pattern.match(c['speed']) + if m: + try: + return _convert_to_bits(m[1], m[2]) + except KeyError as e: + logger.debug(f'Could not find key: {e} ' + f'for circuit: {circuit_id}') + return 0 + else: + if c['circuit-type'] == 'service' \ + or c['product'].lower() == 'ethernet': + return sum( + (_get_speed(x) for x in c['carrier-circuits']) + ) + else: + return 0 + + def _get_fibre_routes(c_id): + _circ = hierarchy.get(c_id, None) + if _circ is None: + return + if _circ['speed'].lower() == 'fibre_route': + yield _circ['id'] + else: + for cc in _circ['carrier-circuits']: + yield from _get_fibre_routes(cc) + + def _get_related_services(circuit_id: str) -> List[dict]: + rs = {} + c = hierarchy.get(circuit_id, None) + if c: + + if c['circuit-type'] == 'service': + rs[c['id']] = { + 'id': c['id'], + 'name': c['name'], + 'circuit_type': c['circuit-type'], + 'service_type': c['product'], + 'project': c['project'], + 'contacts': c['contacts'] + } + if c['id'] in circuit_ids_to_monitor: + rs[c['id']]['status'] = c['status'] + else: + rs[c['id']]['status'] = 'non-monitored' + + if c['sub-circuits']: + for sub in c['sub-circuits']: + temp_parents = \ + _get_related_services(sub) + rs.update({t['id']: t for t in temp_parents}) + return list(rs.values()) + + def _format_service(s): + + if s['circuit_type'] == 'service' \ + and s['id'] not in circuit_ids_to_monitor: + s['status'] = 'non-monitored' + pd_a = port_id_details[s['port_a_id']][0] + location_a = locations.get(pd_a['equipment_name'], None) + if location_a: + loc_a = location_a['pop'] + else: + loc_a = locations['UNKNOWN_LOC']['pop'] + logger.warning( + f'Unable to find location for {pd_a["equipment_name"]} - ' + f'Service ID {s["id"]}') + s['pop_name'] = loc_a['name'] + s['pop_abbreviation'] = loc_a['abbreviation'] + s['equipment'] = pd_a['equipment_name'] + s['card_id'] = '' # this is redundant I believe + s['port'] = 
pd_a['interface_name'] + s['logical_unit'] = '' # this is redundant I believe + if 'port_b_id' in s: + pd_b = port_id_details[s['port_b_id']][0] + location_b = locations.get(pd_b['equipment_name'], None) + if location_b: + loc_b = location_b['pop'] + else: + loc_b = locations['UNKNOWN_LOC']['pop'] + logger.warning( + f'Unable to find location for {pd_b["equipment_name"]} - ' + f'Service ID {s["id"]}') + + s['other_end_pop_name'] = loc_b['name'] + s['other_end_pop_abbreviation'] = loc_b['abbreviation'] + s['other_end_equipment'] = pd_b['equipment_name'] + s['other_end_port'] = pd_b['interface_name'] + else: + s['other_end_pop_name'] = '' + s['other_end_pop_abbreviation'] = '' + s['other_end_equipment'] = '' + s['other_end_port'] = '' + + s.pop('port_a_id', None) + s.pop('port_b_id', None) + + services_by_type = {} + interface_services = defaultdict(list) + + for key, value in port_id_details.items(): + for details in value: + k = f"{details['equipment_name']}:" \ + f"{details['interface_name']}" + circuits = port_id_services.get(details['port_id'], []) + + for circ in circuits: + contacts = _get_circuit_contacts(circ) + circ['fibre-routes'] = [] + for x in set(_get_fibre_routes(circ['id'])): + c = { + 'id': hierarchy[x]['id'], + 'name': hierarchy[x]['name'], + 'status': hierarchy[x]['status'] + } + circ['fibre-routes'].append(c) + + circ['related-services'] = \ + _get_related_services(circ['id']) + + for tlc in circ['related-services']: + contacts.update(tlc.pop('contacts')) + circ['contacts'] = sorted(list(contacts)) + + circ['calculated-speed'] = _get_speed(circ['id']) + _format_service(circ) + + type_services = services_by_type.setdefault( + ims_sorted_service_type_key(circ['service_type']), dict()) + type_services[circ['id']] = circ + + interface_services[k].extend(circuits) + + return { + 'hierarchy': hierarchy, + 'interface_services': interface_services, + 'services_by_type': services_by_type + } + + +# new +def persist_ims_data(data, use_current=False): + hierarchy = data['hierarchy'] + locations = data['locations'] + lg_routers = data['lg_routers'] + interface_services = data['interface_services'] + services_by_type = data['services_by_type'] + + if use_current: + r = get_current_redis(InventoryTask.config) + + # only need to delete the individual keys if it's just an IMS update + # rather than a complete update (the db will have been flushed) + for key_pattern in [ + 'ims:location:*', + 'ims:lg:*', + 'ims:circuit_hierarchy:*', + 'ims:interface_services:*', + 'ims:access_services:*', + 'ims:gws_indirect:*' + ]: + rp = r.pipeline() + for k in r.scan_iter(key_pattern, count=1000): + rp.delete(k) + rp.execute() + else: + r = get_next_redis(InventoryTask.config) + + rp = r.pipeline() + for h, d in locations.items(): + rp.set(f'ims:location:{h}', json.dumps([d])) + rp.execute() + rp = r.pipeline() + for router in lg_routers: + rp.set(f'ims:lg:{router["equipment name"]}', json.dumps(router)) + rp.execute() + rp = r.pipeline() + for circ in hierarchy.values(): + rp.set(f'ims:circuit_hierarchy:{circ["id"]}', json.dumps([circ])) + rp.execute() + rp = r.pipeline() + for k, v in interface_services.items(): + rp.set( + f'ims:interface_services:{k}', + json.dumps(v)) + rp.execute() + rp = r.pipeline() + + populate_poller_cache(interface_services, r) + + for service_type, services in services_by_type.items(): + for v in services.values(): + rp.set( + f'ims:services:{service_type}:{v["name"]}', + json.dumps({ + 'id': v['id'], + 'name': v['name'], + 'project': v['project'], + 'here': { + 'pop': { + 'name': 
v['pop_name'], + 'abbreviation': v['pop_abbreviation'] + }, + 'equipment': v['equipment'], + 'port': v['port'], + }, + 'there': { + 'pop': { + 'name': v['other_end_pop_name'], + 'abbreviation': v['other_end_pop_abbreviation'] + }, + 'equipment': v['other_end_equipment'], + 'port': v['other_end_port'], + }, + 'speed_value': v['calculated-speed'], + 'speed_unit': 'n/a', + 'status': v['status'], + 'type': v['service_type'] + })) + + rp.execute() + + +# new +@app.task(base=InventoryTask, bind=True, name='final_task') +@log_task_entry_and_exit +def final_task(self): + + r = get_current_redis(InventoryTask.config) + latch = get_latch(r) + if latch['failure']: + raise InventoryTaskError('Sub-task failed - check logs for details') + + _build_subnet_db(update_callback=self.log_info) + _build_snmp_peering_db(update_callback=self.log_info) + _build_juniper_peering_db(update_callback=self.log_info) + populate_poller_interfaces_cache() + + latch_db(InventoryTask.config) + self.log_info('latched current/next dbs') + + +def populate_poller_interfaces_cache(warning_callback=lambda s: None): + no_lab_cache_key = 'classifier-cache:poller-interfaces:no-lab' + all_cache_key = 'classifier-cache:poller-interfaces:all' + non_lab_populated_interfaces = None + all_populated_interfaces = None + + r = get_next_redis(InventoryTask.config) + + try: + lab_keys_pattern = 'lab:netconf-interfaces-hosts:*' + lab_equipment = [h.decode('utf-8')[len(lab_keys_pattern) - 1:] + for h in r.keys(lab_keys_pattern)] + standard_interfaces = _load_interfaces( + InventoryTask.config, + no_lab=False, + use_next_redis=True) + + bundles = _load_interface_bundles( + InventoryTask.config, + use_next_redis=True + ) + snmp_indexes = load_snmp_indexes( + InventoryTask.config, use_next_redis=True) + + services = _load_services(InventoryTask.config, use_next_redis=True) + + def _get_populated_interfaces(interfaces): + + for ifc in interfaces: + router_snmp = snmp_indexes.get(ifc['router'], None) + if router_snmp and ifc['name'] in router_snmp: + ifc['snmp-index'] = router_snmp[ifc['name']]['index'] + + router_bundle = bundles.get(ifc['router'], None) + if router_bundle: + base_ifc = ifc['name'].split('.')[0] + ifc['bundle-parents'] = router_bundle.get(base_ifc, []) + + router_services = services.get( + get_ims_equipment_name(ifc['router'], r), None) + if router_services: + ifc['circuits'] = router_services.get( + get_ims_interface(ifc['name']), [] + ) + + dashboards = _get_dashboards(ifc) + ifc['dashboards'] = sorted([d.name for d in dashboards]) + yield _get_dashboard_data(ifc) + else: + continue + + all_populated_interfaces = \ + list(_get_populated_interfaces(standard_interfaces)) + non_lab_populated_interfaces = [x for x in all_populated_interfaces + if x['router'] not in lab_equipment] + + except Exception as e: + warning_callback(f"Failed to retrieve all required data {e}") + + if not non_lab_populated_interfaces or not all_populated_interfaces: + previous_r = get_current_redis(InventoryTask.config) + + def _load_previous(key): + try: + warning_callback(f"populating {key} " + "from previously cached data") + return json.loads(previous_r.get(key)) + except Exception as e: + warning_callback(f"Failed to load {key} " + f"from previously cached data: {e}") + + if not non_lab_populated_interfaces: + non_lab_populated_interfaces = _load_previous(no_lab_cache_key) + + if not all_populated_interfaces: + all_populated_interfaces = _load_previous(all_cache_key) + + r.set(no_lab_cache_key, json.dumps(non_lab_populated_interfaces)) + 
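+ # note: these cache writes happen unconditionally; if neither fresh data nor a previous cache entry was available, the stored value is JSON null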
diff --git a/setup.py b/setup.py
index d5fb5262362f356e6af7553d0662c6e3ae7badf5..4e84bda1964e518d5f898d628bc89de2b343cf79 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 
 setup(
     name='inventory-provider',
-    version="0.69",
+    version="0.70",
     author='GEANT',
     author_email='swd@geant.org',
     description='Dashboard inventory provider',
diff --git a/test/conftest.py b/test/conftest.py
index bdfe95676907fa170cde88260eadc5385ffdb9a4..c71ce4daff1c3b6c1fd569576e0b9de89b4bd628 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -45,12 +45,6 @@ def data_config_filename():
             "socket_timeout": 2.8
         },
         "redis-databases": [0, 7],
-        "otrs-export": {
-            "username": "otrs_username",
-            "private-key": "otrs_ky_loc",
-            "destination": "otrs_dest",
-            "known-hosts": "otrs_known_hosts"
-        },
         "ims": {
             "api": "ims_api",
             "username": "ims_username",
diff --git a/test/data/gws-direct.json b/test/data/gws-direct.json
index db422736b59a697fda797b4a1f9390fc8f430ecd..3e399fc623f0ffa1481f8c90ed4075af08186918 100644
--- a/test/data/gws-direct.json
+++ b/test/data/gws-direct.json
@@ -285,5 +285,32 @@
         ]
       }
     ]
-  }
+  },
+  {
+    "nren": "HEANET",
+    "isp": "CenturyLink",
+    "hosts": [
+      {
+        "hostname": "core2-cwt.nn.hea.net",
+        "sec-name": "abababab",
+        "auth": {
+          "protocol": "DES",
+          "password": "ccccddddccccdddd"
+        },
+        "priv": {
+          "protocol": "MD5",
+          "password": "eeeeffffeeeeffff"
+        },
+        "interfaces": [
+          {
+            "tag": "a",
+            "counters": {
+              "traffic_in": "1.3.6.1.2.1.31.1.1.1.6.645",
+              "traffic_out": "1.3.6.1.2.1.31.1.1.1.10.645"
+            }
+          }
+        ]
+      }
+    ]
+  }
 ]
\ No newline at end of file
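# The HEANET host above uses the new SNMPv3 shape from the
# gws-direct-host-v3 schema (sec-name, plus optional auth/priv credentials),
# while the existing entries keep the v2 shape (community). A hypothetical
# sketch of how a consumer could tell the two apart:
def snmp_version(host):
    # 'sec-name' is required by gws-direct-host-v3,
    # 'community' by gws-direct-host-v2
    return 3 if 'sec-name' in host else 2


assert snmp_version(
    {'hostname': 'h', 'community': 'x', 'interfaces': []}) == 2
assert snmp_version(
    {'hostname': 'h', 'sec-name': 'x', 'interfaces': []}) == 3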
diff --git a/test/test_general_poller_routes.py b/test/test_general_poller_routes.py
index e17d8f3f6c1cbd2438020a7b21740bf7390bd05b..f9244bb5e6ef72d9a8225f83a868ecdc21004737 100644
--- a/test/test_general_poller_routes.py
+++ b/test/test_general_poller_routes.py
@@ -4,8 +4,7 @@
 import pytest
 
 from inventory_provider.routes import poller
 
 DEFAULT_REQUEST_HEADERS = {
-    "Content-type": "application/json",
-    "Accept": ["application/json"]
+    'Accept': ['application/json']
 }
 
@@ -43,7 +42,7 @@ def test_all_router_interface_speeds(client):
         headers=DEFAULT_REQUEST_HEADERS)
     assert rv.status_code == 200
 
-    response = json.loads(rv.data.decode("utf-8"))
+    response = json.loads(rv.data.decode('utf-8'))
     jsonschema.validate(response, poller.INTERFACE_SPEED_LIST_SCHEMA)
     assert response  # at least shouldn't be empty
     response_routers = {ifc['router'] for ifc in response}
@@ -55,7 +54,7 @@ def test_eumetsat_multicast(mocker, client):
 
     # routers don't have snmp acl's for us
     mocker.patch('inventory_provider.juniper.snmp_community_string') \
-            .return_value = 'blah'
+        .return_value = 'blah'
 
     rv = client.get(
         '/poller/eumetsat-multicast',
@@ -215,3 +214,128 @@ def test_dcu_oid_values(ifIndex, expected_oid):
 def test_fw_counter_bytes_oid_values(customer, interface_name, expected_oid):
     assert poller._jnx_fw_counter_bytes_oid(
         customer, interface_name) == expected_oid
+
+
+@pytest.mark.parametrize('description,expected_dashboards', [
+    ('SRV_IAS CUSTOMER JISC #JISC-AP1-IAS IASPS | ASN786',
+     ['IAS_CUSTOMER', 'NREN']),
+    ('SRV_L2CIRCUIT CUSTOMER JISC JISC #DUB-LON-NRENBBEXT-JANET-13015 | backup for niran ',  # noqa: E501
+     ['L2_CIRCUIT']),
+    ('SRV_L3VPN CUSTOMER EENET #EENET-AP2-LHCONE | ASN3221',
+     ['LHCONE', 'LHCONE_CUST', 'NREN']),
+    ('SRV_IAS PRIVATE OPTIMA-TELEKOM #HR-EduZone | For Eduzone',
+     ['IAS_PEERS', 'IAS_PRIVATE']),
+    ('SRV_CLS PRIVATE AWS #AT-AWS-CLS|ASN16509 | ',
+     ['CLS', 'CLS_PEERS']),
+    ('SRV_IAS PUBLIC MIX #IX_Peerings_in_MIX |',
+     ['IAS_PEERS', 'IAS_PUBLIC']),
+    ('LAG INFRASTRUCTURE BACKBONE SRF0000001 | bil-por',
+     ['INFRASTRUCTURE_BACKBONE']),
+    ('SRV_GCS CUSTOMER FCCN MICROSOFT #FCCN_NoveSBE_ExpressRoute_Vlan1945 | UNIT CONFIGURATION HAS BEEN SYSTEM GENERATED',  # noqa: E501
+     ['GCS']),
+    ('PHY UPSTREAM TELIA SRF9940473 | Telia ID: IC-326863',
+     ['GWS_PHY_UPSTREAM']),
+    ('SRV_IAS UPSTREAM COGENT #COGENT_GWS_VIE | ASN174',
+     ['IAS_UPSTREAM']),
+    ('SRV_L3VPN RE_INTERCONNECT CLARA #REDCLARA-LIS-LHCONE | ASN27750',
+     ['LHCONE', 'LHCONE_PEER', 'RE_PEER']),
+    ('SRV_MDVPN CUSTOMER REDIRIS #RedIRIS_AP1_BGP_LU_CoC_1 | MD VPN CoC-REDIRIS - ',  # noqa: E501
+     ['MDVPN_CUSTOMERS', 'NREN']),
+    ('SRV_L2CIRCUIT CUSTOMER TENET PSNC #lon-lon-GEANTOPEN-PSNC-TENET-18067 |',  # noqa: E501
+     ['GEANTOPEN', 'L2_CIRCUIT'])
+])
+def test_interface_dashboard_mapping(description, expected_dashboards):
+    interface = {
+        'router': '',
+        'name': '',
+        'description': description
+    }
+    dashboards = poller._get_dashboards(interface)
+    dashboards = [d.name for d in dashboards]
+    assert set(dashboards) == set(expected_dashboards)
+
+
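# The cases above encode how interface descriptions of the form
# "<TYPE> <CATEGORY> <CUSTOMER> #<circuit> | <comment>" map onto dashboards.
# The real rules live in poller._get_dashboards; classify below is a
# hypothetical fragment covering just two of the cases, for illustration:
def classify(description):
    dashboards = set()
    if description.startswith('SRV_L2CIRCUIT'):
        dashboards.add('L2_CIRCUIT')
        if 'GEANTOPEN' in description:
            dashboards.add('GEANTOPEN')
    elif description.startswith('SRV_IAS UPSTREAM'):
        dashboards.add('IAS_UPSTREAM')
    return dashboards


assert classify(
    'SRV_IAS UPSTREAM COGENT #COGENT_GWS_VIE | ASN174') == {'IAS_UPSTREAM'}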
+@pytest.mark.parametrize('interface,dashboard_info', [
+    ({
+        'description': 'SRV_IAS CUSTOMER JISC #JISC-AP1-IAS IASPS | ASN786',
+        'dashboards': ['IAS_CUSTOMER', 'NREN']
+    }, {'name': 'JISC', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'SRV_L2CIRCUIT CUSTOMER JISC JISC #DUB-LON-NRENBBEXT-JANET-13015 | backup for niran ',  # noqa: E501
+        'dashboards': ['L2_CIRCUIT']
+    }, {'name': 'JISC', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'SRV_L3VPN CUSTOMER EENET #EENET-AP2-LHCONE | ASN3221',
+        'dashboards': ['LHCONE', 'LHCONE_CUST', 'NREN']
+    }, {'name': 'EENET', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'SRV_IAS PRIVATE OPTIMA-TELEKOM #HR-EduZone | For Eduzone',  # noqa: E501
+        'dashboards': ['IAS_PEERS', 'IAS_PRIVATE']
+    }, {'name': 'OPTIMA-TELEKOM', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'SRV_CLS PRIVATE AWS #AT-AWS-CLS|ASN16509 | ',
+        'dashboards': ['CLS', 'CLS_PEERS']
+    }, {'name': 'AWS', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'SRV_IAS PUBLIC MIX #IX_Peerings_in_MIX |',
+        'dashboards': ['IAS_PEERS', 'IAS_PUBLIC']
+    }, {'name': 'MIX', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'LAG INFRASTRUCTURE BACKBONE SRF0000001 | bil-por',
+        'dashboards': ['INFRASTRUCTURE_BACKBONE']
+    }, {'name': 'bil-por', 'interface_type': 'AGGREGATE'}),
+    ({
+        'description': 'SRV_GCS CUSTOMER FCCN MICROSOFT #FCCN_NoveSBE_ExpressRoute_Vlan1945 | UNIT CONFIGURATION HAS BEEN SYSTEM GENERATED',  # noqa: E501
+        'dashboards': ['GCS']
+    }, {'name': 'FCCN', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'PHY UPSTREAM TELIA SRF9940473 | Telia ID: IC-326863',
+        'router': 'mx1.bud.hu.geant.net',
+        'dashboards': ['GWS_PHY_UPSTREAM']
+    }, {'name': 'TELIA - BUD', 'interface_type': 'PHYSICAL'}),
+    ({
+        'description': 'SRV_IAS UPSTREAM COGENT #COGENT_GWS_VIE | ASN174',
+        'dashboards': ['IAS_UPSTREAM']
+    }, {'name': 'COGENT', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'SRV_L3VPN RE_INTERCONNECT CLARA #REDCLARA-LIS-LHCONE | ASN27750',  # noqa: E501
+        'dashboards': ['LHCONE', 'LHCONE_PEER', 'RE_PEER']
+    }, {'name': 'CLARA', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'SRV_MDVPN CUSTOMER REDIRIS #RedIRIS_AP1_BGP_LU_CoC_1 | MD VPN CoC-REDIRIS - ',  # noqa: E501
+        'dashboards': ['MDVPN_CUSTOMERS', 'NREN']
+    }, {'name': 'REDIRIS', 'interface_type': 'LOGICAL'}),
+    ({
+        'description': 'SRV_L2CIRCUIT CUSTOMER TENET PSNC #lon-lon-GEANTOPEN-PSNC-TENET-18067 |',  # noqa: E501
+        'dashboards': ['GEANTOPEN', 'L2_CIRCUIT']
+    }, {'name': 'TENET', 'interface_type': 'LOGICAL'}),
+])
+def test_description_dashboard_parsing(interface, dashboard_info):
+
+    updated = poller._get_dashboard_data(interface)
+    info = updated['dashboard_info']
+    assert info == dashboard_info
+
+
+def test_gws_config_json(client):
+    rv = client.get(
+        '/poller/gws/direct-config',
+        headers={'Accept': ['application/json']})
+    assert rv.status_code == 200
+    assert rv.is_json
+    response_data = json.loads(rv.data.decode('utf-8'))
+    # just a sanity check, no validation
+    # ... for now, this isn't an important interface
+    assert response_data
+
+
+def test_gws_config_html(client):
+    rv = client.get(
+        '/poller/gws/direct-config?format=html',
+        headers={'Accept': ['text/html']})
+    assert rv.status_code == 200
+    response_data = rv.data.decode('utf-8')
+    # just a sanity check, no validation
+    # ... for now, this isn't an important interface
+    assert response_data.startswith('<html>')
+    assert response_data.endswith('</html>')
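# The two tests above only sanity-check /poller/gws/direct-config. An
# equivalent manual probe of a running instance might look like this
# (the host/port is an assumption, not something defined in this repo):
import requests

rv = requests.get(
    'http://localhost:8080/poller/gws/direct-config',
    headers={'Accept': 'application/json'})
assert rv.status_code == 200 and rv.json()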
diff --git a/test/test_ims_data.py b/test/test_ims_data.py
index 74fa71c3df0ec3d797511b72ac077048d1142f49..cd8d7e421eb8750f077bf74a22c8f6b3ee3d019c 100644
--- a/test/test_ims_data.py
+++ b/test/test_ims_data.py
@@ -3,8 +3,7 @@ import json
 import inventory_provider
 from inventory_provider.db.ims import InventoryStatus
 from inventory_provider.db.ims_data import lookup_lg_routers, \
-    otrs_get_customer_company_rows, \
-    otrs_get_customer_users_rows, get_node_locations, IMS_OPSDB_STATUS_MAP, \
+    get_node_locations, IMS_OPSDB_STATUS_MAP, \
     get_port_id_services, get_port_details, \
     get_circuit_hierarchy
 
@@ -287,89 +286,3 @@ def test_get_node_location(mocker):
             'latitude': 51.5308142,
         }
     })
-
-
-def test_otrs_get_customer_company_rows(mocker):
-    ims = mocker.patch('inventory_provider.db.ims.IMS')
-    with open('test/data/ims_otrs_customers.json') as data:
-        resp_data = json.load(data)
-
-    def se(*args, **kargs):
-        return resp_data[args[0]]
-    mocked_get_all_entities = ims.return_value.get_all_entities
-    mocked_get_all_entities.side_effect = se
-    ds = inventory_provider.db.ims.IMS(
-        'dummy_base', 'dummy_username', 'dummy_password')
-    cus_comp_rows = list(otrs_get_customer_company_rows(ds))
-    assert len(cus_comp_rows) == 6
-    mocked_get_all_entities.assert_any_call('Customer')
-    mocked_get_all_entities.assert_any_call('Vendor')
-    assert cus_comp_rows[0] == ['customer_id', 'name', 'street', 'zip',
-                                'city', 'country', 'url', 'comments']
-    ids = []
-    names = []
-    for row in cus_comp_rows[1:]:
-        assert len(row) == 8
-        ids.append(row[0])
-        names.append(row[1])
-    assert ids == ['DUMMY1', 'DUMMY2', 'DUMMY3', 'DUMMY4', 'DUMMY5']
-    assert names == ['DUMMY 1', 'DUMMY 2', 'DUMMY 3', 'DUMMY 4', 'DUMMY 5']
-
-
-def test_otrs_get_customer_users(mocker):
-    ims = mocker.patch('inventory_provider.db.ims.IMS')
-    resp_data = {}
-    with open('test/data/ims_otrs_customers_extra.json') as data:
-        resp_data['Customer'] = json.load(data)
-    with open('test/data/ims_otrs_vendor_contact_extra.json') as data:
-        resp_data['VendorRelatedContact'] = json.load(data)
-
-    def se(*args, **kargs):
-        return resp_data.get(args[0], [])
-
-    mocked_get_all_entities = ims.return_value.get_all_entities
-    mocked_get_all_entities.side_effect = se
-
-    mocked_get_all_entities = ims.return_value.get_all_entities
-    ds = inventory_provider.db.ims.IMS(
-        'dummy_base', 'dummy_username', 'dummy_password')
-
-    customer_users = list(otrs_get_customer_users_rows(ds))
-    mocked_get_all_entities.assert_any_call('Customer', [32768, 262144])
-    mocked_get_all_entities.assert_any_call('VendorRelatedContact', [8, 16])
-
-    assert customer_users[0] == [
-        'email',
-        'username',
-        'customer_id',
-        'customer_id_2',
-        'title',
-        'firstname',
-        'lastname',
-        'phone',
-        'fax',
-        'mobile',
-        'street',
-        'zip',
-        'city',
-        'country',
-        'comments'
-    ]
-
-    assert len(customer_users) == 13
-    assert customer_users[1] == [
-        'BANOC@DUMMY2.COM', 'BANOC@DUMMY2.COM', 'DUMMY2', '', '',
-        'DUMMY 2 NOC', '-', '', '', '',
-        '', '', '', '', ''
-    ]
-    assert customer_users[6] == [
-        'HNOC@DUMMY8.COM', 'HNOC@DUMMY8.COM', 'DUMMY8', 'OTRS-GEANT-NREN', '',
-        'H D_FIRST', 'H D_INFIX H D_LAST', '', '', '',
-        '', '', '', '', ''
-    ]
-    assert customer_users[10] == [
-        'K@DUMMY10.FR', 'K@DUMMY10.FR', 'DUMMY10', '', '',
-        'K D_FIRST', 'K D_INFIX K D_LAST', '', '', '',
-        '', '', '', '', ''
-    ]
-    assert customer_users[12][6] == 'M LAST'
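# The rewritten tests below follow the worker's extract/transform/persist
# pipeline. extract_ims_data groups flat IMS rows by id; group_rows is an
# illustrative stand-in that reproduces the port_id_details expectation
# from test_extract_ims_data:
from collections import defaultdict


def group_rows(rows, key):
    grouped = defaultdict(list)
    for row in rows:
        grouped[row[key]].append(row)
    return dict(grouped)


assert group_rows(
    [{'port_id': 'A', 'value': 'a'},
     {'port_id': 'B', 'value': 'b'},
     {'port_id': 'B', 'value': 'c'}],
    key='port_id') == {
        'A': [{'port_id': 'A', 'value': 'a'}],
        'B': [{'port_id': 'B', 'value': 'b'},
              {'port_id': 'B', 'value': 'c'}]}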
diff --git a/test/test_worker.py b/test/test_worker.py
index 4a2a53e1b466a8f7f4e7338567996546d2d94ba0..fa254f671fce9d5e1b96109b839ce26dd9aa2ed2 100644
--- a/test/test_worker.py
+++ b/test/test_worker.py
@@ -1,44 +1,508 @@
-import os
-import re
+import json
 
-from inventory_provider.tasks.worker import export_data_for_otrs, OTRSFiles
+from inventory_provider.tasks import common
+from inventory_provider.tasks.worker import transform_ims_data, \
+    extract_ims_data, persist_ims_data, \
+    retrieve_and_persist_neteng_managed_device_list, \
+    populate_poller_interfaces_cache
 
 
-def test_otrs_exports(data_config_filename, data_config, mocker):
-    os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename
-    otrs_config = data_config["otrs-export"]
-    mocked_writer = \
-        mocker.patch('inventory_provider.tasks.worker.csv.writer')
-    mocked_row_writer = mocked_writer.return_value.writerows
-    mocked_run = mocker.patch(
-        'inventory_provider.tasks.worker.subprocess.run')
+def test_extract_ims_data(mocker):
+
+    mocker.patch(
+        'inventory_provider.tasks.worker.InventoryTask.config'
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_node_locations',
+        return_value=[('loc_a', 'LOC A'), ('loc_b', 'LOC B')]
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.lookup_lg_routers',
+        return_value=['lg router 1', 'lg router 2']
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_customer_service_emails',
+        return_value=[('123', 'CON A'), ('456', 'CON B')]
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_monitored_circuit_ids',
+        return_value=[123, 456, 789]
+    )
     mocker.patch(
-        'inventory_provider.tasks.worker.ims_data.otrs_get_customer_company_rows',  # noqa
-        return_value=[[1, 2, 3, 4], [5, 6, 7, 8]]
+        'inventory_provider.tasks.worker.ims_data.'
+        'get_circuit_related_customer_ids',
+        return_value=[{'id a': ['A', 'A2']}, {'id b': ['B']}]
     )
     mocker.patch(
-        'inventory_provider.tasks.worker.ims_data.otrs_get_customer_users_rows',  # noqa
-        return_value=[[9, 10, 11, 12], [13, 14, 15, 16]]
+        'inventory_provider.tasks.worker.ims_data.get_circuit_hierarchy',
+        return_value=[
+            {'id': '1', 'value': 'A'},
+            {'id': '2', 'value': 'B'}
+        ]
     )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_port_details',
+        return_value=[
+            {'port_id': 'A', 'value': 'a'},
+            {'port_id': 'B', 'value': 'b'},
+            {'port_id': 'B', 'value': 'c'}
+        ]
+    )
+    mocker.patch(
+        'inventory_provider.tasks.worker.ims_data.get_port_id_services',
+        return_value=[
+            {'port_a_id': '1', 'value': '1A'},
+            {'port_a_id': '1', 'value': '1B'},
+            {'port_a_id': '2', 'value': '2A'}
+        ]
+    )
+    res = extract_ims_data()
+    assert res['locations'] == {'loc_a': 'LOC A', 'loc_b': 'LOC B'}
+    assert res['lg_routers'] == ['lg router 1', 'lg router 2']
+    assert res['customer_contacts'] == {'123': 'CON A', '456': 'CON B'}
+    assert res['circuit_ids_to_monitor'] == [123, 456, 789]
+    assert res['additional_circuit_customer_ids'] == \
+        [{'id a': ['A', 'A2']}, {'id b': ['B']}]
+    assert res['hierarchy'] == {
+        '1': {'id': '1', 'value': 'A'},
+        '2': {'id': '2', 'value': 'B'}
+    }
+    assert res['port_id_details'] == {
+        'A': [{'port_id': 'A', 'value': 'a'}],
+        'B': [
+            {'port_id': 'B', 'value': 'b'},
+            {'port_id': 'B', 'value': 'c'}
+        ]
+    }
+    assert res['port_id_services'] == {
+        '1': [
+            {'port_a_id': '1', 'value': '1A'},
+            {'port_a_id': '1', 'value': '1B'}
+        ],
+        '2': [{'port_a_id': '2', 'value': '2A'}]
+    }
+
+
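# test_transform_ims_data (below) expects carrier_id_3 to surface as the
# only fibre route of circ_id_1: the transform walks 'carrier-circuits'
# links upward and keeps circuits whose speed marks them as fibre routes.
# find_fibre_routes is a hypothetical version of that walk (the real one
# is a worker-internal helper):
def find_fibre_routes(hierarchy, circuit_id):
    found = set()

    def _walk(cid):
        circ = hierarchy.get(cid)
        if not circ:
            return
        if circ['speed'] == 'fibre_route':
            found.add(cid)
        for carrier in circ['carrier-circuits']:
            _walk(carrier)

    _walk(circuit_id)
    return found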
fibre_route", + "carrier-circuits": ["carrier_id_3"], + "sub-circuits": ["carrier_id_1"], + "customerid": "cu_1", + }, + "carrier_id_3": { + "id": "carrier_id_3", + "name": "Fiber Route Circuit", + "status": "operational", + "circuit-type": "circuit", + "product": "OCG4", + "speed": "fibre_route", + "carrier-circuits": [], + "sub-circuits": ["carrier_id_2"], + "customerid": "cu_1", + }, + "sub_circuit_1": { + "id": "sub_circuit_1", + "name": "sub_circuit_name_1", + "status": "operational", + "circuit-type": "circuit", + "product": "ethernet", + "speed": "not fibre_route", + "carrier-circuits": ["circ_id_1"], + "sub-circuits": ["sub_circuit_2"], + "customerid": "cu_1", + }, + "sub_circuit_2": { + "id": "sub_circuit_2", + "name": "sub_circuit_name_2", + "status": "operational", + "circuit-type": "service", + "product": "PEERING R & E", + "speed": "not fiber route", + "project": "Project A", + "carrier-circuits": ["sub_circuit_1"], + "sub-circuits": [], + "customerid": "cu_1", + } + } + data = { + "locations": locations, + "customer_contacts": customer_contacts, + "circuit_ids_to_monitor": [], + "additional_circuit_customer_ids": additional_circuit_customer_ids, + "hierarchy": hierarchy, + "port_id_details": port_id_details, + "port_id_services": port_id_services + } + res = transform_ims_data(data) + ifs = res["interface_services"] + assert list(ifs.keys()) == ["eq_a:if_a", "eq_b:if_b"] + for v in ifs.values(): + assert len(v) == 1 + assert len(v[0]["related-services"]) == 1 + assert v[0]["related-services"][0]["id"] == "sub_circuit_2" + assert len(v[0]["fibre-routes"]) == 1 + assert v[0]["fibre-routes"][0]["id"] == "carrier_id_3" + + +def test_persist_ims_data(mocker, data_config, mocked_redis): + + r = common._get_redis(data_config) + mocker.patch('inventory_provider.tasks.worker.get_next_redis', + return_value=r) + + data = { + "locations": {"loc_a": "LOC A", "loc_b": "LOC B"}, + "lg_routers": [ + {"equipment name": "lg_eq1"}, {"equipment name": "lg_eq2"} + ], + "hierarchy": {"c1": {"id": "123"}, "c2": {"id": "456"}}, + "interface_services": { + "if1": [ + { + "equipment": "eq1", + "port": "port1", + "id": "id1", + "name": "name1", + "service_type": "type1", + "status": "operational" + }, + { + "equipment": "eq1", + "port": "port2", + "id": "id3", + "name": "name2", + "service_type": "type2", + "status": "operational" + } + ], + "if2": [ + { + "equipment": "eq2", + "port": "port1", + "id": "id3", + "name": "name3", + "service_type": "type1", + "status": "operational" + } + ] + }, + "services_by_type": {}, + } + for k in r.keys("ims:*"): + r.delete(k) + persist_ims_data(data) + + assert [k.decode("utf-8") for k in r.keys("ims:location:*")] == \ + ["ims:location:loc_a", "ims:location:loc_b"] + + assert [k.decode("utf-8") for k in r.keys("ims:lg:*")] == \ + ["ims:lg:lg_eq1", "ims:lg:lg_eq2"] + + assert [k.decode("utf-8") for k in r.keys("ims:circuit_hierarchy:*")] == \ + ["ims:circuit_hierarchy:123", "ims:circuit_hierarchy:456"] + + assert [k.decode("utf-8") for k in r.keys("ims:interface_services:*")] == \ + ["ims:interface_services:if1", "ims:interface_services:if2"] + + assert [k.decode("utf-8") for k in r.keys("poller_cache:*")] == \ + ["poller_cache:eq1", "poller_cache:eq2"] + + +def test_retrieve_and_persist_neteng_managed_device_list( + mocker, data_config, mocked_redis): + device_list = ['abc', 'def'] + r = common._get_redis(data_config) + + mocker.patch( + 'inventory_provider.tasks.worker.InventoryTask.config' + ) + mocker.patch('inventory_provider.tasks.worker.get_next_redis', + 
+def test_retrieve_and_persist_neteng_managed_device_list(
+        mocker, data_config, mocked_redis):
+    device_list = ['abc', 'def']
+    r = common._get_redis(data_config)
+
+    mocker.patch(
+        'inventory_provider.tasks.worker.InventoryTask.config'
+    )
+    mocker.patch('inventory_provider.tasks.worker.get_next_redis',
+                 return_value=r)
+    r.delete('netdash')
+    mocked_j = mocker.patch(
+        'inventory_provider.tasks.worker.juniper.load_routers_from_netdash'
+    )
+    mocked_j.return_value = device_list
+    result = retrieve_and_persist_neteng_managed_device_list()
+    assert result == device_list
+    assert json.loads(r.get('netdash')) == device_list
+
+
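# test_populate_poller_interfaces_cache (below) drives the lab/no-lab
# split; the worker derives lab hostnames by slicing off the key-pattern
# prefix (len(pattern) - 1 drops everything up to the trailing '*'):
pattern = 'lab:netconf-interfaces-hosts:*'
key = 'lab:netconf-interfaces-hosts:lab_router_a.geant.net'
assert key[len(pattern) - 1:] == 'lab_router_a.geant.net'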
"dashboards": [] + }, + ] - t = r'^rsync -aPq --no-perms --rsh="ssh -l {user} -p 22 -i {key_file} -o \'UserKnownHostsFile {known_hosts}\'" /\S+/\* {destination}$' # noqa + for k in r.keys("lab:netconf-interfaces-hosts:*"): + r.delete(k) + r.set("lab:netconf-interfaces-hosts:lab_router_a.geant.net", "dummy") + r.set("lab:netconf-interfaces-hosts:lab_router_b.geant.net", "dummy") - p = t.format( - user=otrs_config['username'], - key_file=otrs_config['private-key'], - destination=otrs_config['destination'], - known_hosts=otrs_config['known-hosts'] + mocker.patch('inventory_provider.tasks.worker._load_interfaces', + side_effect=[all_interfaces, ]) + mocker.patch('inventory_provider.tasks.worker._load_interface_bundles', + return_value=bundles) + mocker.patch('inventory_provider.tasks.worker.load_snmp_indexes', + return_value=snmp_indexes) + mocker.patch('inventory_provider.tasks.worker._load_services', + return_value=services) + mocker.patch( + 'inventory_provider.tasks.worker.InventoryTask.config' ) - assert bool(re.match(p, called_with)) + mocker.patch('inventory_provider.tasks.worker.get_next_redis', + return_value=r) + + populate_poller_interfaces_cache() + assert r.exists("classifier-cache:poller-interfaces:no-lab") + assert r.exists("classifier-cache:poller-interfaces:all") + no_lab = r.get("classifier-cache:poller-interfaces:no-lab").decode("utf-8") + all = r.get("classifier-cache:poller-interfaces:all").decode("utf-8") + assert json.loads(no_lab) == no_lab_res + all_res = no_lab_res + lab_res + assert json.loads(all) == all_res diff --git a/tox.ini b/tox.ini index 8d569cbc25d09e7397d4dc13d97de759cdcbf228..7d0d25df10b59e54c9a10b05c6cdee9fa24920da 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,9 @@ [tox] envlist = py36 +[flake8] +exclude = venv,.tox + [testenv] passenv = TEST_OPSDB_HOSTNAME TEST_OPSDB_DBNAME TEST_OPSDB_USERNAME TEST_OPSDB_PASSWORD deps =