# provision.py
"""
This module is responsible for the
entire provisioning lifecycle.
"""
import itertools
import os
import logging
import time
import json
import datetime
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS, STATE_PATH
from brian_dashboard_manager.grafana.utils.request import AdminRequest, \
    TokenRequest

from brian_dashboard_manager.grafana.organization import \
    get_organizations, create_organization, create_api_token, \
    delete_api_token, delete_expired_api_tokens, set_home_dashboard
from brian_dashboard_manager.grafana.dashboard import find_dashboard, \
    get_dashboard_definitions, create_dashboard, delete_dashboard
from brian_dashboard_manager.grafana.datasource import \
    check_provisioned, create_datasource
from brian_dashboard_manager.grafana.folder import find_folder, \
    delete_folder, get_folders
from brian_dashboard_manager.inventory_provider.interfaces import \
    get_gws_direct, get_gws_indirect, get_interfaces, \
    get_eumetsat_multicast_subscriptions

from brian_dashboard_manager.templating.helpers import \
    get_aggregate_dashboard_data, get_interface_data, \
    get_nren_interface_data, get_dashboard_data, \
    get_nren_dashboard_data, get_aggregate_interface_data

from brian_dashboard_manager.templating.gws import generate_gws, \
    generate_indirect
from brian_dashboard_manager.templating.eumetsat \
    import generate_eumetsat_multicast
from brian_dashboard_manager.templating.render import render_dashboard

logger = logging.getLogger(__name__)

# maximum worker threads for each ThreadPoolExecutor used below
MAX_WORKERS = 4

# Mapping of dashboard categories to the inventory-provider tag(s) used to
# match interfaces, the Grafana folder to provision into, and a working
# 'interfaces' list that is (re)filled by _provision_interfaces on each run.
# 'errors': True means the generated dashboards include error panels.
DASHBOARDS = {
    'NREN': {
        'tag': ['customers'],
        'folder_name': 'NREN Access',
        'interfaces': []
    },
    'CLS': {
        'tag': 'CLS',
        'folder_name': 'CLS',
        'interfaces': []
    },
    'RE_PEER': {
        'tag': 'RE_PEER',
        'folder_name': 'RE Peer',
        'interfaces': []
    },
    'RE_CUST': {
        'tag': 'RE_CUST',
        'folder_name': 'RE Customer',
        'interfaces': []
    },
    'GEANTOPEN': {
        'tag': 'GEANTOPEN',
        'folder_name': 'GEANTOPEN',
        'interfaces': []
    },
    'GCS': {
        'tag': 'AUTOMATED_L2_CIRCUITS',
        'folder_name': 'GCS',
        'interfaces': []
    },
    'L2_CIRCUIT': {
        'tag': 'L2_CIRCUITS',
        'folder_name': 'L2 Circuit',
        'interfaces': []
    },
    'LHCONE_PEER': {
        'tag': 'LHCONE_PEER',
        'folder_name': 'LHCONE Peer',
        'interfaces': []
    },
    'LHCONE_CUST': {
        'tag': 'LHCONE_CUST',
        'folder_name': 'LHCONE Customer',
        'interfaces': []
    },
    'MDVPN_CUSTOMERS': {
        'tag': 'MDVPN',
        'folder_name': 'MDVPN Customers',
        'interfaces': []
    },
    'INFRASTRUCTURE_BACKBONE': {
        'tag': 'BACKBONE',
        'errors': True,
        'folder_name': 'Infrastructure Backbone',
        'interfaces': []
    },
    'IAS_PRIVATE': {
        'tag': 'IAS_PRIVATE',
        'folder_name': 'IAS Private',
        'interfaces': []
    },
    'IAS_PUBLIC': {
        'tag': 'IAS_PUBLIC',
        'folder_name': 'IAS Public',
        'interfaces': []
    },
    'IAS_CUSTOMER': {
        'tag': 'IAS_CUSTOMER',
        'folder_name': 'IAS Customer',
        'interfaces': []
    },
    'IAS_UPSTREAM': {
        'tag': ['IAS_UPSTREAM', 'UPSTREAM'],
        'folder_name': 'IAS Upstream',
        'interfaces': []
    },
    'GWS_PHY_UPSTREAM': {
        'tag': ['GWS_UPSTREAM', 'UPSTREAM'],
        'errors': True,
        'folder_name': 'GWS PHY Upstream',
        'interfaces': []
    }
}

