import concurrent.futures import functools import json import logging import os import re from typing import List import ncclient.transport.errors from celery import Task, states, chord from celery.result import AsyncResult from collections import defaultdict from kombu.exceptions import KombuError from lxml import etree from inventory_provider.db import ims_data from inventory_provider.db.ims import IMS from inventory_provider.routes.poller import load_interfaces_to_poll from inventory_provider.tasks.app import app from inventory_provider.tasks.common \ import get_next_redis, get_current_redis, \ latch_db, get_latch, set_latch, update_latch_status, \ ims_sorted_service_type_key, set_single_latch from inventory_provider.tasks import monitor from inventory_provider import config from inventory_provider import environment from inventory_provider import snmp from inventory_provider import juniper from redis import RedisError FINALIZER_POLLING_FREQUENCY_S = 2.5 FINALIZER_TIMEOUT_S = 300 # TODO: error callback (cf. http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks) # noqa: E501 environment.setup_logging() logger = logging.getLogger(__name__) log_task_entry_and_exit = functools.partial( environment.log_entry_and_exit, logger=logger) class InventoryTaskError(Exception): pass class InventoryTask(Task): config = None def __init__(self): self.args = [] if InventoryTask.config: return assert os.path.isfile( os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']), ( 'config file %r not found' % os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f: logging.info( "Initializing worker with config from: %r" % os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) InventoryTask.config = config.load(f) logging.debug("loaded config: %r" % InventoryTask.config) def log_info(self, message): logger.debug(message) self.send_event('task-info', message=message) def log_warning(self, message): logger.warning(message) self.send_event('task-warning', message=message) def log_error(self, message): logger.error(message) self.send_event('task-error', message=message) def _unmanaged_interfaces(): def _convert(d): # the config file keys are more readable than # the keys used in redis return { 'name': d['address'], 'interface address': d['network'], 'interface name': d['interface'].lower(), 'router': d['router'].lower() } yield from map( _convert, InventoryTask.config.get('unmanaged-interfaces', [])) @log_task_entry_and_exit def refresh_juniper_bgp_peers(hostname, netconf): host_peerings = list(juniper.all_bgp_peers(netconf)) r = get_next_redis(InventoryTask.config) r.set(f'juniper-peerings:hosts:{hostname}', json.dumps(host_peerings)) @log_task_entry_and_exit def refresh_juniper_interface_list(hostname, netconf, interface_info, lab=False): """ load all interfaces from the netconf doc save under 'lab:...' if lab is true :param hostname: :param netconf: :param interface_info: :param lab: :return: """ logger.debug( 'removing cached netconf-interfaces for %r' % hostname) r = get_next_redis(InventoryTask.config) interfaces_keybase = f'netconf-interfaces:{hostname}' bundles_keybase = f'netconf-interface-bundles:{hostname}' interfaces_all_key = f'netconf-interfaces-hosts:{hostname}' if lab: interfaces_keybase = f'lab:{interfaces_keybase}' interfaces_all_key = f'lab:{interfaces_all_key}' bundles_keybase = f'lab:{bundles_keybase}' rp = r.pipeline() rp.delete(interfaces_all_key) # scan with bigger batches, to mitigate network latency effects for k in r.scan_iter(f'{interfaces_keybase}:*', count=1000): rp.delete(k) for k in r.scan_iter( f'{bundles_keybase}:*', count=1000): rp.delete(k) rp.execute() all_bundles = defaultdict(list) rp = r.pipeline() rp.set( interfaces_all_key, json.dumps(list(juniper.interface_addresses(netconf)))) interface_speeds = {} if interface_info: interface_speeds = juniper.get_interface_speeds(interface_info) for ifc in juniper.list_interfaces(netconf): ifc['speed'] = interface_speeds.get(ifc['name'], '') bundles = ifc.get('bundle', None) for bundle in bundles: if bundle: all_bundles[bundle].append(ifc['name']) rp.set( f'{interfaces_keybase}:{ifc["name"]}', json.dumps(ifc)) for k, v in all_bundles.items(): rp.set( f'{bundles_keybase}:{k}', json.dumps(v)) rp.execute() def _erase_next_db(config): """ flush next db, but first save latch and then restore afterwards TODO: handle the no latch scenario nicely :param config: :return: """ r = get_next_redis(config) saved_latch = get_latch(r) if saved_latch: # execute as transaction to ensure that latch is always available in # db that is being flushed rp = r.pipeline() rp.multi() rp.flushdb() set_single_latch( rp, saved_latch['this'], saved_latch['current'], saved_latch['next'], saved_latch.get('timestamp', 0), saved_latch.get('update-started', 0) ) rp.execute() # ensure latch is consistent in all dbs set_latch( config, new_current=saved_latch['current'], new_next=saved_latch['next'], timestamp=saved_latch.get('timestamp', 0), update_timestamp=saved_latch.get('update-started', 0)) def populate_poller_cache(interface_services, r): host_services = defaultdict(dict) for v in interface_services.values(): # logger.debug(v) if v: h = v[0]['equipment'] i = v[0]['port'] host_services[h][i] = [{ 'id': s['id'], 'name': s['name'], 'type': s['service_type'], 'status': s['status'] } for s in v] # todo - delete from redis rp = r.pipeline() for k in r.scan_iter('poller_cache:*', count=1000): rp.delete(k) rp.execute() rp = r.pipeline() for host, interface_services in host_services.items(): rp.set(f'poller_cache:{host}', json.dumps(interface_services)) rp.execute() def _build_subnet_db(update_callback=lambda s: None): r = get_next_redis(InventoryTask.config) update_callback('loading all network addresses') subnets = {} # scan with bigger batches, to mitigate network latency effects for k in r.scan_iter('netconf-interfaces-hosts:*', count=1000): k = k.decode('utf-8') hostname = k[len('netconf-interfaces-hosts:'):] host_interfaces = r.get(k).decode('utf-8') host_interfaces = json.loads(host_interfaces) for ifc in host_interfaces: ifc['router'] = hostname entry = subnets.setdefault(ifc['interface address'], []) entry.append(ifc) for ifc in _unmanaged_interfaces(): entry = subnets.setdefault(ifc['interface address'], []) entry.append(ifc) update_callback('saving {} subnets'.format(len(subnets))) rp = r.pipeline() for k, v in subnets.items(): rp.set(f'subnets:{k}', json.dumps(v)) rp.execute() def _build_juniper_peering_db(update_callback=lambda s: None): def _is_ix(peering_info): if peering_info.get('instance', '') != 'IAS': return False if not peering_info.get('group', '').startswith('GEANT-IX'): return False expected_keys = ('description', 'local-asn', 'remote-asn') if any(peering_info.get(x, None) is None for x in expected_keys): logger.error('internal data error, looks like ix peering but' f'some expected keys are missing: {peering_info}') return False return True r = get_next_redis(InventoryTask.config) update_callback('loading all juniper network peerings') peerings_per_address = {} ix_peerings = [] peerings_per_asn = {} peerings_per_logical_system = {} peerings_per_group = {} peerings_per_routing_instance = {} all_peerings = [] # scan with bigger batches, to mitigate network latency effects key_prefix = 'juniper-peerings:hosts:' for k in r.scan_iter(f'{key_prefix}*', count=1000): key_name = k.decode('utf-8') hostname = key_name[len(key_prefix):] host_peerings = r.get(key_name).decode('utf-8') host_peerings = json.loads(host_peerings) for p in host_peerings: p['hostname'] = hostname peerings_per_address.setdefault(p['address'], []).append(p) if _is_ix(p): ix_peerings.append(p) asn = p.get('remote-asn', None) if asn: peerings_per_asn.setdefault(asn, []).append(p) logical_system = p.get('logical-system', None) if logical_system: peerings_per_logical_system.setdefault( logical_system, []).append(p) group = p.get('group', None) if group: peerings_per_group.setdefault(group, []).append(p) routing_instance = p.get('instance', None) if routing_instance: peerings_per_routing_instance.setdefault( routing_instance, []).append(p) all_peerings.append(p) # sort ix peerings by group ix_groups = {} for p in ix_peerings: description = p['description'] keyword = description.split(' ')[0] # regex needed??? (e.g. tabs???) ix_groups.setdefault(keyword, set()).add(p['address']) rp = r.pipeline() # for use with /msr/bgp rp.set('juniper-peerings:all', json.dumps(all_peerings)) # create peering entries, keyed by remote addresses update_callback(f'saving {len(peerings_per_address)} remote peers') for k, v in peerings_per_address.items(): rp.set(f'juniper-peerings:remote:{k}', json.dumps(v)) # create pivoted ix group name lists update_callback(f'saving {len(ix_groups)} remote ix peering groups') for k, v in ix_groups.items(): group_addresses = list(v) rp.set(f'juniper-peerings:ix-groups:{k}', json.dumps(group_addresses)) # create pivoted asn peering lists update_callback(f'saving {len(peerings_per_asn)} asn peering lists') for k, v in peerings_per_asn.items(): rp.set(f'juniper-peerings:peer-asn:{k}', json.dumps(v)) # create pivoted logical-systems peering lists update_callback( f'saving {len(peerings_per_logical_system)}' ' logical-system peering lists') for k, v in peerings_per_logical_system.items(): rp.set(f'juniper-peerings:logical-system:{k}', json.dumps(v)) # create pivoted group peering lists update_callback( f'saving {len(peerings_per_group)} group peering lists') for k, v in peerings_per_group.items(): rp.set(f'juniper-peerings:group:{k}', json.dumps(v)) # create pivoted routing instance peering lists update_callback( f'saving {len(peerings_per_routing_instance)} group peering lists') for k, v in peerings_per_routing_instance.items(): rp.set(f'juniper-peerings:routing-instance:{k}', json.dumps(v)) rp.execute() def _build_snmp_peering_db(update_callback=lambda s: None): r = get_next_redis(InventoryTask.config) update_callback('loading all snmp network peerings') peerings = {} # scan with bigger batches, to mitigate network latency effects key_prefix = 'snmp-peerings:hosts:' for k in r.scan_iter(f'{key_prefix}*', count=1000): key_name = k.decode('utf-8') hostname = key_name[len(key_prefix):] host_peerings = r.get(key_name).decode('utf-8') host_peerings = json.loads(host_peerings) for p in host_peerings: p['hostname'] = hostname peerings.setdefault(p['remote'], []).append(p) update_callback(f'saving {len(peerings)} remote peers') rp = r.pipeline() for k, v in peerings.items(): rp.set(f'snmp-peerings:remote:{k}', json.dumps(v)) rp.execute() def check_task_status(task_id, parent=None, forget=False): r = AsyncResult(task_id, app=app) assert r.id == task_id # sanity result = { 'id': task_id, 'status': r.status, 'exception': r.status in states.EXCEPTION_STATES, 'ready': r.status in states.READY_STATES, 'success': r.status == states.SUCCESS, 'parent': parent } # TODO: only discovered this case by testing, is this the only one? # ... otherwise need to pre-test json serialization if isinstance(r.result, Exception): result['result'] = { 'error type': type(r.result).__name__, 'message': str(r.result) } else: result['result'] = r.result def child_taskids(children): # reverse-engineered, can't find documentation on this for child in children: if not child: continue if isinstance(child, list): logger.debug(f'list: {child}') yield from child_taskids(child) continue if isinstance(child, str): yield child continue assert isinstance(child, AsyncResult) yield child.id for child_id in child_taskids(getattr(r, 'children', []) or []): yield from check_task_status(child_id, parent=task_id) if forget and result['ready']: r.forget() yield result @app.task(base=InventoryTask, bind=True, name='update_entry_point') @log_task_entry_and_exit def update_entry_point(self): try: _erase_next_db(InventoryTask.config) update_latch_status(InventoryTask.config, pending=True) monitor.clear_joblog(get_current_redis(InventoryTask.config)) self.log_info("Starting update") routers = retrieve_and_persist_neteng_managed_device_list( info_callback=self.log_info, warning_callback=self.log_warning ) lab_routers = InventoryTask.config.get('lab-routers', []) chord( ( ims_task.s().on_error(task_error_handler.s()), chord( (reload_router_config_chorded.s(r) for r in routers), empty_task.si('router tasks complete') ), chord( (reload_lab_router_config_chorded.s(r) for r in lab_routers), empty_task.si('lab router tasks complete') ) ), final_task.si().on_error(task_error_handler.s()) )() return self.request.id except (RedisError, KombuError): update_latch_status(InventoryTask.config, pending=False, failure=True) logger.exception('error launching refresh subtasks') raise @app.task def task_error_handler(request, exc, traceback): update_latch_status(InventoryTask.config, pending=False, failure=True) logger.warning('Task {0!r} raised error: {1!r}'.format(request.id, exc)) @app.task(base=InventoryTask, bind=True, name='empty_task') def empty_task(self, message): logger.warning(f'message from empty task: {message}') def retrieve_and_persist_neteng_managed_device_list( info_callback=lambda s: None, warning_callback=lambda s: None): netdash_equipment = None try: info_callback('querying netdash for managed routers') netdash_equipment = list(juniper.load_routers_from_netdash( InventoryTask.config['managed-routers'])) except Exception as e: warning_callback(f'Error retrieving device list: {e}') if netdash_equipment: info_callback(f'found {len(netdash_equipment)} routers') else: warning_callback('No devices retrieved, using previous list') try: current_r = get_current_redis(InventoryTask.config) netdash_equipment = current_r.get('netdash') netdash_equipment = json.loads(netdash_equipment.decode('utf-8')) if not netdash_equipment: raise InventoryTaskError( 'No equipment retrieved from previous list') except Exception as e: warning_callback(str(e)) update_latch_status( InventoryTask.config, pending=False, failure=True) raise e try: next_r = get_next_redis(InventoryTask.config) next_r.set('netdash', json.dumps(netdash_equipment)) info_callback(f'saved {len(netdash_equipment)} managed routers') except Exception as e: warning_callback(str(e)) update_latch_status(InventoryTask.config, pending=False, failure=True) raise e return netdash_equipment @app.task(base=InventoryTask, bind=True, name='reload_lab_router_config') @log_task_entry_and_exit def reload_lab_router_config_chorded(self, hostname): try: self.log_info(f'loading netconf data for lab {hostname}') # load new netconf data, in this thread netconf_str = retrieve_and_persist_netconf_config( hostname, lab=True, update_callback=self.log_warning) netconf_doc = etree.fromstring(netconf_str) interface_info_str = retrieve_and_persist_interface_info( hostname, update_callback=self.log_warning) if interface_info_str: interface_info = etree.fromstring(interface_info_str) else: interface_info = None refresh_juniper_interface_list(hostname, netconf_doc, interface_info, lab=True) # load snmp indexes community = juniper.snmp_community_string(netconf_doc) if not community: raise InventoryTaskError( f'error extracting community string for {hostname}') else: self.log_info(f'refreshing snmp interface indexes for {hostname}') logical_systems = juniper.logical_systems(netconf_doc) # load snmp data, in this thread snmp_refresh_interfaces_chorded( hostname, community, logical_systems, self.log_info) self.log_info(f'updated configuration for lab {hostname}') except Exception as e: errmsg = f'unhandled exception loading {hostname} info: {e}' logger.exception(errmsg) update_latch_status(InventoryTask.config, pending=True, failure=True) self.log_error(errmsg) # TODO: re-raise and handle in some common way for all tasks # raise @app.task(base=InventoryTask, bind=True, name='reload_router_config') @log_task_entry_and_exit def reload_router_config_chorded(self, hostname): try: self.log_info(f'loading netconf data for {hostname}') netconf_str = retrieve_and_persist_netconf_config( hostname, update_callback=self.log_warning) netconf_doc = etree.fromstring(netconf_str) interface_info_str = retrieve_and_persist_interface_info( hostname, update_callback=self.log_warning) if interface_info_str: interface_info = etree.fromstring(interface_info_str) else: interface_info = None # clear cached classifier responses for this router, and # refresh peering data logger.info(f'refreshing peers & clearing cache for {hostname}') refresh_juniper_bgp_peers(hostname, netconf_doc) refresh_juniper_interface_list(hostname, netconf_doc, interface_info) # load snmp indexes community = juniper.snmp_community_string(netconf_doc) if not community: raise InventoryTaskError( f'error extracting community string for {hostname}') else: self.log_info(f'refreshing snmp interface indexes for {hostname}') logical_systems = juniper.logical_systems(netconf_doc) # load snmp data, in this thread snmp_refresh_interfaces_chorded( hostname, community, logical_systems, self.log_info) snmp_refresh_peerings_chorded(hostname, community, logical_systems) logger.info(f'updated configuration for {hostname}') except Exception as e: errmsg = f'unhandled exception loading {hostname} info: {e}' logger.exception(errmsg) update_latch_status(InventoryTask.config, pending=True, failure=True) self.log_error(errmsg) # TODO: re-raise and handle in some common way for all tasks # raise def retrieve_and_persist_netconf_config( hostname, lab=False, update_callback=lambda s: None): redis_key = f'netconf:{hostname}' if lab: redis_key = f'lab:{redis_key}' try: netconf_doc = juniper.load_config( hostname, InventoryTask.config["ssh"]) netconf_str = etree.tostring(netconf_doc, encoding='unicode') except (ConnectionError, juniper.NetconfHandlingError, InventoryTaskError): msg = f'error loading netconf data from {hostname}' logger.exception(msg) update_callback(msg) r = get_current_redis(InventoryTask.config) netconf_str = r.get(redis_key) if not netconf_str: update_callback(f'no cached netconf for {redis_key}') raise InventoryTaskError( f'netconf error with {hostname}' f' and no cached netconf data found') logger.info(f'Returning cached netconf data for {hostname}') update_callback(f'Returning cached netconf data for {hostname}') r = get_next_redis(InventoryTask.config) r.set(redis_key, netconf_str) logger.info(f'netconf info loaded from {hostname}') return netconf_str def retrieve_and_persist_interface_info( hostname, lab=False, update_callback=lambda s: None): redis_key = f'intinfo:{hostname}' if lab: redis_key = f'lab:{redis_key}' try: interface_info_str = juniper.get_interface_info_for_router(hostname, InventoryTask.config["ssh"]) logger.info(f'interface-info rpc success from {hostname}') except (ConnectionError, juniper.TimeoutError, InventoryTaskError, ncclient.transport.errors.SSHError): msg = f'error loading interface-info data from {hostname}' logger.exception(msg) update_callback(msg) r = get_current_redis(InventoryTask.config) interface_info_str = r.get(redis_key) if interface_info_str: logger.info(f'Returning cached interface info data for {hostname}') update_callback(f'Returning cached interface info data for {hostname}') else: update_callback(f'no cached interface info for {redis_key}') logger.warning(f'interface-info could not be retrieved from {hostname}, ignoring this host') return None r = get_next_redis(InventoryTask.config) r.set(redis_key, interface_info_str) logger.info(f'interface info loaded from {hostname}') return interface_info_str @log_task_entry_and_exit def snmp_refresh_interfaces_chorded( hostname, community, logical_systems, update_callback=lambda s: None): try: interfaces = list( snmp.get_router_snmp_indexes(hostname, community, logical_systems)) except ConnectionError: msg = f'error loading snmp interface data from {hostname}' logger.exception(msg) update_callback(msg) r = get_current_redis(InventoryTask.config) interfaces = r.get(f'snmp-interfaces:{hostname}') if not interfaces: raise InventoryTaskError( f'snmp error with {hostname}' f' and no cached snmp interface data found') # unnecessary json encode/decode here ... could be optimized interfaces = json.loads(interfaces.decode('utf-8')) update_callback(f'using cached snmp interface data for {hostname}') r = get_next_redis(InventoryTask.config) rp = r.pipeline() rp.set(f'snmp-interfaces:{hostname}', json.dumps(interfaces)) # optimization for DBOARD3-372 # interfaces is a list of dicts like: {'name': str, 'index': int} for ifc in interfaces: ifc['hostname'] = hostname rp.set( f'snmp-interfaces-single:{hostname}:{ifc["name"]}', json.dumps(ifc)) rp.execute() update_callback(f'snmp interface info loaded from {hostname}') @log_task_entry_and_exit def snmp_refresh_peerings_chorded( hostname, community, logical_systems, update_callback=lambda S: None): try: peerings = list( snmp.get_peer_state_info(hostname, community, logical_systems)) except ConnectionError: msg = f'error loading snmp peering data from {hostname}' logger.exception(msg) update_callback(msg) r = get_current_redis(InventoryTask.config) peerings = r.get(f'snmp-peerings:hosts:{hostname}') if peerings is None: raise InventoryTaskError( f'snmp error with {peerings}' f' and no cached peering data found') # unnecessary json encode/decode here ... could be optimized peerings = json.loads(peerings.decode('utf-8')) update_callback(f'using cached snmp peering data for {hostname}') r = get_next_redis(InventoryTask.config) r.set(f'snmp-peerings:hosts:{hostname}', json.dumps(peerings)) update_callback(f'snmp peering info loaded from {hostname}') def cache_extracted_ims_data(extracted_data, use_current=False): if use_current: r = get_current_redis(InventoryTask.config) else: r = get_next_redis(InventoryTask.config) for k, v in extracted_data.items(): r.set(f'ims:cache:{k}', json.dumps(v)) @app.task(base=InventoryTask, bind=True, name='ims_task') @log_task_entry_and_exit def ims_task(self, use_current=False): try: extracted_data = extract_ims_data() cache_extracted_ims_data(extracted_data) transformed_data = transform_ims_data(extracted_data) persist_ims_data(transformed_data, use_current) except Exception: logger.exception('Error in IMS task:') update_latch_status(InventoryTask.config, pending=True, failure=True) def extract_ims_data(): c = InventoryTask.config["ims"] return _extract_ims_data( ims_api_url=c['api'], ims_username=c['username'], ims_password=c['password']) def _extract_ims_data(ims_api_url, ims_username, ims_password): """ convenient entry point for testing ... :param ims_api_url: :param ims_username: :param ims_password: :return: """ def _ds() -> IMS: return IMS(ims_api_url, ims_username, ims_password) _ds().clear_dynamic_context_cache() locations = {} site_locations = {} lg_routers = [] geant_nodes = [] customer_contacts = {} planned_work_contacts = {} circuit_ids_to_monitor = [] circuit_ids_and_sids = {} additional_circuit_customers = {} flexils_data = {} customers = {} hierarchy = {} port_id_details = defaultdict(list) port_id_services = defaultdict(list) @log_task_entry_and_exit def _populate_locations(): nonlocal locations locations = {k: v for k, v in ims_data.get_node_locations(ds=_ds())} @log_task_entry_and_exit def _populate_site_locations(): nonlocal site_locations site_locations = {k: v for k, v in ims_data.get_site_locations(ds=_ds())} @log_task_entry_and_exit def _populate_lg_routers(): nonlocal lg_routers lg_routers = list(ims_data.lookup_lg_routers(ds=_ds())) @log_task_entry_and_exit def _populate_geant_nodes(): nonlocal geant_nodes geant_nodes = list(ims_data.lookup_geant_nodes(ds=_ds())) @log_task_entry_and_exit def _populate_customer_contacts(): nonlocal customer_contacts customer_contacts = \ {k: v for k, v in ims_data.get_customer_tts_contacts(ds=_ds())} @log_task_entry_and_exit def _populate_customer_planned_work_contacts(): nonlocal planned_work_contacts planned_work_contacts = \ {k: v for k, v in ims_data.get_customer_planned_work_contacts(ds=_ds())} @log_task_entry_and_exit def _populate_circuit_ids_to_monitor(): nonlocal circuit_ids_to_monitor circuit_ids_to_monitor = \ list(ims_data.get_monitored_circuit_ids(ds=_ds())) @log_task_entry_and_exit def _populate_sids(): nonlocal circuit_ids_and_sids circuit_ids_and_sids = \ {cid: sid for cid, sid in ims_data.get_ids_and_sids(ds=_ds())} @log_task_entry_and_exit def _populate_additional_circuit_customers(): nonlocal additional_circuit_customers additional_circuit_customers = \ ims_data.get_circuit_related_customers(ds=_ds()) exceptions = {} with concurrent.futures.ThreadPoolExecutor() as executor: futures = { executor.submit(_populate_locations): 'locations', executor.submit(_populate_site_locations): 'site_locations', executor.submit(_populate_geant_nodes): 'geant_nodes', executor.submit(_populate_lg_routers): 'lg_routers', executor.submit(_populate_customer_contacts): 'customer_contacts', executor.submit(_populate_customer_planned_work_contacts): 'planned_work_contacts', executor.submit(_populate_circuit_ids_to_monitor): 'circuit_ids_to_monitor', executor.submit(_populate_sids): 'sids', executor.submit(_populate_additional_circuit_customers): 'additional_circuit_customers' } for future in concurrent.futures.as_completed(futures): if future.exception(): exceptions[futures[future]] = str(future.exception()) if exceptions: raise InventoryTaskError(json.dumps(exceptions, indent=2)) @log_task_entry_and_exit def _populate_customers(): nonlocal customers customers = {c['id']: c for c in _ds().get_all_entities('customer')} @log_task_entry_and_exit def _populate_flexils_data(): nonlocal flexils_data flexils_data = ims_data.get_flexils_by_circuitid(ds=_ds()) @log_task_entry_and_exit def _populate_hierarchy(): nonlocal hierarchy hierarchy = { d['id']: d for d in ims_data.get_circuit_hierarchy(ds=_ds())} logger.debug("hierarchy complete") @log_task_entry_and_exit def _populate_port_id_details(): nonlocal port_id_details for x in ims_data.get_port_details(ds=_ds()): pd = port_id_details[x['port_id']] pd.append(x) logger.debug("Port details complete") @log_task_entry_and_exit def _populate_circuit_info(): for x in ims_data.get_port_id_services(ds=_ds()): port_id_services[x['port_a_id']].append(x) logger.debug("port circuits complete") with concurrent.futures.ThreadPoolExecutor() as executor: futures = { executor.submit(_populate_hierarchy): 'hierarchy', executor.submit(_populate_port_id_details): 'port_id_details', executor.submit(_populate_circuit_info): 'circuit_info', executor.submit(_populate_flexils_data): 'flexils_data', executor.submit(_populate_customers): 'customers' } for future in concurrent.futures.as_completed(futures): if future.exception(): exceptions[futures[future]] = str(future.exception()) if exceptions: raise InventoryTaskError(json.dumps(exceptions, indent=2)) return { 'locations': locations, 'site_locations': site_locations, 'lg_routers': lg_routers, 'customer_contacts': customer_contacts, 'planned_work_contacts': planned_work_contacts, 'circuit_ids_to_monitor': circuit_ids_to_monitor, 'circuit_ids_sids': circuit_ids_and_sids, 'additional_circuit_customers': additional_circuit_customers, 'hierarchy': hierarchy, 'port_id_details': port_id_details, 'port_id_services': port_id_services, 'geant_nodes': geant_nodes, 'flexils_data': flexils_data, 'customers': customers } def transform_ims_data(data): locations = data['locations'] customer_contacts = data['customer_contacts'] planned_work_contacts = data['planned_work_contacts'] circuit_ids_to_monitor = data['circuit_ids_to_monitor'] additional_circuit_customers = data['additional_circuit_customers'] hierarchy = data['hierarchy'] port_id_details = data['port_id_details'] port_id_services = data['port_id_services'] circuit_ids_and_sids = data['circuit_ids_sids'] geant_nodes = data['geant_nodes'] flexils_data = data['flexils_data'] customers = data['customers'] sid_services = defaultdict(list) def _get_circuit_contacts(c): customer_ids = {c['customerid']} customer_ids.update( [ac['id'] for ac in additional_circuit_customers.get(c['id'], [])] ) tts_contacts = set().union( *[customer_contacts.get(cid, []) for cid in customer_ids]) pw_contacts = set().union( *[planned_work_contacts.get(cid, []) for cid in customer_ids]) return tts_contacts, pw_contacts for d in hierarchy.values(): c, ttc = _get_circuit_contacts(d) d['contacts'] = sorted(list(c)) d['planned_work_contacts'] = sorted(list(ttc)) # add flexils data to port_id_details and port_id_services all_ils_details = flexils_data.get(d['id']) if all_ils_details: for ils_details in all_ils_details: pid = port_id_details.get(ils_details['key'], []) pid.append({ 'port_id': ils_details['key'], 'equipment_name': ils_details['node_name'], 'interface_name': ils_details['full_port_name'] }) port_id_details[ils_details['key']] = pid pis = port_id_services.get(ils_details['key'], []) pis.append({ 'id': d['id'], 'name': d['name'], 'project': d['project'], 'port_a_id': ils_details['key'], 'circuit_type': d['circuit-type'], 'status': d['status'], 'service_type': d['product'], 'customerid': d['customerid'], 'customer': customers.get(d['customerid'], ''), 'contacts': d['contacts'], 'planned_work_contacts': d['planned_work_contacts'] }) port_id_services[ils_details['key']] = pis def _convert_to_bits(value, unit): unit = unit.lower() conversions = { 'm': 1 << 20, 'mb': 1 << 20, 'g': 1 << 30, 'gbe': 1 << 30, } return int(value) * conversions[unit] def _get_speed(circuit_id): c = hierarchy.get(circuit_id) if c is None: return 0 if c['status'] != 'operational': return 0 pattern = re.compile(r'^(\d+)([a-zA-z]+)$') m = pattern.match(c['speed']) if m: try: return _convert_to_bits(m[1], m[2]) except KeyError as e: logger.debug(f'Could not find key: {e} ' f'for circuit: {circuit_id}') return 0 else: if c['circuit-type'] == 'service' \ or c['product'].lower() == 'ethernet': return sum( (_get_speed(x) for x in c['carrier-circuits']) ) else: return 0 def _get_fibre_routes(c_id): _circ = hierarchy.get(c_id, None) if _circ is None: return if _circ['speed'].lower() == 'fibre_route': yield _circ['id'] else: for cc in _circ['carrier-circuits']: yield from _get_fibre_routes(cc) def _get_related_services(circuit_id: str) -> List[dict]: rs = {} c = hierarchy.get(circuit_id, None) if c: if c['circuit-type'] == 'service': rs[c['id']] = { 'id': c['id'], 'name': c['name'], 'circuit_type': c['circuit-type'], 'service_type': c['product'], 'project': c['project'], 'contacts': c['contacts'], 'planned_work_contacts': c['planned_work_contacts'] } if c['id'] in circuit_ids_to_monitor: rs[c['id']]['status'] = c['status'] else: rs[c['id']]['status'] = 'non-monitored' if c['id'] in circuit_ids_and_sids: rs[c['id']]['sid'] = circuit_ids_and_sids[c['id']] if c['sub-circuits']: for sub in c['sub-circuits']: temp_parents = \ _get_related_services(sub) rs.update({t['id']: t for t in temp_parents}) return list(rs.values()) def _format_service(s): s['additional_customers'] = \ additional_circuit_customers.get(s['id'], []) s['original_status'] = s['status'] s['monitored'] = True if s['circuit_type'] == 'service' \ and s['id'] not in circuit_ids_to_monitor: s['monitored'] = False s['status'] = 'non-monitored' pd_a = port_id_details[s['port_a_id']][0] location_a = locations.get(pd_a['equipment_name'], None) if location_a: loc_a = location_a['pop'] else: loc_a = locations['UNKNOWN_LOC']['pop'] logger.warning( f'Unable to find location for {pd_a["equipment_name"]} - ' f'Service ID {s["id"]}') s['pop_name'] = loc_a['name'] s['pop_abbreviation'] = loc_a['abbreviation'] s['equipment'] = pd_a['equipment_name'] s['card_id'] = '' # this is redundant I believe s['port'] = pd_a['interface_name'] s['logical_unit'] = '' # this is redundant I believe if 'port_b_id' in s: pd_b = port_id_details[s['port_b_id']][0] location_b = locations.get(pd_b['equipment_name'], None) if location_b: loc_b = location_b['pop'] else: loc_b = locations['UNKNOWN_LOC']['pop'] logger.warning( f'Unable to find location for {pd_b["equipment_name"]} - ' f'Service ID {s["id"]}') s['other_end_pop_name'] = loc_b['name'] s['other_end_pop_abbreviation'] = loc_b['abbreviation'] s['other_end_equipment'] = pd_b['equipment_name'] s['other_end_port'] = pd_b['interface_name'] else: s['other_end_pop_name'] = '' s['other_end_pop_abbreviation'] = '' s['other_end_equipment'] = '' s['other_end_port'] = '' s.pop('port_a_id', None) s.pop('port_b_id', None) services_by_type = {} interface_services = defaultdict(list) # using a dict to ensure no duplicates node_pair_services = defaultdict(dict) pop_nodes = defaultdict(list) for value in locations.values(): pop_nodes[value['pop']['name']].append(value['equipment-name']) for key, value in port_id_details.items(): for details in value: k = f"{details['equipment_name']}:" \ f"{details['interface_name']}" circuits = port_id_services.get(details['port_id'], []) for circ in circuits: # contacts, pw_contacts = _get_circuit_contacts(circ) # we only want to include the Related Services contacts contacts = set() pw_contacts = set() circ['fibre-routes'] = [] for x in set(_get_fibre_routes(circ['id'])): c = { 'id': hierarchy[x]['id'], 'name': hierarchy[x]['name'], 'status': hierarchy[x]['status'] } circ['fibre-routes'].append(c) circ['related-services'] = \ _get_related_services(circ['id']) for tlc in circ['related-services']: # why were these removed? # contacts.update(tlc.pop('contacts')) if circ['status'] == 'operational' \ and circ['id'] in circuit_ids_to_monitor \ and tlc['status'] == 'operational' \ and tlc['id'] in circuit_ids_to_monitor: contacts.update(tlc.get('contacts')) pw_contacts.update( tlc.get('planned_work_contacts', [])) circ['contacts'] = sorted(list(contacts)) circ['planned_work_contacts'] = sorted(list(pw_contacts)) circ['calculated-speed'] = _get_speed(circ['id']) _format_service(circ) type_services = services_by_type.setdefault( ims_sorted_service_type_key(circ['service_type']), dict()) type_services[circ['id']] = circ if circ['other_end_equipment']: node_pair_services[ f"{circ['equipment']}/{circ['other_end_equipment']}" ][circ['id']] = circ try: # get the physical port circuit, if it exists # https://jira.software.geant.org/browse/POL1-687 port_circuit = next( c for c in circuits if c.get('port_type') == 'ports') except StopIteration: port_circuit = None sid = None if circ['id'] in circuit_ids_and_sids: sid = circuit_ids_and_sids[circ['id']] elif 'sid' in details: if len(circuits) > 1: if port_circuit != circ: # if this is not the physical port circuit # related to this port, then we don't want to # assign the SID to this circuit, so skip. continue # assign the SID from the port to this circuit sid = details['sid'] if sid is None: continue circ['sid'] = sid sid_info = { 'circuit_id': circ['id'], 'sid': sid, 'status': circ['original_status'], 'monitored': circ['monitored'], 'name': circ['name'], 'speed': circ['calculated-speed'], 'service_type': circ['service_type'], 'project': circ['project'], 'customer': circ['customer'], 'equipment': circ['equipment'], 'port': circ['port'], 'geant_equipment': circ['equipment'] in geant_nodes } if sid_info not in sid_services[sid]: sid_services[sid].append(sid_info) interface_services[k].extend(circuits) return { 'hierarchy': hierarchy, 'interface_services': interface_services, 'services_by_type': services_by_type, 'node_pair_services': node_pair_services, 'sid_services': sid_services, 'pop_nodes': pop_nodes, 'locations': data['locations'], 'site_locations': data['site_locations'], 'lg_routers': data['lg_routers'], } def persist_ims_data(data, use_current=False): hierarchy = data['hierarchy'] locations = data['locations'] site_locations = data['site_locations'] lg_routers = data['lg_routers'] interface_services = data['interface_services'] services_by_type = data['services_by_type'] node_pair_services = data['node_pair_services'] sid_services = data['sid_services'] pop_nodes = data['pop_nodes'] def _get_sites(): # de-dupe the sites (by abbreviation) sites = { site_location['abbreviation']: site_location for site_location in site_locations.values() } return sites.values() if use_current: r = get_current_redis(InventoryTask.config) r.delete('ims:sid_services') # only need to delete the individual keys if it's just an IMS update # rather than a complete update (the db will have been flushed) for key_pattern in [ 'ims:location:*', 'ims:lg:*', 'ims:circuit_hierarchy:*', 'ims:interface_services:*', 'ims:access_services:*', 'ims:gws_indirect:*', 'ims:node_pair_services:*', 'ims:pop_nodes:*' ]: rp = r.pipeline() for k in r.scan_iter(key_pattern, count=1000): rp.delete(k) else: r = get_next_redis(InventoryTask.config) r.set('ims:sid_services', json.dumps(sid_services)) rp = r.pipeline() for h, d in locations.items(): rp.set(f'ims:location:{h}', json.dumps([d])) for site in _get_sites(): rp.set(f'ims:site:{site["abbreviation"]}', json.dumps(site)) rp.execute() rp = r.pipeline() for router in lg_routers: rp.set(f'ims:lg:{router["equipment name"]}', json.dumps(router)) rp.execute() rp = r.pipeline() for circ in hierarchy.values(): rp.set(f'ims:circuit_hierarchy:{circ["id"]}', json.dumps([circ])) rp.execute() rp = r.pipeline() for k, v in interface_services.items(): rp.set( f'ims:interface_services:{k}', json.dumps(v)) rp.execute() rp = r.pipeline() for k, v in node_pair_services.items(): rp.set( f'ims:node_pair_services:{k}', json.dumps(list(v.values()))) rp.execute() rp = r.pipeline() for k, v in pop_nodes.items(): rp.set( f'ims:pop_nodes:{k}', json.dumps(sorted(v))) rp.execute() rp = r.pipeline() populate_poller_cache(interface_services, r) populate_mic_cache(interface_services, r) for service_type, services in services_by_type.items(): for v in services.values(): rp.set( f'ims:services:{service_type}:{v["name"]}', json.dumps({ 'id': v['id'], 'name': v['name'], 'project': v['project'], 'here': { 'pop': { 'name': v['pop_name'], 'abbreviation': v['pop_abbreviation'] }, 'equipment': v['equipment'], 'port': v['port'], }, 'there': { 'pop': { 'name': v['other_end_pop_name'], 'abbreviation': v['other_end_pop_abbreviation'] }, 'equipment': v['other_end_equipment'], 'port': v['other_end_port'], }, 'speed_value': v['calculated-speed'], 'speed_unit': 'n/a', 'status': v['status'], 'type': v['service_type'] })) rp.execute() def populate_mic_cache(interface_services, r): cache_key = "mic:impact:all-data" all_data = defaultdict(lambda: defaultdict(dict)) def _get_formatted_rs(_d): for rs in _d['related-services']: if rs['status'] == 'operational': yield { 'id': rs['id'], 'sid': rs.get('sid', ''), 'status': rs['status'], 'name': rs['name'], 'service_type': rs['service_type'], 'contacts': rs['contacts'], 'planned_work_contacts': rs['planned_work_contacts'] } for services in interface_services.values(): if services: current_interface_services = [] for d in services: if d.get('related-services'): current_interface_services.extend( list(_get_formatted_rs(d))) if current_interface_services: site = f'{services[0]["pop_name"]} ' \ f'({services[0]["pop_abbreviation"]})' eq_name = services[0]['equipment'] if_name = services[0]['port'] all_data[site][eq_name][if_name] = current_interface_services result = json.dumps(all_data) r.set(cache_key, result.encode('utf-8')) @app.task(base=InventoryTask, bind=True, name='final_task') @log_task_entry_and_exit def final_task(self): r = get_current_redis(InventoryTask.config) latch = get_latch(r) if latch['failure']: raise InventoryTaskError('Sub-task failed - check logs for details') _build_subnet_db(update_callback=self.log_info) _build_snmp_peering_db(update_callback=self.log_info) _build_juniper_peering_db(update_callback=self.log_info) populate_poller_interfaces_cache(warning_callback=self.log_warning) collate_netconf_interfaces_all_cache(warning_callback=self.log_warning) latch_db(InventoryTask.config) self.log_info('latched current/next dbs') @log_task_entry_and_exit def populate_poller_interfaces_cache(warning_callback=lambda s: None): no_lab_cache_key = 'classifier-cache:poller-interfaces:all:no_lab' all_cache_key = 'classifier-cache:poller-interfaces:all' non_lab_populated_interfaces = None all_populated_interfaces = None r = get_next_redis(InventoryTask.config) try: lab_keys_pattern = 'lab:netconf-interfaces-hosts:*' lab_equipment = [h.decode('utf-8')[len(lab_keys_pattern) - 1:] for h in r.keys(lab_keys_pattern)] all_populated_interfaces = list( load_interfaces_to_poll(InventoryTask.config, use_next_redis=True)) non_lab_populated_interfaces = [x for x in all_populated_interfaces if x['router'] not in lab_equipment] except Exception as e: warning_callback(f"Failed to retrieve all required data {e}") logger.exception( "Failed to retrieve all required data, logging exception") if not non_lab_populated_interfaces or not all_populated_interfaces: previous_r = get_current_redis(InventoryTask.config) def _load_previous(key): try: warning_callback(f"populating {key} " "from previously cached data") return json.loads(previous_r.get(key)) except Exception as e: warning_callback(f"Failed to load {key} " f"from previously cached data: {e}") if not non_lab_populated_interfaces: non_lab_populated_interfaces = _load_previous(no_lab_cache_key) if not all_populated_interfaces: all_populated_interfaces = _load_previous(all_cache_key) r.set(no_lab_cache_key, json.dumps(non_lab_populated_interfaces)) r.set(all_cache_key, json.dumps(all_populated_interfaces)) @log_task_entry_and_exit def collate_netconf_interfaces_all_cache(warning_callback=lambda s: None): """ Fetch all existing netconf-interface redis entries and assemble them into collated documents. Used for fetching speed data more efficiently in /poller/speeds :param warning_callback: :return: """ def _fetch_docs_for_key_pattern(r, key_pattern): for k in r.scan_iter(key_pattern, count=1000): key = k.decode('utf-8') doc_str = r.get(key).decode('utf-8') doc = json.loads(doc_str) doc['hostname'] = key.split(':')[1] # get hostname part of key yield doc netconf_all_key = 'netconf-interfaces:all' lab_netconf_all_key = 'lab:netconf-interfaces:all' netconf_interface_docs = None lab_netconf_interface_docs = None r = get_next_redis(InventoryTask.config) try: netconf_interface_key_pattern = 'netconf-interfaces:*' lab_netconf_interface_key_pattern = 'lab:netconf-interfaces:*' netconf_interface_docs = list(_fetch_docs_for_key_pattern(r, netconf_interface_key_pattern)) lab_netconf_interface_docs = list(_fetch_docs_for_key_pattern(r, lab_netconf_interface_key_pattern)) except Exception as e: warning_callback(f"Failed to collate netconf-interfaces {e}") logger.exception( "Failed to collate netconf-interfaces, logging exception") r.set(netconf_all_key, json.dumps(netconf_interface_docs)) r.set(lab_netconf_all_key, json.dumps(lab_netconf_interface_docs))