Skip to content
Snippets Groups Projects
provision.py 20 KiB
Newer Older
Erik Reid's avatar
Erik Reid committed
"""
This module is responsible for the
entire provisioning lifecycle.
"""
import datetime
import json
import logging
import os
import time
from concurrent.futures import Future
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS, STATE_PATH
from brian_dashboard_manager.grafana.utils.request import AdminRequest, \
    TokenRequest
from brian_dashboard_manager.grafana.organization import \
    get_organizations, create_organization, create_api_token, \
    delete_api_token, delete_expired_api_tokens, set_home_dashboard
from brian_dashboard_manager.grafana.dashboard import find_dashboard, \
    get_dashboard_definitions, create_dashboard, delete_dashboard
from brian_dashboard_manager.grafana.datasource import \
    check_provisioned, create_datasource
from brian_dashboard_manager.grafana.folder import find_folder, \
    delete_folder, get_folders
from brian_dashboard_manager.inventory_provider.interfaces import \
    get_gws_direct, get_gws_indirect, get_interfaces, \
    get_eumetsat_multicast_subscriptions

from brian_dashboard_manager.templating.helpers import \
    get_aggregate_dashboard_data, get_interface_data, \
    get_nren_interface_data, get_dashboard_data, \
    get_nren_dashboard_data, get_aggregate_interface_data
from brian_dashboard_manager.templating.gws import generate_gws, \
    generate_indirect
from brian_dashboard_manager.templating.eumetsat import generate_eumetsat_multicast
from brian_dashboard_manager.templating.render import render_dashboard

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Worker count handed to every ThreadPoolExecutor / ProcessPoolExecutor
# created in this module.
MAX_WORKERS = 1
# Per-folder dashboard definitions, keyed by the dashboard name that is
# looked up from each interface's 'dashboards' list.
#   tag         - passed through to the dashboard data helpers
#                 (either a string or a list of strings)
#   errors      - optional; when True the dashboards are generated with
#                 error panels
#   folder_name - title of the Grafana folder the dashboards go into
#   interfaces  - filled in by _provision_interfaces
DASHBOARDS = {
    'NREN': {
        'tag': ['customers'],
        'folder_name': 'NREN Access',
        'interfaces': []
    },
    'CLS': {
        'tag': 'CLS',
        'folder_name': 'CLS',
        'interfaces': []
    },
    'RE_PEER': {
        'tag': 'RE_PEER',
        'folder_name': 'RE Peer',
        'interfaces': []
    },
    'RE_CUST': {
        'tag': 'RE_CUST',
        'folder_name': 'RE Customer',
        'interfaces': []
    },
    'GEANTOPEN': {
        'tag': 'GEANTOPEN',
        'folder_name': 'GEANTOPEN',
        'interfaces': []
    },
    'GCS': {
        'tag': 'AUTOMATED_L2_CIRCUITS',
        'folder_name': 'GCS',
        'interfaces': []
    },
    'L2_CIRCUIT': {
        'tag': 'L2_CIRCUITS',
        'folder_name': 'L2 Circuit',
        'interfaces': []
    },
    'LHCONE_PEER': {
        'tag': 'LHCONE_PEER',
        'folder_name': 'LHCONE Peer',
        'interfaces': []
    },
    'LHCONE_CUST': {
        'tag': 'LHCONE_CUST',
        'folder_name': 'LHCONE Customer',
        'interfaces': []
    },
    'MDVPN_CUSTOMERS': {
        'tag': 'MDVPN',
        'folder_name': 'MDVPN Customers',
        'interfaces': []
    },
    'INFRASTRUCTURE_BACKBONE': {
        'tag': 'BACKBONE',
        'errors': True,
        'folder_name': 'Infrastructure Backbone',
        'interfaces': []
    },
    'IAS_PRIVATE': {
        'tag': 'IAS_PRIVATE',
        'folder_name': 'IAS Private',
        'interfaces': []
    },
    'IAS_PUBLIC': {
        'tag': 'IAS_PUBLIC',
        'folder_name': 'IAS Public',
        'interfaces': []
    },
    'IAS_CUSTOMER': {
        'tag': 'IAS_CUSTOMER',
        'folder_name': 'IAS Customer',
        'interfaces': []
    },
    'IAS_UPSTREAM': {
        'tag': ['IAS_UPSTREAM', 'UPSTREAM'],
        'folder_name': 'IAS Upstream',
        'interfaces': []
    },
    'GWS_PHY_UPSTREAM': {
        'tag': ['GWS_UPSTREAM', 'UPSTREAM'],
        'errors': True,
        'folder_name': 'GWS PHY Upstream',
        'interfaces': []
    }
}