# Aggregate dashboards, provisioned into the 'Aggregates' folder.
# As above, 'interfaces' is filled in by _provision_interfaces.
AGG_DASHBOARDS = {
    'CLS_PEERS': {
        'tag': 'cls_peers',
        'dashboard_name': 'CLS Peers',
        'interfaces': []
    },
    'IAS_PEERS': {
        'tag': 'ias_peers',
        'dashboard_name': 'IAS Peers',
        'interfaces': []
    },
    'IAS_UPSTREAM': {
        'tag': 'gws_upstreams',
        'dashboard_name': 'GWS Upstreams',
        'interfaces': []
    },
    'LHCONE': {
        'tag': 'lhcone',
        'dashboard_name': 'LHCONE',
        'interfaces': []
    },
    'CAE1': {
        'tag': 'cae',
        'dashboard_name': 'CAE1',
        'interfaces': []
    },
    'COPERNICUS': {
        'tag': ['copernicus', 'RE_PEER'],  # show agg dashboard in R&E Peers
        'dashboard_name': 'COPERNICUS',
        'interfaces': []
    }
}


def provision_folder(token_request, folder_name, dash,
                     ds_name, excluded_dashboards):
    """
    Render and provision every dashboard belonging to a single folder.

    Dashboards whose (lowercased) titles appear in ``excluded_dashboards``
    are deleted instead of created.  Returns the list of results from the
    dashboard-creation calls.
    """

    def _has_name(iface):
        # only interfaces with a non-empty dashboard name are rendered
        return iface['dashboard_info']['name'] != ''

    folder = find_folder(token_request, folder_name)
    tag = dash['tag']
    named_interfaces = [i for i in dash['interfaces'] if _has_name(i)]

    # should the generated dashboards include error panels?
    errors = dash.get('errors', False)

    is_nren = folder_name == 'NREN Access'
    if is_nren:
        iface_data = get_nren_interface_data(named_interfaces)
        dash_data = get_nren_dashboard_data(iface_data, ds_name, tag)
    else:
        iface_data = get_interface_data(named_interfaces)
        dash_data = get_dashboard_data(
            data=iface_data,
            datasource=ds_name,
            tag=tag,
            errors=errors)

    if isinstance(excluded_dashboards, list):
        excluded_dashboards = [name.lower() for name in excluded_dashboards]
    else:
        excluded_dashboards = []

    created = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for dashboard in dash_data:
            rendered = render_dashboard(dashboard, nren=is_nren)
            if rendered.get('title').lower() in excluded_dashboards:
                # excluded by org config: remove instead of creating
                executor.submit(delete_dashboard, token_request,
                                rendered, folder['id'])
            else:
                created.append(
                    executor.submit(create_dashboard, token_request,
                                    rendered, folder['id']))
    return [future.result() for future in created]


def provision_aggregate(token_request, folder,
                        dash, ds_name):
    """
    Render and create a single aggregate dashboard inside *folder*.

    Returns the result of the dashboard-creation call.
    """
    name = dash['dashboard_name']
    agg_data = get_aggregate_interface_data(dash['interfaces'], name)
    dashboard = get_aggregate_dashboard_data(
        f'Aggregate - {name}', agg_data, ds_name, dash['tag'])
    return create_dashboard(
        token_request, render_dashboard(dashboard), folder['id'])


def provision_maybe(config):
    """
    Run a provisioning cycle while tracking progress in the state file.

    Writes ``{'timestamp': ..., 'provisioning': ...}`` to STATE_PATH before
    and after provisioning.  The flag is True while provisioning (except in
    the development environment) and always reset to False on the way out,
    even when provisioning raises.

    :param config: app config dict, passed through to provision()
    :raises Exception: re-raises whatever provision() raised, after logging
    """
    with open(STATE_PATH, 'r+') as f:
        def write_timestamp(timestamp, provisioning):
            # overwrite the entire state file with the new snapshot
            f.seek(0)
            f.write(json.dumps(
                {'timestamp': timestamp, 'provisioning': provisioning}))
            f.truncate()

        try:
            # don't conditionally provision in dev
            val = os.environ.get('FLASK_ENV') != 'development'
            now = datetime.datetime.now()
            write_timestamp(now.timestamp(), val)
            provision(config)
        except Exception:
            logger.exception('Uncaught Exception:')
            # bare raise keeps the original exception and traceback intact
            raise
        finally:
            now = datetime.datetime.now()
            write_timestamp(now.timestamp(), False)


def is_excluded_folder(org_config, folder_name):
    """
    Return True when the entire folder is excluded for this organization.

    In the org config, an excluded_folders value of boolean True excludes
    the whole folder; a list instead names specific dashboards to skip,
    which is handled at provision time (and therefore returns False here).
    """
    excluded = org_config.get('excluded_folders', {}).get(folder_name, False)
    # only an explicit boolean True excludes the whole folder
    return excluded is True


