""" This module is responsible for the entire provisioning lifecycle. """ import os import logging import time import json import datetime from functools import reduce from concurrent.futures import Future from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS, STATE_PATH from brian_dashboard_manager.grafana.utils.request import AdminRequest, \ TokenRequest from brian_dashboard_manager.grafana.organization import \ get_organizations, create_organization, create_api_token, \ delete_api_token, delete_expired_api_tokens, set_home_dashboard from brian_dashboard_manager.grafana.dashboard import find_dashboard, \ get_dashboard_definitions, create_dashboard, delete_dashboard from brian_dashboard_manager.grafana.datasource import \ check_provisioned, create_datasource from brian_dashboard_manager.grafana.folder import find_folder, \ delete_folder, get_folders from brian_dashboard_manager.inventory_provider.interfaces import \ get_gws_direct, get_gws_indirect, get_interfaces from brian_dashboard_manager.templating.helpers import \ get_aggregate_dashboard_data, get_interface_data, \ get_nren_interface_data, get_dashboard_data, \ get_nren_dashboard_data, get_aggregate_interface_data from brian_dashboard_manager.templating.gws import generate_gws, \ generate_indirect from brian_dashboard_manager.templating.render import render_dashboard logger = logging.getLogger(__name__) def provision_folder(token_request, folder_name, dash, ds_name, excluded_dashboards): """ Function to provision dashboards within a folder. """ def _check_valid(interface): return interface['dashboard_info']['name'] != '' folder = find_folder(token_request, folder_name) tag = dash['tag'] interfaces = list(filter(_check_valid, dash['interfaces'])) # dashboard should include error panels errors = dash.get('errors', False) is_nren = folder_name == 'NREN Access' if is_nren: data = get_nren_interface_data(interfaces) dash_data = get_nren_dashboard_data(data, ds_name, tag) else: data = get_interface_data(interfaces) dash_data = get_dashboard_data(data, ds_name, tag, errors) if not isinstance(excluded_dashboards, list): excluded_dashboards = [] else: excluded_dashboards = [s.lower() for s in excluded_dashboards] provisioned = [] with ThreadPoolExecutor(max_workers=4) as executor: for dashboard in dash_data: rendered = render_dashboard(dashboard, nren=is_nren) if rendered.get('title').lower() in excluded_dashboards: executor.submit(delete_dashboard, token_request, rendered, folder['id']) continue provisioned.append(executor.submit(create_dashboard, token_request, rendered, folder['id'])) return [r.result() for r in provisioned] def provision_aggregate(token_request, folder, dash, ds_name): name = dash['dashboard_name'] tag = dash['tag'] interfaces = dash['interfaces'] data = get_aggregate_interface_data(interfaces, name) dashboard = get_aggregate_dashboard_data( f'Aggregate - {name}', data, ds_name, tag) rendered = render_dashboard(dashboard) return create_dashboard(token_request, rendered, folder['id']) def provision_maybe(config): with open(STATE_PATH, 'r+') as f: def write_timestamp(timestamp, provisioning): f.seek(0) f.write(json.dumps( {'timestamp': timestamp, 'provisioning': provisioning})) f.truncate() try: # don't conditionally provision in dev val = os.environ.get('FLASK_ENV') != 'development' now = datetime.datetime.now() write_timestamp(now.timestamp(), val) provision(config) except Exception as e: logger.exception('Uncaught Exception:') raise e finally: now = datetime.datetime.now() write_timestamp(now.timestamp(), False) def provision(config): request = AdminRequest(**config) all_orgs = get_organizations(request) orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS) ignored_folders = config.get('ignored_folders', []) missing = (org['name'] for org in orgs_to_provision if org['name'] not in [org['name'] for org in all_orgs]) for org_name in missing: org_data = create_organization(request, org_name) all_orgs.append(org_data) interfaces = get_interfaces(config['inventory_provider']) tokens = [] start = time.time() for org in all_orgs: org_id = org['id'] delete_expired_api_tokens(request, org_id) token = create_api_token(request, org_id) token_request = TokenRequest(token=token['key'], **config) tokens.append((org_id, token['id'])) logger.info( f'--- Provisioning org {org["name"]} (ID #{org_id}) ---') try: org_config = next( o for o in orgs_to_provision if o['name'] == org['name']) except StopIteration: org_config = None if not org_config: logger.error( f'Org {org["name"]} does not have valid configuration.') org['info'] = 'Org exists in grafana but is not configured' continue # Only provision influxdb datasource for now datasource = config.get('datasources').get('influxdb') # Provision missing data sources if not check_provisioned(token_request, datasource): ds = create_datasource(token_request, datasource, config.get('datasources')) if ds: logger.info( f'Provisioned datasource: {datasource["name"]}') excluded_nrens = org_config['excluded_nrens'] def excluded(interface): desc = interface['description'].lower() lab = 'lab.office' in interface['router'].lower() to_exclude = any(nren.lower() in desc for nren in excluded_nrens) return not (to_exclude or lab) relevant_interfaces = list(filter(excluded, interfaces)) dashboards = { 'NREN': { 'tag': ['customers'], 'folder_name': 'NREN Access', 'interfaces': [] }, 'CLS': { 'tag': 'CLS', 'folder_name': 'CLS', 'interfaces': [] }, 'RE_PEER': { 'tag': 'RE_PEER', 'folder_name': 'RE Peer', 'interfaces': [] }, 'RE_CUST': { 'tag': 'RE_CUST', 'folder_name': 'RE Customer', 'interfaces': [] }, 'GEANTOPEN': { 'tag': 'GEANTOPEN', 'folder_name': 'GEANTOPEN', 'interfaces': [] }, 'GCS': { 'tag': 'AUTOMATED_L2_CIRCUITS', 'folder_name': 'GCS', 'interfaces': [] }, 'L2_CIRCUIT': { 'tag': 'L2_CIRCUITS', 'folder_name': 'L2 Circuit', 'interfaces': [] }, 'LHCONE_PEER': { 'tag': 'LHCONE_PEER', 'folder_name': 'LHCONE Peer', 'interfaces': [] }, 'LHCONE_CUST': { 'tag': 'LHCONE_CUST', 'folder_name': 'LHCONE Customer', 'interfaces': [] }, 'MDVPN_CUSTOMERS': { 'tag': 'MDVPN', 'folder_name': 'MDVPN Customers', 'interfaces': [] }, 'INFRASTRUCTURE_BACKBONE': { 'tag': 'BACKBONE', 'errors': True, 'folder_name': 'Infrastructure Backbone', 'interfaces': [] }, 'IAS_PRIVATE': { 'tag': 'IAS_PRIVATE', 'folder_name': 'IAS Private', 'interfaces': [] }, 'IAS_PUBLIC': { 'tag': 'IAS_PUBLIC', 'folder_name': 'IAS Public', 'interfaces': [] }, 'IAS_CUSTOMER': { 'tag': 'IAS_CUSTOMER', 'folder_name': 'IAS Customer', 'interfaces': [] }, 'IAS_UPSTREAM': { 'tag': ['IAS_UPSTREAM', 'UPSTREAM'], 'folder_name': 'IAS Upstream', 'interfaces': [] }, 'GWS_PHY_UPSTREAM': { 'tag': ['GWS_UPSTREAM', 'UPSTREAM'], 'errors': True, 'folder_name': 'GWS PHY Upstream', 'interfaces': [] } } agg_dashboards = { 'CLS_PEERS': { 'tag': 'cls_peers', 'dashboard_name': 'CLS Peers', 'interfaces': [] }, 'IAS_PEERS': { 'tag': 'ias_peers', 'dashboard_name': 'IAS Peers', 'interfaces': [] }, 'IAS_UPSTREAM': { 'tag': 'gws_upstreams', 'dashboard_name': 'GWS Upstreams', 'interfaces': [] }, 'LHCONE': { 'tag': 'lhcone', 'dashboard_name': 'LHCONE', 'interfaces': [] }, 'CAE1': { 'tag': 'cae', 'dashboard_name': 'CAE1', 'interfaces': [] } } # Provision dashboards, overwriting existing ones. ds_name = datasource.get('name', 'PollerInfluxDB') excluded_folders = org_config.get('excluded_folders', {}) def get_uid(prev, curr): prev[curr.get('uid')] = False return prev # Map of dashboard UID -> whether it has been updated. # This is used to remove stale dashboards at the end. updated = find_dashboard(token_request) or [] updated = reduce(get_uid, updated, {}) # General is a base folder present in Grafana folders_to_keep = ['General', 'GWS Indirect', 'GWS Direct', 'Aggregates'] folders_to_keep.extend([dash['folder_name'] for dash in dashboards.values()]) def update_dash_list(dashboards): for dashboard in dashboards: if isinstance(dashboard, Future): dashboard = dashboard.result() if dashboard is None: continue updated[dashboard.get('uid')] = True # loop over interfaces and add them to the dashboard_name # -> folder mapping structure `dashboards` above, for convenience. for iface in relevant_interfaces: for dash_name in iface['dashboards']: # add interface to matched dashboard if dash_name in dashboards: ifaces = dashboards[dash_name]['interfaces'] ifaces.append(iface) # add to matched aggregate dashboard if dash_name in agg_dashboards: ifaces = agg_dashboards[dash_name]['interfaces'] ifaces.append(iface) # provision dashboards and their folders with ProcessPoolExecutor(max_workers=4) as executor: provisioned = [] for folder in dashboards.values(): folder_name = folder['folder_name'] exclude = excluded_folders.get(folder_name) # boolean True means entire folder excluded # if list, it is specific dashboard names not to provision # so is handled at provision time. if exclude: if isinstance(exclude, bool): executor.submit( delete_folder, token_request, folder_name) continue logger.info( f'Provisioning {org["name"]}/{folder_name} dashboards') res = executor.submit(provision_folder, token_request, folder_name, folder, ds_name, exclude) provisioned.append(res) for result in provisioned: folder = result.result() if folder is None: continue update_dash_list(folder) # fetch GWS direct data and provision related dashboards logger.info('Provisioning GWS Indirect dashboards') folder_name = 'GWS Indirect' exclude_indirect = excluded_folders.get(folder_name, False) if isinstance(exclude_indirect, bool) and exclude_indirect: # don't provision GWS Direct folder delete_folder(token_request, folder_name) else: folder = find_folder(token_request, folder_name) with ThreadPoolExecutor(max_workers=4) as executor: gws_indirect_data = get_gws_indirect( config['inventory_provider']) provisioned = [] dashes = generate_indirect(gws_indirect_data, ds_name) for dashboard in dashes: rendered = render_dashboard(dashboard) provisioned.append(executor.submit(create_dashboard, token_request, rendered, folder['id'])) update_dash_list(provisioned) # fetch GWS direct data and provision related dashboards logger.info('Provisioning GWS Direct dashboards') folder_name = 'GWS Direct' exclude_gws = excluded_folders.get(folder_name, False) if isinstance(exclude_gws, bool) and exclude_gws: # don't provision GWS Direct folder delete_folder(token_request, folder_name) else: folder = find_folder(token_request, folder_name) with ThreadPoolExecutor(max_workers=4) as executor: gws_data = get_gws_direct(config['inventory_provider']) provisioned = [] for dashboard in generate_gws(gws_data, ds_name): rendered = render_dashboard(dashboard) provisioned.append(executor.submit(create_dashboard, token_request, rendered, folder['id'])) update_dash_list(provisioned) exclude_agg = excluded_folders.get('Aggregates', []) if isinstance(exclude_agg, bool) and exclude_agg: # don't provision aggregate folder delete_folder(token_request, 'Aggregates') else: with ProcessPoolExecutor(max_workers=4) as executor: provisioned = [] agg_folder = find_folder(token_request, 'Aggregates') for dash in agg_dashboards.values(): if dash['dashboard_name'] in exclude_agg: dash_name = { 'title': f'Aggregate - {dash["dashboard_name"]}'} executor.submit(delete_dashboard, token_request, dash_name, agg_folder['id']) continue logger.info(f'Provisioning {org["name"]}' + f'/Aggregate {dash["dashboard_name"]} dashboards') # noqa: E501 res = executor.submit( provision_aggregate, token_request, agg_folder, dash, ds_name) provisioned.append(res) update_dash_list(provisioned) # Statically defined dashboards from json files excluded_dashboards = org_config.get('excluded_dashboards', []) logger.info('Provisioning static dashboards') for dashboard in get_dashboard_definitions(): if dashboard['title'] not in excluded_dashboards: res = create_dashboard(token_request, dashboard) if res: updated[res.get('uid')] = True else: delete_dashboard(token_request, dashboard) # Home dashboard is always called "Home" # Make sure it's set for the organization logger.info('Configuring Home dashboard') is_staff = org['name'] == 'GÉANT Staff' set_home_dashboard(token_request, is_staff) # just hardcode that we updated home dashboard updated['home'] = True # get dashboard UIDs from ignored folders # and make sure we don't touch them for name in ignored_folders: folders_to_keep.append(name) logger.info( f'Ignoring dashboards under the folder {org["name"]}/{name}') folder = find_folder(token_request, name, create=False) if folder is None: continue to_ignore = find_dashboard(token_request, folder_id=folder['id']) if to_ignore is None: continue for dash in to_ignore: # mark it updated, so we don't modify it. updated[dash['uid']] = True for dash, provisioned in updated.items(): if not provisioned: logger.info(f'Deleting stale dashboard with UID {dash}') delete_dashboard(token_request, {'uid': dash}) all_folders = get_folders(token_request) folders_to_keep = set(folders_to_keep) for folder in all_folders: if folder['title'] not in folders_to_keep: delete_folder(token_request, uid=folder['uid']) logger.info(f'Time to complete: {time.time() - start}') for org_id, token in tokens: delete_api_token(request, org_id, token) return all_orgs