# Aggregate dashboard definitions, keyed by the dashboard name that is
# looked up from each interface's 'dashboards' list.
#   tag            - passed through to get_aggregate_dashboard_data
#   dashboard_name - used to build the title 'Aggregate - <dashboard_name>'
#   interfaces     - filled in by _provision_interfaces
AGG_DASHBOARDS = {
    'CLS_PEERS': {
        'tag': 'cls_peers',
        'dashboard_name': 'CLS Peers',
        'interfaces': []
    },
    'IAS_PEERS': {
        'tag': 'ias_peers',
        'dashboard_name': 'IAS Peers',
        'interfaces': []
    },
    'IAS_UPSTREAM': {
        'tag': 'gws_upstreams',
        'dashboard_name': 'GWS Upstreams',
        'interfaces': []
    },
    'LHCONE': {
        'tag': 'lhcone',
        'dashboard_name': 'LHCONE',
        'interfaces': []
    },
    'CAE1': {
        'tag': 'cae',
        'dashboard_name': 'CAE1',
        'interfaces': []
    }
}

def provision_folder(token_request, folder_name, dash,
                     ds_name, excluded_dashboards):
    """
    Provision every dashboard that belongs to a single folder.

    :param token_request: request object handed to the Grafana helpers
    :param folder_name: title of the folder to provision into
    :param dash: an entry of DASHBOARDS (keys 'tag', 'interfaces' and
        optionally 'errors')
    :param ds_name: datasource name passed to the dashboard data helpers
    :param excluded_dashboards: list of dashboard titles that must not be
        provisioned (compared case-insensitively); any non-list value
        means nothing is excluded
    :return: list holding the create_dashboard result of every dashboard
        that was provisioned
    """
    def _check_valid(interface):
        return interface['dashboard_info']['name'] != ''

    folder = find_folder(token_request, folder_name)
    tag = dash['tag']
    interfaces = list(filter(_check_valid, dash['interfaces']))
    # dashboard should include error panels
    errors = dash.get('errors', False)

    is_nren = folder_name == 'NREN Access'
    if is_nren:
        data = get_nren_interface_data(interfaces)
        dash_data = get_nren_dashboard_data(data, ds_name, tag)
    else:
        data = get_interface_data(interfaces)
        dash_data = get_dashboard_data(
            data=data,
            datasource=ds_name,
            tag=tag,
            errors=errors)

    if isinstance(excluded_dashboards, list):
        excluded_titles = {s.lower() for s in excluded_dashboards}
    else:
        excluded_titles = set()

    # Futures of the create_dashboard calls. This list has to exist
    # before the loop below appends to it.
    provisioned = []

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for dashboard in dash_data:
            rendered = render_dashboard(dashboard, nren=is_nren)
            if rendered.get('title').lower() in excluded_titles:
                # excluded dashboards are removed rather than provisioned
                executor.submit(delete_dashboard, token_request,
                                rendered, folder['id'])
                continue
            provisioned.append(executor.submit(create_dashboard, token_request,
                                               rendered, folder['id']))

    # the executor has been shut down, so every future is complete here
    return [r.result() for r in provisioned]
def provision_aggregate(token_request, folder,
                        dash, ds_name):
    """
    Build, render and create one aggregate dashboard.

    :param token_request: request object handed to create_dashboard
    :param folder: folder dict; its 'id' is where the dashboard is created
    :param dash: an entry of AGG_DASHBOARDS
    :param ds_name: datasource name passed to the dashboard data helper
    :return: whatever create_dashboard returns
    """
    agg_name, agg_tag, agg_interfaces = (
        dash['dashboard_name'], dash['tag'], dash['interfaces'])

    panel_data = get_aggregate_interface_data(agg_interfaces, agg_name)
    definition = get_aggregate_dashboard_data(
        f'Aggregate - {agg_name}', panel_data, ds_name, agg_tag)

    return create_dashboard(
        token_request, render_dashboard(definition), folder['id'])