def excluded_folder_dashboards(org_config, folder_name):
    """
    Return the list of dashboard names excluded within a folder.

    A boolean exclusion value (whole folder excluded) yields an empty
    list here; that case is handled by is_excluded_folder instead.
    """
    excluded_folders = org_config.get('excluded_folders', {})
    names = excluded_folders.get(folder_name, [])
    if isinstance(names, list):
        return names
    return []


def _provision_interfaces(config, org_config, ds_name, token):
    """
    Provision dashboards, overwriting existing ones.

    Fetches all interfaces from the inventory provider, filters out
    excluded NRENs and lab routers, distributes the remainder into the
    module-level DASHBOARDS / AGG_DASHBOARDS mappings, then provisions
    one folder per DASHBOARDS entry.

    :param config: app config (provides the inventory provider URL)
    :param org_config: per-organization provisioning config
    :param ds_name: name of the datasource the dashboards should use
    :param token: org-scoped API request object
    :return: yields dashboards that were created
    """

    interfaces = get_interfaces(config['inventory_provider'])

    excluded_nrens = org_config['excluded_nrens']

    def excluded(interface):
        # NOTE: despite the name, returns True for interfaces to KEEP
        # (neither matching an excluded NREN nor on a lab router)
        desc = interface['description'].lower()
        lab = 'lab.office' in interface['router'].lower()
        to_exclude = any(nren.lower() in desc for nren in excluded_nrens)
        return not (to_exclude or lab)

    relevant_interfaces = list(filter(excluded, interfaces))

    # loop over interfaces and add them to the dashboard_name
    # -> folder mapping structure `dashboards` above, for convenience.

    # reset the module-level mappings first so repeated runs (and runs
    # for different orgs) don't accumulate stale interfaces
    for dash in DASHBOARDS:
        DASHBOARDS[dash]['interfaces'] = []

    for dash in AGG_DASHBOARDS:
        AGG_DASHBOARDS[dash]['interfaces'] = []

    for iface in relevant_interfaces:
        for dash_name in iface['dashboards']:

            # add interface to matched dashboard
            if dash_name in DASHBOARDS:
                ifaces = DASHBOARDS[dash_name]['interfaces']
                ifaces.append(iface)

            # add to matched aggregate dashboard
            if dash_name in AGG_DASHBOARDS:
                ifaces = AGG_DASHBOARDS[dash_name]['interfaces']
                ifaces.append(iface)

    # provision dashboards and their folders
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        provisioned = []
        for folder in DASHBOARDS.values():
            folder_name = folder['folder_name']

            # boolean True means entire folder excluded
            # if list, it is specific dashboard names not to provision
            # so is handled at provision time.
            if is_excluded_folder(org_config, folder_name):
                executor.submit(
                    delete_folder, token, title=folder_name)
                continue

            logger.info(
                f'Provisioning {org_config["name"]}/{folder_name} dashboards')
            res = executor.submit(
                provision_folder, token,
                folder_name, folder, ds_name,
                excluded_folder_dashboards(org_config, folder_name))
            provisioned.append(res)

        # resolve folder futures in submission order and flatten the
        # per-folder dashboard lists into a single stream
        for result in provisioned:
            folder = result.result()
            if folder is None:
                continue
            yield from folder


def _provision_gws_indirect(config, org_config, ds_name, token):
    """
    Provision dashboards for GWS indirect data, or delete the folder
    when it is excluded for this organization.

    Yields futures of the created dashboards.
    """
    logger.info('Provisioning GWS Indirect dashboards')
    folder_name = 'GWS Indirect'
    if is_excluded_folder(org_config, folder_name):
        # org opted out: remove the folder and provision nothing
        delete_folder(token, title=folder_name)
        return
    folder = find_folder(token, folder_name)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        gws_indirect_data = get_gws_indirect(
            config['inventory_provider'])
        futures = [
            executor.submit(create_dashboard, token,
                            render_dashboard(dashboard), folder['id'])
            for dashboard in generate_indirect(gws_indirect_data, ds_name)
        ]
        yield from futures


