""" This module is responsible for the entire provisioning lifecycle. """ import os import logging import time import json import datetime from functools import reduce from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS, STATE_PATH from brian_dashboard_manager.grafana.utils.request import \ AdminRequest, \ TokenRequest from brian_dashboard_manager.grafana.organization import \ get_organizations, create_organization, create_api_token, \ delete_api_token, delete_expired_api_tokens, set_home_dashboard from brian_dashboard_manager.grafana.dashboard import find_dashboard, \ get_dashboard_definitions, create_dashboard, delete_dashboard from brian_dashboard_manager.grafana.datasource import \ check_provisioned, create_datasource from brian_dashboard_manager.grafana.folder import find_folder, \ delete_folder from brian_dashboard_manager.inventory_provider.interfaces import \ get_gws_direct, get_interfaces from brian_dashboard_manager.templating.nren_access import generate_nrens from brian_dashboard_manager.templating.helpers import is_re_customer, \ is_cls_peer, is_cls, is_ias_customer, is_ias_private, is_ias_public, \ is_ias_upstream, is_ias_peer, is_lag_backbone, is_nren, is_phy_upstream, \ is_re_peer, is_gcs, is_cae1, is_geantopen, is_l2circuit, is_lhcone_peer, \ is_lhcone_customer, is_lhcone, is_mdvpn, get_aggregate_dashboard_data, \ get_interface_data, parse_backbone_name, parse_phy_upstream_name, \ get_dashboard_data, get_aggregate_interface_data from brian_dashboard_manager.templating.gws import generate_gws from brian_dashboard_manager.templating.render import render_dashboard logger = logging.getLogger(__name__) def generate_all_nrens(token_request, nrens, folder_id, datasource_name): provisioned = [] with ThreadPoolExecutor(max_workers=8) as executor: for dashboard in generate_nrens(nrens, datasource_name): res = executor.submit(create_dashboard, token_request, dashboard, folder_id) provisioned.append(res) return [r.result() for r in provisioned] def provision_folder(token_request, folder_name, dash, excluded_interfaces, datasource_name, excluded_dashboards): folder = find_folder(token_request, folder_name) predicate = dash['predicate'] tag = dash['tag'] # dashboard will include error panel errors = dash.get('errors', False) # custom parsing function for description to dashboard name parse_func = dash.get('parse_func') relevant_interfaces = filter(predicate, excluded_interfaces) data = get_interface_data(relevant_interfaces, parse_func) dash_data = get_dashboard_data(data, datasource_name, tag, errors) if not isinstance(excluded_dashboards, list): excluded_dashboards = [] else: excluded_dashboards = list( map(lambda s: s.lower(), excluded_dashboards)) provisioned = [] with ThreadPoolExecutor(max_workers=4) as executor: for dashboard in dash_data: rendered = render_dashboard(dashboard) if rendered.get('title').lower() in excluded_dashboards: executor.submit(delete_dashboard, token_request, rendered, folder['id']) continue provisioned.append(executor.submit(create_dashboard, token_request, rendered, folder['id'])) return [r.result() for r in provisioned] def provision_aggregate(token_request, agg_type, aggregate_folder, dash, excluded_interfaces, datasource_name): predicate = dash['predicate'] tag = dash['tag'] relevant_interfaces = filter(predicate, excluded_interfaces) data = get_aggregate_interface_data(relevant_interfaces, agg_type) dashboard = get_aggregate_dashboard_data( f'Aggregate - {agg_type}', data, datasource_name, tag) rendered = render_dashboard(dashboard) return create_dashboard(token_request, rendered, aggregate_folder['id']) def provision_maybe(config): with open(STATE_PATH, 'r+') as f: def write_timestamp(timestamp, provisioning): f.seek(0) f.write(json.dumps( {'timestamp': timestamp, 'provisioning': provisioning})) f.truncate() try: # don't conditionally provision in dev val = os.environ.get('FLASK_ENV') != 'development' now = datetime.datetime.now() write_timestamp(now.timestamp(), val) provision(config) finally: now = datetime.datetime.now() write_timestamp(now.timestamp(), False) def provision(config): request = AdminRequest(**config) all_orgs = get_organizations(request) orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS) missing = (org['name'] for org in orgs_to_provision if org['name'] not in [org['name'] for org in all_orgs]) for org_name in missing: org_data = create_organization(request, org_name) all_orgs.append(org_data) interfaces = get_interfaces(config['inventory_provider']) tokens = [] start = time.time() for org in all_orgs: org_id = org['id'] delete_expired_api_tokens(request, org_id) token = create_api_token(request, org_id) token_request = TokenRequest(token=token['key'], **config) tokens.append((org_id, token['id'])) logger.info( f'--- Provisioning org {org["name"]} (ID #{org_id}) ---') try: org_config = next( o for o in orgs_to_provision if o['name'] == org['name']) except StopIteration: org_config = None if not org_config: logger.error( f'Org {org["name"]} does not have valid configuration.') org['info'] = 'Org exists in grafana but is not configured' continue # Only provision influxdb datasource for now datasource = config.get('datasources').get('influxdb') # Provision missing data sources if not check_provisioned(token_request, datasource): ds = create_datasource(token_request, datasource, config.get('datasources')) if ds: logger.info( f'Provisioned datasource: {datasource["name"]}') excluded_nrens = org_config.get('excluded_nrens', []) excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens)) def excluded(interface): desc = interface.get('description', '').lower() return not any(nren.lower() in desc for nren in excluded_nrens) excluded_interfaces = list(filter(excluded, interfaces)) dashboards = { 'CLS': { 'predicate': is_cls, 'tag': 'CLS' }, 'RE PEER': { 'predicate': is_re_peer, 'tag': 'RE_PEER' }, 'RE CUST': { 'predicate': is_re_customer, 'tag': 'RE_CUST' }, 'GEANTOPEN': { 'predicate': is_geantopen, 'tag': 'GEANTOPEN' }, 'GCS': { 'predicate': is_gcs, 'tag': 'AUTOMATED_L2_CIRCUITS' }, 'L2 CIRCUIT': { 'predicate': is_l2circuit, 'tag': 'L2_CIRCUITS' }, 'LHCONE PEER': { 'predicate': is_lhcone_peer, 'tag': 'LHCONE_PEER' }, 'LHCONE CUST': { 'predicate': is_lhcone_customer, 'tag': 'LHCONE_CUST' }, 'MDVPN Customers': { 'predicate': is_mdvpn, 'tag': 'MDVPN' }, 'Infrastructure Backbone': { 'predicate': is_lag_backbone, 'tag': 'BACKBONE', 'errors': True, 'parse_func': parse_backbone_name }, 'IAS PRIVATE': { 'predicate': is_ias_private, 'tag': 'IAS_PRIVATE' }, 'IAS PUBLIC': { 'predicate': is_ias_public, 'tag': 'IAS_PUBLIC' }, 'IAS CUSTOMER': { 'predicate': is_ias_customer, 'tag': 'IAS_CUSTOMER' }, 'IAS UPSTREAM': { 'predicate': is_ias_upstream, 'tag': ['IAS_UPSTREAM', 'UPSTREAM'] }, 'GWS PHY Upstream': { 'predicate': is_phy_upstream, 'tag': ['GWS_UPSTREAM', 'UPSTREAM'], 'errors': True, 'parse_func': parse_phy_upstream_name } } # Provision dashboards, overwriting existing ones. datasource_name = datasource.get('name', 'PollerInfluxDB') excluded_folders = org_config.get('excluded_folders', {}) def get_uid(prev, curr): prev[curr.get('uid')] = False return prev # Map of dashboard UID -> whether it has been updated. # This is used to remove stale dashboards at the end. dash_list = find_dashboard(token_request) or [] dash_list = reduce(get_uid, dash_list, {}) with ProcessPoolExecutor(max_workers=4) as executor: provisioned = [] for folder_name, dash in dashboards.items(): exclude = excluded_folders.get(folder_name) if exclude: if isinstance(exclude, bool): # boolean True -> entire folder excluded # list -> dashboard names not to provision executor.submit( delete_folder, token_request, folder_name) continue logger.info( f'Provisioning {org["name"]}/{folder_name} dashboards') res = executor.submit(provision_folder, token_request, folder_name, dash, excluded_interfaces, datasource_name, exclude) provisioned.append(res) for result in provisioned: folder = result.result() if folder is None: continue for dashboard in folder: if dashboard is None: continue dash_list[dashboard.get('uid')] = True # fetch GWS direct data and provision related dashboards logger.info('Provisioning GWS Direct dashboards') folder_name = 'GWS Direct' exclude_gws = excluded_folders.get(folder_name, []) exclude_gws = list(map(lambda f: f.lower(), exclude_gws)) if isinstance(exclude_gws, bool) and exclude_gws: # don't provision GWS Direct folder delete_folder(token_request, folder_name) else: folder = find_folder(token_request, folder_name) with ProcessPoolExecutor(max_workers=4) as executor: gws_data = get_gws_direct(config['inventory_provider']) provisioned = [] for dashboard in generate_gws(gws_data, datasource_name): rendered = render_dashboard(dashboard) if rendered.get('title').lower() in exclude_gws: executor.submit(delete_dashboard, token_request, rendered, folder['id']) continue provisioned.append(executor.submit(create_dashboard, token_request, rendered, folder['id'])) for result in provisioned: dashboard = result.result() if dashboard is None: continue dash_list[dashboard.get('uid')] = True aggregate_dashboards = { 'CLS PEERS': { 'predicate': is_cls_peer, 'tag': 'cls_peers', }, 'IAS PEERS': { 'predicate': is_ias_peer, 'tag': 'ias_peers', }, 'GWS UPSTREAMS': { 'predicate': is_ias_upstream, 'tag': 'gws_upstreams', }, 'LHCONE': { 'predicate': is_lhcone, 'tag': 'lhcone', }, 'CAE1': { 'predicate': is_cae1, 'tag': 'cae', } } exclude_agg = excluded_folders.get('Aggregates', []) if isinstance(exclude_agg, bool) and exclude_agg: # don't provision aggregate folder delete_folder(token_request, 'Aggregates') else: with ProcessPoolExecutor(max_workers=4) as executor: provisioned = [] agg_folder = find_folder(token_request, 'Aggregates') for agg_type, dash in aggregate_dashboards.items(): if agg_type in exclude_agg: dash_name = {'title': f'Aggregate - {agg_type}'} executor.submit(delete_dashboard, token_request, dash_name, agg_folder['id']) continue logger.info(f'Provisioning {org["name"]}' + f'/Aggregate {agg_type} dashboards') res = executor.submit(provision_aggregate, token_request, agg_type, agg_folder, dash, excluded_interfaces, datasource_name) provisioned.append(res) for result in provisioned: dashboard = result.result() if dashboard is None: continue dash_list[dashboard.get('uid')] = True # NREN Access dashboards # uses a different template than the above. logger.info('Provisioning NREN Access dashboards') folder = find_folder(token_request, 'NREN Access') nrens = filter(is_nren, excluded_interfaces) provisioned = generate_all_nrens( token_request, nrens, folder['id'], datasource_name) for dashboard in provisioned: if dashboard is None: continue dash_list[dashboard.get('uid')] = True # Non-generated dashboards excluded_dashboards = org_config.get('excluded_dashboards', []) logger.info('Provisioning static dashboards') for dashboard in get_dashboard_definitions(): if dashboard['title'] not in excluded_dashboards: res = create_dashboard(token_request, dashboard) if res: dash_list[res.get('uid')] = True else: delete_dashboard(token_request, dashboard) # Home dashboard is always called "Home" # Make sure it's set for the organization logger.info('Configuring Home dashboard') is_staff = org['name'] == 'GÉANT Staff' set_home_dashboard(token_request, is_staff) # just hardcode that we updated home dashboard dash_list['home'] = True for dash, provisioned in dash_list.items(): if not provisioned: logger.info(f'Deleting stale dashboard with UID {dash}') delete_dashboard(token_request, {'uid': dash}) logger.info(f'Time to complete: {time.time() - start}') for org_id, token in tokens: delete_api_token(request, org_id, token) return all_orgs