def provision_maybe(config):
    """
    Run provision(config) and record its progress in the state file.

    The file at STATE_PATH is rewritten with a timestamp and a
    'provisioning' flag before provisioning starts, and again with the
    flag set to False once provisioning has finished or failed.
    Exceptions are logged and re-raised.
    """
    with open(STATE_PATH, 'r+') as state_file:

        def _record_state(timestamp, provisioning):
            # replace the whole file content with the new state
            state_file.seek(0)
            state_file.write(json.dumps(
                {'timestamp': timestamp, 'provisioning': provisioning}))
            state_file.truncate()

        try:
            # don't conditionally provision in dev
            in_progress = os.environ.get('FLASK_ENV') != 'development'
            _record_state(datetime.datetime.now().timestamp(), in_progress)
            provision(config)
        except Exception as e:
            logger.exception('Uncaught Exception:')
            raise e
        finally:
            _record_state(datetime.datetime.now().timestamp(), False)


def is_excluded_folder(org_config, folder_name):
    """
    Tell whether a whole folder is excluded for an organization.

    Only the boolean value True under
    org_config['excluded_folders'][folder_name] excludes the folder.
    A list there names individual dashboards to skip and is dealt with
    when the folder is provisioned.
    """
    setting = org_config.get('excluded_folders', {}).get(folder_name, False)
    return setting is True
def excluded_folder_dashboards(org_config, folder_name):
    """
    Return the dashboard names excluded within a folder.

    The list configured under
    org_config['excluded_folders'][folder_name] is returned as is; a
    missing entry or any non-list value yields an empty list.
    """
    setting = org_config.get('excluded_folders', {}).get(folder_name, [])
    if isinstance(setting, list):
        return setting
    return []
class DashboardChanges:
    """
    Track which dashboards were touched during a provisioning run.

    `updated` maps a dashboard UID to a flag saying whether that
    dashboard has been provisioned; dashboards whose flag is still
    False can be removed with delete_untouched.
    """

    def __init__(self, token):
        # start with every dashboard returned by find_dashboard
        # marked as not updated
        existing = find_dashboard(token) or []
        self.updated = {entry['uid']: False for entry in existing}

    def update_dash_list(self, dashboards):
        """
        Mark the given dashboards as updated.

        Items may be dashboard dicts, Futures resolving to one, or
        None; None items (or Futures resolving to None) are skipped.
        """
        for item in dashboards:
            resolved = item.result() if isinstance(item, Future) else item
            if resolved is not None:
                self.updated[resolved.get('uid')] = True

    def delete_untouched(self, token):
        """Delete every dashboard that is not marked as updated."""
        for uid, touched in self.updated.items():
            if touched:
                continue
            logger.info(f'Deleting stale dashboard with UID {uid}')
            delete_dashboard(token, {'uid': uid})


# Bookkeeping for the organization currently being provisioned;
# provision() rebinds this to a fresh DashboardChanges for every org.
DASHBOARD_CHANGES = None  # will be an instance of DashboardChanges
def _provision_interfaces(config, org_config, ds_name, token):
    """
    Provision the interface-based dashboards of one organization.

    Interfaces are fetched from the inventory provider, filtered, and
    attached to the matching DASHBOARDS / AGG_DASHBOARDS entries. Each
    non-excluded DASHBOARDS folder is then provisioned and the results
    are recorded in DASHBOARD_CHANGES.

    :param config: application config; 'inventory_provider' is read
    :param org_config: organization config; 'excluded_nrens' and 'name'
        are read, plus the folder exclusion settings
    :param ds_name: datasource name used by the dashboards
    :param token: request object handed to the Grafana helpers
    """
    interfaces = get_interfaces(config['inventory_provider'])
    excluded_nrens = [nren.lower() for nren in org_config['excluded_nrens']]

    def _is_relevant(interface):
        # keep an interface unless it belongs to an excluded NREN
        # or sits on a lab router
        desc = interface['description'].lower()
        lab = 'lab.office' in interface['router'].lower()
        to_exclude = any(nren in desc for nren in excluded_nrens)
        return not (to_exclude or lab)

    relevant_interfaces = [i for i in interfaces if _is_relevant(i)]

    # The interface lists live in module-level dicts. Start from empty
    # lists so that interfaces collected for a previous organization or
    # a previous run are not provisioned again for this one.
    for dash in DASHBOARDS.values():
        dash['interfaces'] = []
    for dash in AGG_DASHBOARDS.values():
        dash['interfaces'] = []

    # Provision dashboards, overwriting existing ones.
    # loop over interfaces and add them to the dashboard_name
    # -> folder mapping structures above, for convenience.
    for iface in relevant_interfaces:
        for dash_name in iface['dashboards']:
            # a name may match a folder entry, an aggregate entry, or both
            for registry in (DASHBOARDS, AGG_DASHBOARDS):
                if dash_name in registry:
                    registry[dash_name]['interfaces'].append(iface)

    # provision dashboards and their folders
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        provisioned = []
        for folder in DASHBOARDS.values():
            folder_name = folder['folder_name']
            # boolean True means entire folder excluded
            # if list, it is specific dashboard names not to provision
            # so is handled at provision time.
            if is_excluded_folder(org_config, folder_name):
                executor.submit(
                    delete_folder, token, title=folder_name)
                continue
            logger.info(
                f'Provisioning {org_config["name"]}/{folder_name} dashboards')
            res = executor.submit(provision_folder, token,
                                  folder_name, folder, ds_name,
                                  excluded_folder_dashboards(org_config, folder_name))
            provisioned.append(res)

        for result in provisioned:
            folder = result.result()
            if folder is None:
                continue
            DASHBOARD_CHANGES.update_dash_list(folder)