def _provision_gws_direct(config, org_config, ds_name, token):
    """
    Provision dashboards for GWS direct data, or delete the folder
    when it is excluded for this organization.

    Yields futures of the created dashboards.
    """
    logger.info('Provisioning GWS Direct dashboards')
    folder_name = 'GWS Direct'
    if is_excluded_folder(org_config, folder_name):
        # org opted out: remove the folder and provision nothing
        delete_folder(token, title=folder_name)
        return
    folder = find_folder(token, folder_name)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        gws_data = get_gws_direct(config['inventory_provider'])
        futures = [
            executor.submit(create_dashboard, token,
                            render_dashboard(dashboard), folder['id'])
            for dashboard in generate_gws(gws_data, ds_name)
        ]
        yield from futures


def _provision_eumetsat_multicast(config, org_config, ds_name, token):
    """
    Provision the EUMETSAT multicast dashboards, or delete the folder
    when it is excluded for this organization.

    Yields futures of the created dashboards.
    """
    logger.info('Provisioning EUMETSAT Multicast dashboards')
    folder_name = 'EUMETSAT Multicast'
    if is_excluded_folder(org_config, folder_name):
        # org opted out: remove the folder and provision nothing
        delete_folder(token, title=folder_name)
        return
    folder = find_folder(token, folder_name)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        subscriptions = get_eumetsat_multicast_subscriptions(
            config['inventory_provider'])
        futures = [
            executor.submit(create_dashboard, token,
                            render_dashboard(dashboard), folder['id'])
            for dashboard in generate_eumetsat_multicast(
                subscriptions, ds_name)
        ]
        yield from futures


def _provision_aggregates(config, org_config, ds_name, token):
    """
    Provision the aggregate dashboards into the 'Aggregates' folder.

    Deletes individually-excluded aggregate dashboards, or the whole
    folder when it is fully excluded.  Yields futures of the
    provisioned dashboards.
    """
    folder_name = 'Aggregates'
    if is_excluded_folder(org_config, folder_name):
        # org opted out of aggregates entirely
        delete_folder(token, title=folder_name)
        return
    # per-dashboard exclusions are the same for every iteration
    excluded_dashboards = excluded_folder_dashboards(org_config, folder_name)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        agg_folder = find_folder(token, folder_name)
        results = []
        for dash in AGG_DASHBOARDS.values():
            if dash['dashboard_name'] in excluded_dashboards:
                # excluded: remove any previously-provisioned copy
                executor.submit(
                    delete_dashboard, token,
                    {'title': f'Aggregate - {dash["dashboard_name"]}'},
                    agg_folder['id'])
                continue
            logger.info(
                f'Provisioning {org_config["name"]}'
                f'/Aggregate {dash["dashboard_name"]} dashboards')
            results.append(executor.submit(
                provision_aggregate, token, agg_folder, dash, ds_name))
        yield from results


def _provision_static_dashboards(config, org_config, ds_name, token):
    """
    Provision the statically-defined dashboards (from json files) and
    configure the organization's home dashboard.

    Yields minimal dashboard dicts; callers only read the 'uid' key.
    """
    excluded = org_config.get('excluded_dashboards', [])
    logger.info('Provisioning static dashboards')
    for dashboard in get_dashboard_definitions():
        if dashboard['title'] in excluded:
            delete_dashboard(token, dashboard)
            continue
        res = create_dashboard(token, dashboard)
        if res:
            # yield a fake dashboard dict
            # ... only the 'uid' element is referenced
            yield {'uid': res.get('uid')}

    # Home dashboard is always called "Home"
    # Make sure it's set for the organization
    logger.info('Configuring Home dashboard')
    set_home_dashboard(token, org_config['name'] == 'GÉANT Staff')

    yield {'uid': 'home'}


def _get_ignored_dashboards(config, org_config, token):
    """
    Yield minimal dashboard dicts for every dashboard under an ignored
    folder, so the caller counts them as managed and never deletes them.
    """
    for name in config.get('ignored_folders', []):
        logger.info(
            'Ignoring dashboards under '
            f'the folder {org_config["name"]}/{name}')
        folder = find_folder(token, name, create=False)
        if folder is None:
            continue
        dashboards = find_dashboard(token, folder_id=folder['id'])
        if dashboards is None:
            continue
        for dash in dashboards:
            # hard-coded fake dashboard dict;
            # only the 'uid' element is referenced by the caller
            yield {'uid': dash['uid']}


def _delete_unknown_folders(config, token):
    """
    Delete every Grafana folder that is neither managed by this
    provisioner nor listed in the config's ignored folders.
    """
    keep = {
        # General is a base folder present in Grafana
        'General',
        # other folders, created outside of the DASHBOARDS list
        'GWS Indirect',
        'GWS Direct',
        'Aggregates',
        'EUMETSAT Multicast',
    }
    keep.update(dash['folder_name'] for dash in DASHBOARDS.values())
    keep.update(config.get('ignored_folders', []))

    for folder in get_folders(token):
        if folder['title'] not in keep:
            delete_folder(token, uid=folder['uid'])


