""" This module is responsible for the entire provisioning lifecycle. """ import itertools import os import logging import time import json import datetime from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS, STATE_PATH from brian_dashboard_manager.grafana.utils.request import AdminRequest, \ TokenRequest from brian_dashboard_manager.grafana.organization import \ get_organizations, create_organization, create_api_token, \ delete_api_token, delete_expired_api_tokens, set_home_dashboard from brian_dashboard_manager.grafana.dashboard import find_dashboard, \ get_dashboard_definitions, create_dashboard, delete_dashboard from brian_dashboard_manager.grafana.datasource import \ check_provisioned, create_datasource from brian_dashboard_manager.grafana.folder import find_folder, \ delete_folder, get_folders from brian_dashboard_manager.inventory_provider.interfaces import \ get_gws_direct, get_gws_indirect, get_interfaces, \ get_eumetsat_multicast_subscriptions from brian_dashboard_manager.templating.helpers import \ get_aggregate_dashboard_data, get_interface_data, \ get_nren_interface_data, get_dashboard_data, \ get_nren_dashboard_data, get_aggregate_interface_data from brian_dashboard_manager.templating.gws import generate_gws, \ generate_indirect from brian_dashboard_manager.templating.eumetsat \ import generate_eumetsat_multicast from brian_dashboard_manager.templating.render import render_dashboard logger = logging.getLogger(__name__) MAX_WORKERS = 4 DASHBOARDS = { 'NREN': { 'tag': ['customers'], 'folder_name': 'NREN Access', 'interfaces': [] }, 'CLS': { 'tag': 'CLS', 'folder_name': 'CLS', 'interfaces': [] }, 'RE_PEER': { 'tag': 'RE_PEER', 'folder_name': 'RE Peer', 'interfaces': [] }, 'RE_CUST': { 'tag': 'RE_CUST', 'folder_name': 'RE Customer', 'interfaces': [] }, 'GEANTOPEN': { 'tag': 'GEANTOPEN', 'folder_name': 'GEANTOPEN', 'interfaces': [] }, 'GCS': { 'tag': 'AUTOMATED_L2_CIRCUITS', 'folder_name': 'GCS', 'interfaces': [] }, 'L2_CIRCUIT': { 'tag': 'L2_CIRCUITS', 'folder_name': 'L2 Circuit', 'interfaces': [] }, 'LHCONE_PEER': { 'tag': 'LHCONE_PEER', 'folder_name': 'LHCONE Peer', 'interfaces': [] }, 'LHCONE_CUST': { 'tag': 'LHCONE_CUST', 'folder_name': 'LHCONE Customer', 'interfaces': [] }, 'MDVPN_CUSTOMERS': { 'tag': 'MDVPN', 'folder_name': 'MDVPN Customers', 'interfaces': [] }, 'INFRASTRUCTURE_BACKBONE': { 'tag': 'BACKBONE', 'errors': True, 'folder_name': 'Infrastructure Backbone', 'interfaces': [] }, 'IAS_PRIVATE': { 'tag': 'IAS_PRIVATE', 'folder_name': 'IAS Private', 'interfaces': [] }, 'IAS_PUBLIC': { 'tag': 'IAS_PUBLIC', 'folder_name': 'IAS Public', 'interfaces': [] }, 'IAS_CUSTOMER': { 'tag': 'IAS_CUSTOMER', 'folder_name': 'IAS Customer', 'interfaces': [] }, 'IAS_UPSTREAM': { 'tag': ['IAS_UPSTREAM', 'UPSTREAM'], 'folder_name': 'IAS Upstream', 'interfaces': [] }, 'GWS_PHY_UPSTREAM': { 'tag': ['GWS_UPSTREAM', 'UPSTREAM'], 'errors': True, 'folder_name': 'GWS PHY Upstream', 'interfaces': [] } } AGG_DASHBOARDS = { 'CLS_PEERS': { 'tag': 'cls_peers', 'dashboard_name': 'CLS Peers', 'interfaces': [] }, 'IAS_PEERS': { 'tag': 'ias_peers', 'dashboard_name': 'IAS Peers', 'interfaces': [] }, 'IAS_UPSTREAM': { 'tag': 'gws_upstreams', 'dashboard_name': 'GWS Upstreams', 'interfaces': [] }, 'LHCONE': { 'tag': 'lhcone', 'dashboard_name': 'LHCONE', 'interfaces': [] }, 'CAE1': { 'tag': 'cae', 'dashboard_name': 'CAE1', 'interfaces': [] }, 'COPERNICUS': { 'tag': 'copernicus', 'dashboard_name': 'COPERNICUS', 'interfaces': [] } } def provision_folder(token_request, folder_name, dash, ds_name, excluded_dashboards): """ Function to provision dashboards within a folder. """ def _check_valid(interface): return interface['dashboard_info']['name'] != '' folder = find_folder(token_request, folder_name) tag = dash['tag'] interfaces = list(filter(_check_valid, dash['interfaces'])) # dashboard should include error panels errors = dash.get('errors', False) is_nren = folder_name == 'NREN Access' if is_nren: data = get_nren_interface_data(interfaces) dash_data = get_nren_dashboard_data(data, ds_name, tag) else: data = get_interface_data(interfaces) dash_data = get_dashboard_data( data=data, datasource=ds_name, tag=tag, errors=errors) if not isinstance(excluded_dashboards, list): excluded_dashboards = [] else: excluded_dashboards = [s.lower() for s in excluded_dashboards] provisioned = [] with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: for dashboard in dash_data: rendered = render_dashboard(dashboard, nren=is_nren) if rendered.get('title').lower() in excluded_dashboards: executor.submit(delete_dashboard, token_request, rendered, folder['id']) continue provisioned.append(executor.submit(create_dashboard, token_request, rendered, folder['id'])) return [r.result() for r in provisioned] def provision_aggregate(token_request, folder, dash, ds_name): name = dash['dashboard_name'] tag = dash['tag'] interfaces = dash['interfaces'] data = get_aggregate_interface_data(interfaces, name) dashboard = get_aggregate_dashboard_data( f'Aggregate - {name}', data, ds_name, tag) rendered = render_dashboard(dashboard) return create_dashboard(token_request, rendered, folder['id']) def provision_maybe(config): with open(STATE_PATH, 'r+') as f: def write_timestamp(timestamp, provisioning): f.seek(0) f.write(json.dumps( {'timestamp': timestamp, 'provisioning': provisioning})) f.truncate() try: # don't conditionally provision in dev val = os.environ.get('FLASK_ENV') != 'development' now = datetime.datetime.now() write_timestamp(now.timestamp(), val) provision(config) except Exception as e: logger.exception('Uncaught Exception:') raise e finally: now = datetime.datetime.now() write_timestamp(now.timestamp(), False) def is_excluded_folder(org_config, folder_name): excluded_folders = org_config.get('excluded_folders', {}) excluded = excluded_folders.get(folder_name, False) # boolean True means entire folder excluded # if list, it is specific dashboard names not to provision # so is handled at provision time. return isinstance(excluded, bool) and excluded def excluded_folder_dashboards(org_config, folder_name): excluded_folders = org_config.get('excluded_folders', {}) excluded = excluded_folders.get(folder_name, []) return excluded if isinstance(excluded, list) else [] def _provision_interfaces(config, org_config, ds_name, token): """ Provision dashboards, overwriting existing ones. :param config: :param org_config: :param ds_name: :param token: :return: yields dashboards that were created """ interfaces = get_interfaces(config['inventory_provider']) excluded_nrens = org_config['excluded_nrens'] def excluded(interface): desc = interface['description'].lower() lab = 'lab.office' in interface['router'].lower() to_exclude = any(nren.lower() in desc for nren in excluded_nrens) return not (to_exclude or lab) relevant_interfaces = list(filter(excluded, interfaces)) # loop over interfaces and add them to the dashboard_name # -> folder mapping structure `dashboards` above, for convenience. for dash in DASHBOARDS: DASHBOARDS[dash]['interfaces'] = [] for dash in AGG_DASHBOARDS: AGG_DASHBOARDS[dash]['interfaces'] = [] for iface in relevant_interfaces: for dash_name in iface['dashboards']: # add interface to matched dashboard if dash_name in DASHBOARDS: ifaces = DASHBOARDS[dash_name]['interfaces'] ifaces.append(iface) # add to matched aggregate dashboard if dash_name in AGG_DASHBOARDS: ifaces = AGG_DASHBOARDS[dash_name]['interfaces'] ifaces.append(iface) # provision dashboards and their folders with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: provisioned = [] for folder in DASHBOARDS.values(): folder_name = folder['folder_name'] # boolean True means entire folder excluded # if list, it is specific dashboard names not to provision # so is handled at provision time. if is_excluded_folder(org_config, folder_name): executor.submit( delete_folder, token, title=folder_name) continue logger.info( f'Provisioning {org_config["name"]}/{folder_name} dashboards') res = executor.submit( provision_folder, token, folder_name, folder, ds_name, excluded_folder_dashboards(org_config, folder_name)) provisioned.append(res) for result in provisioned: folder = result.result() if folder is None: continue yield from folder def _provision_gws_indirect(config, org_config, ds_name, token): # fetch GWS direct data and provision related dashboards logger.info('Provisioning GWS Indirect dashboards') folder_name = 'GWS Indirect' if is_excluded_folder(org_config, folder_name): # don't provision GWS Direct folder delete_folder(token, title=folder_name) else: folder = find_folder(token, folder_name) with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: gws_indirect_data = get_gws_indirect( config['inventory_provider']) provisioned = [] dashes = generate_indirect(gws_indirect_data, ds_name) for dashboard in dashes: rendered = render_dashboard(dashboard) provisioned.append(executor.submit(create_dashboard, token, rendered, folder['id'])) yield from provisioned def _provision_gws_direct(config, org_config, ds_name, token): # fetch GWS direct data and provision related dashboards logger.info('Provisioning GWS Direct dashboards') folder_name = 'GWS Direct' if is_excluded_folder(org_config, folder_name): # don't provision GWS Direct folder delete_folder(token, title=folder_name) else: folder = find_folder(token, folder_name) with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: gws_data = get_gws_direct(config['inventory_provider']) provisioned = [] for dashboard in generate_gws(gws_data, ds_name): rendered = render_dashboard(dashboard) provisioned.append(executor.submit(create_dashboard, token, rendered, folder['id'])) yield from provisioned def _provision_eumetsat_multicast(config, org_config, ds_name, token): # fetch EUMETSAT multicast provision related dashboards logger.info('Provisioning EUMETSAT Multicast dashboards') folder_name = 'EUMETSAT Multicast' if is_excluded_folder(org_config, folder_name): # don't provision EUMETSAT Multicast folder delete_folder(token, title=folder_name) else: folder = find_folder(token, folder_name) with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: subscriptions = get_eumetsat_multicast_subscriptions( config['inventory_provider']) provisioned = [] for dashboard in generate_eumetsat_multicast( subscriptions, ds_name): rendered = render_dashboard(dashboard) provisioned.append( executor.submit( create_dashboard, token, rendered, folder['id'])) yield from provisioned def _provision_aggregates(config, org_config, ds_name, token): if is_excluded_folder(org_config, 'Aggregates'): # don't provision aggregate folder delete_folder(token, title='Aggregates') else: with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: provisioned = [] agg_folder = find_folder(token, 'Aggregates') for dash in AGG_DASHBOARDS.values(): excluded_dashboards = excluded_folder_dashboards( org_config, 'Aggregates') if dash['dashboard_name'] in excluded_dashboards: dash_name = { 'title': f'Aggregate - {dash["dashboard_name"]}'} executor.submit(delete_dashboard, token, dash_name, agg_folder['id']) continue logger.info(f'Provisioning {org_config["name"]}' + f'/Aggregate {dash["dashboard_name"]} dashboards') res = executor.submit( provision_aggregate, token, agg_folder, dash, ds_name) provisioned.append(res) yield from provisioned def _provision_static_dashboards(config, org_config, ds_name, token): # Statically defined dashboards from json files excluded_dashboards = org_config.get('excluded_dashboards', []) logger.info('Provisioning static dashboards') for dashboard in get_dashboard_definitions(): if dashboard['title'] not in excluded_dashboards: res = create_dashboard(token, dashboard) if res: # yield a fake dashboard dict # ... only the 'uid' element is referenced yield {'uid': res.get('uid')} else: delete_dashboard(token, dashboard) # Home dashboard is always called "Home" # Make sure it's set for the organization logger.info('Configuring Home dashboard') set_home_dashboard(token, org_config['name'] == 'GÉANT Staff') yield {'uid': 'home'} def _get_ignored_dashboards(config, org_config, token): # get dashboard UIDs from ignored folders # and make sure we don't touch them ignored_folders = config.get('ignored_folders', []) for name in ignored_folders: logger.info( 'Ignoring dashboards under ' f'the folder {org_config["name"]}/{name}') folder = find_folder(token, name, create=False) if folder is None: continue to_ignore = find_dashboard(token, folder_id=folder['id']) if to_ignore is None: continue for dash in to_ignore: # return a hard-coded fake dashboard dict # ... only the 'uid' element is referenced yield {'uid': dash['uid']} # could just yield dash def _delete_unknown_folders(config, token): all_folders = get_folders(token) folders_to_keep = [ # General is a base folder present in Grafana 'General', # other folders, created outside of the DASHBOARDS list 'GWS Indirect', 'GWS Direct', 'Aggregates', 'EUMETSAT Multicast' ] folders_to_keep.extend([dash['folder_name'] for dash in DASHBOARDS.values()]) ignored_folders = config.get('ignored_folders', []) folders_to_keep.extend(ignored_folders) folders_to_keep = set(folders_to_keep) # de-dupe for folder in all_folders: if folder['title'] in folders_to_keep: continue delete_folder(token, uid=folder['uid']) def _provision_datasource(config, token): # Only provision influxdb datasource for now datasource = config.get('datasources').get('influxdb') # Provision missing data sources if not check_provisioned(token, datasource): ds = create_datasource(token, datasource, config.get('datasources')) if ds: logger.info( f'Provisioned datasource: {datasource["name"]}') return datasource def _provision_orgs(config): request = AdminRequest(**config) all_orgs = get_organizations(request) orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS) missing = (org['name'] for org in orgs_to_provision if org['name'] not in [org['name'] for org in all_orgs]) for org_name in missing: org_data = create_organization(request, org_name) all_orgs.append(org_data) return all_orgs def provision(config): start = time.time() tokens = [] all_orgs = _provision_orgs(config) request = AdminRequest(**config) delete_expired_api_tokens(request) def _find_org_config(org): orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS) try: return next( o for o in orgs_to_provision if o['name'] == org['name']) except StopIteration: logger.error( f'Org {org["name"]} does not have valid configuration.') org['info'] = 'Org exists in grafana but is not configured' return None for org in all_orgs: org_id = org['id'] logger.info( f'--- Provisioning org {org["name"]} (ID #{org_id}) ---') org_config = _find_org_config(org) if not org_config: # message logged from _find_org_config continue token = create_api_token(request, org_id) token_request = TokenRequest(token=token['key'], **config) tokens.append((org_id, token['id'])) logger.debug(tokens) all_original_dashboards = find_dashboard(token_request) or [] all_original_dashboard_uids = { d['uid'] for d in all_original_dashboards} datasource = _provision_datasource(config, token_request) ds_name = datasource.get('name', 'PollerInfluxDB') managed_dashboards = itertools.chain( _provision_interfaces( config, org_config, ds_name, token_request), _provision_gws_indirect( config, org_config, ds_name, token_request), _provision_gws_direct( config, org_config, ds_name, token_request), _provision_eumetsat_multicast( config, org_config, ds_name, token_request), _provision_aggregates( config, org_config, ds_name, token_request), _provision_static_dashboards( config, org_config, ds_name, token_request), _get_ignored_dashboards( config, org_config, token_request) ) managed_dashboard_uids = set() for dashboard in managed_dashboards: if isinstance(dashboard, Future): dashboard = dashboard.result() if dashboard is None: continue managed_dashboard_uids.add(dashboard['uid']) for uid in all_original_dashboard_uids - managed_dashboard_uids: logger.info(f'Deleting stale dashboard with UID {uid}') delete_dashboard(token_request, {'uid': uid}) _delete_unknown_folders(config, token_request) delete_api_token(request, token['id'], org_id=org_id) logger.info(f'Time to complete: {time.time() - start}') return all_orgs