def _provision_gws_indirect(config, org_config, ds_name, token):
    """
    Provision the dashboards of the 'GWS Indirect' folder.

    If the folder is excluded for this organization it is deleted
    instead. Otherwise one dashboard is created per definition yielded
    by generate_indirect and the results are recorded in
    DASHBOARD_CHANGES.
    """
    logger.info('Provisioning GWS Indirect dashboards')
    folder_name = 'GWS Indirect'

    if is_excluded_folder(org_config, folder_name):
        # excluded for this org: remove the GWS Indirect folder
        delete_folder(token, title=folder_name)
        return

    folder = find_folder(token, folder_name)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # fetch GWS indirect data and create one dashboard per definition
        indirect_data = get_gws_indirect(config['inventory_provider'])
        futures = [
            executor.submit(create_dashboard, token,
                            render_dashboard(definition), folder['id'])
            for definition in generate_indirect(indirect_data, ds_name)
        ]
        DASHBOARD_CHANGES.update_dash_list(futures)


def _provision_gws_direct(config, org_config, ds_name, token):
    """
    Provision the dashboards of the 'GWS Direct' folder.

    If the folder is excluded for this organization it is deleted
    instead. Otherwise one dashboard is created per definition yielded
    by generate_gws and the results are recorded in DASHBOARD_CHANGES.
    """
    logger.info('Provisioning GWS Direct dashboards')
    folder_name = 'GWS Direct'

    if is_excluded_folder(org_config, folder_name):
        # excluded for this org: remove the GWS Direct folder
        delete_folder(token, title=folder_name)
        return

    folder = find_folder(token, folder_name)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # fetch GWS direct data and create one dashboard per definition
        direct_data = get_gws_direct(config['inventory_provider'])
        futures = []
        for definition in generate_gws(direct_data, ds_name):
            rendered = render_dashboard(definition)
            future = executor.submit(
                create_dashboard, token, rendered, folder['id'])
            futures.append(future)

        DASHBOARD_CHANGES.update_dash_list(futures)


def _provision_eumetsat_multicast(config, org_config, ds_name, token):
    """
    Provision the dashboards of the 'EUMETSAT Multicast' folder.

    If the folder is excluded for this organization it is deleted
    instead. Otherwise one dashboard is created per definition yielded
    by generate_eumetsat_multicast and the results are recorded in
    DASHBOARD_CHANGES.
    """
    logger.info('Provisioning EUMETSAT Multicast dashboards')
    folder_name = 'EUMETSAT Multicast'

    if is_excluded_folder(org_config, folder_name):
        # excluded for this org: remove the EUMETSAT Multicast folder
        delete_folder(token, title=folder_name)
        return

    folder = find_folder(token, folder_name)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # fetch the multicast subscriptions the dashboards are built from
        subscriptions = get_eumetsat_multicast_subscriptions(
            config['inventory_provider'])
        definitions = generate_eumetsat_multicast(subscriptions, ds_name)
        futures = [
            executor.submit(create_dashboard, token,
                            render_dashboard(definition), folder['id'])
            for definition in definitions
        ]
        DASHBOARD_CHANGES.update_dash_list(futures)