def _provision_datasource(config, token):
    """
    Ensure the influxdb datasource exists for this organization.

    Returns the datasource config dict from the app config.
    """
    # Only provision influxdb datasource for now
    datasource = config.get('datasources').get('influxdb')

    if check_provisioned(token, datasource):
        return datasource

    # datasource missing: create it now
    created = create_datasource(token,
                                datasource,
                                config.get('datasources'))
    if created:
        logger.info(
            f'Provisioned datasource: {datasource["name"]}')

    return datasource


def _provision_orgs(config):
    """
    Ensure every organization named in the config exists in Grafana.

    :param config: app config; 'organizations' falls back to
        DEFAULT_ORGANIZATIONS when absent
    :return: list of all organizations (existing plus newly created)
    """
    request = AdminRequest(**config)
    all_orgs = get_organizations(request)

    orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS)

    # build the set of existing names once, instead of rescanning
    # all_orgs for every candidate org (was O(n*m))
    existing_names = {org['name'] for org in all_orgs}
    missing = (org['name'] for org in orgs_to_provision
               if org['name'] not in existing_names)

    for org_name in missing:
        org_data = create_organization(request, org_name)
        all_orgs.append(org_data)

    return all_orgs


def provision(config):
    """
    Provision Grafana for every configured organization: organizations,
    datasources, folders and dashboards; afterwards delete any dashboard
    or folder that is no longer managed or explicitly ignored.

    :param config: app config dict (admin credentials, org list,
        datasources, ignored folders, inventory provider URL)
    :return: the list of organizations that were processed
    """

    start = time.time()
    tokens = []
    all_orgs = _provision_orgs(config)
    request = AdminRequest(**config)
    delete_expired_api_tokens(request)

    def _find_org_config(org):
        # match a grafana org to its entry in the configured org list;
        # returns None (and annotates the org) when unconfigured
        orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS)
        try:
            return next(
                o for o in orgs_to_provision if o['name'] == org['name'])
        except StopIteration:
            logger.error(
                f'Org {org["name"]} does not have valid configuration.')
            org['info'] = 'Org exists in grafana but is not configured'
            return None

    for org in all_orgs:
        org_id = org['id']

        logger.info(
            f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
        org_config = _find_org_config(org)
        if not org_config:
            # message logged from _find_org_config
            continue

        # org-scoped API token used for all subsequent requests;
        # deleted again at the end of this iteration
        token = create_api_token(request, org_id)
        token_request = TokenRequest(token=token['key'], **config)
        tokens.append((org_id, token['id']))
        logger.debug(tokens)

        # snapshot of dashboards present before provisioning; anything
        # not re-provisioned below is deleted as stale afterwards
        all_original_dashboards = find_dashboard(token_request) or []
        all_original_dashboard_uids = {
            d['uid'] for d in all_original_dashboards}

        datasource = _provision_datasource(config, token_request)
        ds_name = datasource.get('name', 'PollerInfluxDB')

        # lazily chain all provisioning generators into one stream
        managed_dashboards = itertools.chain(
            _provision_interfaces(
                config, org_config, ds_name, token_request),
            _provision_gws_indirect(
                config, org_config, ds_name, token_request),
            _provision_gws_direct(
                config, org_config, ds_name, token_request),
            _provision_eumetsat_multicast(
                config, org_config, ds_name, token_request),
            _provision_aggregates(
                config, org_config, ds_name, token_request),
            _provision_static_dashboards(
                config, org_config, ds_name, token_request),
            _get_ignored_dashboards(
                config, org_config, token_request)
        )

        managed_dashboard_uids = set()
        for dashboard in managed_dashboards:
            # some generators yield Futures from thread pools;
            # resolve them here before reading the uid
            if isinstance(dashboard, Future):
                dashboard = dashboard.result()
            if dashboard is None:
                continue
            managed_dashboard_uids.add(dashboard['uid'])

        # delete dashboards that existed before but weren't re-managed
        for uid in all_original_dashboard_uids - managed_dashboard_uids:
            logger.info(f'Deleting stale dashboard with UID {uid}')
            delete_dashboard(token_request, {'uid': uid})

        _delete_unknown_folders(config, token_request)
        delete_api_token(request, token['id'], org_id=org_id)

    logger.info(f'Time to complete: {time.time() - start}')

    return all_orgs