def _provision_aggregates(config, org_config, ds_name, token):
    """
    Provision the aggregate dashboards in the 'Aggregates' folder.

    If the whole folder is excluded for this organization it is
    deleted. Otherwise each AGG_DASHBOARDS entry is provisioned, except
    for entries whose 'dashboard_name' is listed as excluded for the
    folder: those are deleted and not provisioned.

    :param config: application config (unused here)
    :param org_config: organization config; 'name' and the folder
        exclusion settings are read
    :param ds_name: datasource name used by the dashboards
    :param token: request object handed to the Grafana helpers
    """
    if is_excluded_folder(org_config, 'Aggregates'):
        # don't provision aggregate folder
        delete_folder(token, title='Aggregates')
        return

    # the exclusion list is the same for every dashboard, so look it
    # up once instead of once per loop iteration
    excluded_dashboards = excluded_folder_dashboards(org_config, 'Aggregates')

    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        provisioned = []
        agg_folder = find_folder(token, 'Aggregates')
        for dash in AGG_DASHBOARDS.values():
            if dash['dashboard_name'] in excluded_dashboards:
                dash_name = {
                    'title': f'Aggregate - {dash["dashboard_name"]}'}
                executor.submit(delete_dashboard,
                                token, dash_name,
                                agg_folder['id'])
                # an excluded dashboard must not be provisioned again
                # right after its deletion was scheduled
                continue

            logger.info(f'Provisioning {org_config["name"]}' +
                        f'/Aggregate {dash["dashboard_name"]} dashboards')  # noqa: E501
            res = executor.submit(
                provision_aggregate, token,
                agg_folder, dash, ds_name)
            provisioned.append(res)

        DASHBOARD_CHANGES.update_dash_list(provisioned)


def _provision_static_dashboards(config, org_config, ds_name, token):
    """
    Provision the statically defined dashboards and the Home dashboard.

    Every definition returned by get_dashboard_definitions is created,
    unless its title is listed in org_config['excluded_dashboards'], in
    which case the dashboard is deleted instead. The home dashboard is
    then configured for the organization.

    :param config: application config (unused here)
    :param org_config: organization config; 'excluded_dashboards' and
        'name' are read
    :param ds_name: datasource name (unused here)
    :param token: request object handed to the Grafana helpers
    """
    # Statically defined dashboards from json files
    excluded_dashboards = org_config.get('excluded_dashboards', [])
    logger.info('Provisioning static dashboards')
    for dashboard in get_dashboard_definitions():
        if dashboard['title'] in excluded_dashboards:
            # only excluded dashboards are removed; deleting right after
            # creating would undo the provisioning
            delete_dashboard(token, dashboard)
            continue

        res = create_dashboard(token, dashboard)
        if res:
            DASHBOARD_CHANGES.updated[res.get('uid')] = True

    # Home dashboard is always called "Home"
    # Make sure it's set for the organization
    logger.info('Configuring Home dashboard')
    set_home_dashboard(token, org_config['name'] == 'GÉANT Staff')
    # just hardcode that we updated home dashboard
    DASHBOARD_CHANGES.updated['home'] = True
def _set_ignored_folders_as_updated(config, org_config, token):
    """
    Mark every dashboard inside an ignored folder as updated.

    Folders named in config['ignored_folders'] are looked up without
    being created; the dashboards found in them are flagged in
    DASHBOARD_CHANGES so that they are not treated as stale.
    """
    for folder_name in config.get('ignored_folders', []):
        logger.info(
            f'Ignoring dashboards under the folder {org_config["name"]}/{folder_name}')

        folder = find_folder(token, folder_name, create=False)
        if folder is None:
            # nothing to protect when the folder does not exist
            continue

        protected = find_dashboard(token, folder_id=folder['id'])
        if protected is None:
            continue

        # mark them updated, so we don't modify them.
        for entry in protected:
            DASHBOARD_CHANGES.updated[entry['uid']] = True


def _delete_unknown_folders(config, token):
    """
    Delete every folder that is neither provisioned nor ignored.

    Kept folders are: the built-in 'General' folder, the folders
    provisioned by this module ('GWS Indirect', 'GWS Direct',
    'EUMETSAT Multicast', 'Aggregates' and every DASHBOARDS
    'folder_name'), and the folders listed in config['ignored_folders'].

    :param config: application config; 'ignored_folders' is read
    :param token: request object handed to the Grafana helpers
    """
    folders_to_keep = {
        # General is a base folder present in Grafana
        'General',
        'GWS Indirect',
        'GWS Direct',
        # provisioned by _provision_eumetsat_multicast; without this
        # entry the folder would be deleted again after every run
        'EUMETSAT Multicast',
        'Aggregates',
    }
    folders_to_keep.update(
        dash['folder_name'] for dash in DASHBOARDS.values())
    folders_to_keep.update(config.get('ignored_folders', []))

    for folder in get_folders(token):
        if folder['title'] in folders_to_keep:
            continue
        delete_folder(token, uid=folder['uid'])


def _provision_datasource(config, token):
    """
    Make sure the influxdb datasource is provisioned.

    :param config: application config; config['datasources']['influxdb']
        is the datasource definition
    :param token: request object handed to the datasource helpers
    :return: the influxdb datasource definition taken from the config
    """
    # Only provision influxdb datasource for now
    datasource = config.get('datasources').get('influxdb')

    # Provision missing data sources
    if not check_provisioned(token, datasource):
        ds = create_datasource(token,
                               datasource,
                               config.get('datasources'))
        if ds:
            logger.info(
                f'Provisioned datasource: {datasource["name"]}')

    return datasource
def _provision_orgs(config):
    """
    Make sure every configured organization exists.

    Organizations from config['organizations'] (DEFAULT_ORGANIZATIONS
    when that key is absent) that are not yet returned by
    get_organizations are created.

    :return: the list of organizations, including the ones just created
    """
    request = AdminRequest(**config)
    all_orgs = get_organizations(request)
    orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS)

    for wanted in orgs_to_provision:
        # re-read the known names on every pass, so that organizations
        # created earlier in this loop are taken into account
        known_names = [existing['name'] for existing in all_orgs]
        if wanted['name'] in known_names:
            continue
        all_orgs.append(create_organization(request, wanted['name']))

    return all_orgs


def provision(config):
    """
    Provision every organization: datasource, dashboards and folders.

    For each organization a temporary API token is created and used for
    all requests. Organizations without a matching entry in
    config['organizations'] (DEFAULT_ORGANIZATIONS when absent) are
    skipped. After the dashboards have been provisioned, dashboards that
    were not touched and unknown folders are deleted.

    The temporary API tokens are deleted when provisioning ends, whether
    it succeeded or raised.

    :param config: application config
    """

    global DASHBOARD_CHANGES

    start = time.time()
    tokens = []
    all_orgs = _provision_orgs(config)
    request = AdminRequest(**config)
    orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS)

    def _find_org_config(org):
        try:
            return next(
                o for o in orgs_to_provision if o['name'] == org['name'])
        except StopIteration:
            logger.error(
                f'Org {org["name"]} does not have valid configuration.')
            org['info'] = 'Org exists in grafana but is not configured'
            return None

    try:
        for org in all_orgs:
            org_id = org['id']
            delete_expired_api_tokens(request, org_id)
            token = create_api_token(request, org_id)
            # remember the token straight away so it is cleaned up even
            # if one of the following steps fails
            tokens.append((org_id, token['id']))
            token_request = TokenRequest(token=token['key'], **config)

            DASHBOARD_CHANGES = DashboardChanges(token_request)

            logger.info(
                f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')

            org_config = _find_org_config(org)
            if not org_config:
                # message logged from _find_org_config
                continue

            datasource = _provision_datasource(config, token_request)
            ds_name = datasource.get('name', 'PollerInfluxDB')

            _provision_interfaces(config, org_config, ds_name, token_request)
            _provision_gws_indirect(config, org_config, ds_name, token_request)
            _provision_gws_direct(config, org_config, ds_name, token_request)
            _provision_eumetsat_multicast(config, org_config, ds_name, token_request)
            _provision_aggregates(config, org_config, ds_name, token_request)
            _provision_static_dashboards(config, org_config, ds_name, token_request)
            _set_ignored_folders_as_updated(config, org_config, token_request)

            DASHBOARD_CHANGES.delete_untouched(token_request)

            _delete_unknown_folders(config, token_request)
    finally:
        # the tokens are only needed for this run
        for org_id, token_id in tokens:
            delete_api_token(request, org_id, token_id)

    logger.info(f'Time to complete: {time.time() - start}')