-
Bjarke Madsen authoredBjarke Madsen authored
provision.py 23.04 KiB
"""
This module is responsible for the
entire provisioning lifecycle.
"""
import itertools
import os
import logging
import time
import json
import datetime
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS, STATE_PATH
from brian_dashboard_manager.grafana.utils.request import AdminRequest, \
TokenRequest
from brian_dashboard_manager.services.api import fetch_services
from brian_dashboard_manager.grafana.organization import \
get_organizations, create_organization, create_api_token, \
delete_api_token, delete_expired_api_tokens, set_home_dashboard
from brian_dashboard_manager.grafana.dashboard import find_dashboard, \
get_dashboard_definitions, create_dashboard, delete_dashboard
from brian_dashboard_manager.grafana.datasource import \
check_provisioned, create_datasource
from brian_dashboard_manager.grafana.folder import find_folder, \
delete_folder, get_folders
from brian_dashboard_manager.inventory_provider.interfaces import \
get_gws_direct, get_gws_indirect, get_interfaces, \
get_eumetsat_multicast_subscriptions
from brian_dashboard_manager.templating.helpers import \
get_aggregate_dashboard_data, get_interface_data, \
get_nren_interface_data, get_dashboard_data, \
get_nren_dashboard_data, get_aggregate_interface_data, \
get_nren_interface_data_old
from brian_dashboard_manager.templating.gws import generate_gws, \
generate_indirect
from brian_dashboard_manager.templating.eumetsat \
import generate_eumetsat_multicast
from brian_dashboard_manager.templating.services import create_service_panels
from brian_dashboard_manager.templating.render import render_dashboard
logger = logging.getLogger(__name__)
MAX_WORKERS = 4
DASHBOARDS = {
'NRENBETA': {
'tag': ['customersbeta'],
'folder_name': 'NREN Access BETA',
'interfaces': []
},
'NREN': {
'tag': ['customers'],
'folder_name': 'NREN Access',
'interfaces': []
},
'RE_PEER': {
'tag': 'RE_PEER',
'folder_name': 'RE Peer',
'interfaces': []
},
'RE_CUST': {
'tag': 'RE_CUST',
'folder_name': 'RE Customer',
'interfaces': []
},
'GEANTOPEN': {
'tag': 'GEANTOPEN',
'folder_name': 'GEANTOPEN',
'interfaces': []
},
'GCS': {
'tag': 'AUTOMATED_L2_CIRCUITS',
'folder_name': 'GCS',
'interfaces': []
},
'L2_CIRCUIT': {
'tag': 'L2_CIRCUITS',
'folder_name': 'L2 Circuit',
'interfaces': []
},
'LHCONE_PEER': {
'tag': 'LHCONE_PEER',
'folder_name': 'LHCONE Peer',
'interfaces': []
},
'LHCONE_CUST': {
'tag': 'LHCONE_CUST',
'folder_name': 'LHCONE Customer',
'interfaces': []
},
'MDVPN_CUSTOMERS': {
'tag': 'MDVPN',
'folder_name': 'MDVPN Customers',
'interfaces': []
},
'INFRASTRUCTURE_BACKBONE': {
'tag': 'BACKBONE',
'errors': True,
'folder_name': 'Infrastructure Backbone',
'interfaces': []
},
'IAS_PRIVATE': {
'tag': 'IAS_PRIVATE',
'folder_name': 'IAS Private',
'interfaces': []
},
'IAS_PUBLIC': {
'tag': 'IAS_PUBLIC',
'folder_name': 'IAS Public',
'interfaces': []
},
'IAS_CUSTOMER': {
'tag': 'IAS_CUSTOMER',
'folder_name': 'IAS Customer',
'interfaces': []
},
'IAS_UPSTREAM': {
'tag': ['IAS_UPSTREAM', 'UPSTREAM'],
'folder_name': 'IAS Upstream',
'interfaces': []
},
'GWS_PHY_UPSTREAM': {
'tag': ['GWS_UPSTREAM', 'UPSTREAM'],
'errors': True,
'folder_name': 'GWS PHY Upstream',
'interfaces': []
},
'GBS_10G': {
'tag': 'GBS_10G',
'errors': True,
'folder_name': '10G Guaranteed Bandwidth Service',
'interfaces': []
}
}
AGG_DASHBOARDS = {
'CLS_PEERS': {
'tag': 'cls_peers',
'dashboard_name': 'CLS Peers',
'interfaces': []
},
'IAS_PEERS': {
'tag': 'ias_peers',
'dashboard_name': 'IAS Peers',
'interfaces': []
},
'IAS_UPSTREAM': {
'tag': 'gws_upstreams',
'dashboard_name': 'GWS Upstreams',
'interfaces': []
},
'LHCONE': {
'tag': 'lhcone',
'dashboard_name': 'LHCONE',
'interfaces': []
},
'CAE1': {
'tag': 'cae',
'dashboard_name': 'CAE1',
'interfaces': []
},
'COPERNICUS': {
'tag': ['copernicus', 'services'],
'dashboard_name': 'COPERNICUS',
'group_by': 'location',
'interfaces': []
}
}
def provision_folder(token_request, folder_name, dash,
config, ds_name, excluded_dashboards):
"""
Function to provision dashboards within a folder.
"""
if not isinstance(excluded_dashboards, (list, set)):
excluded_dashboards = set()
else:
excluded_dashboards = set([s.lower() for s in excluded_dashboards])
folder = find_folder(token_request, folder_name)
tag = dash['tag']
interfaces = list(
filter(
lambda x: x['dashboards_info'],
dash['interfaces']
)
)
# dashboard should include error panels
errors = dash.get('errors', False)
is_nren = folder_name == 'NREN Access'
is_nren_beta = folder_name == 'NREN Access BETA'
if is_nren:
data = get_nren_interface_data_old(interfaces)
dash_data = get_nren_dashboard_data(data, ds_name, tag)
elif is_nren_beta:
services = fetch_services(config['reporting_provider'])
data = get_nren_interface_data(
services, interfaces, excluded_dashboards)
dash_data = get_nren_dashboard_data(data, ds_name, tag)
else:
data = get_interface_data(interfaces)
dash_data = get_dashboard_data(
data=data,
datasource=ds_name,
tag=tag,
errors=errors)
provisioned = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
for dashboard in dash_data:
rendered = render_dashboard(
dashboard, nren=is_nren or is_nren_beta)
if rendered.get('title').lower() in excluded_dashboards:
executor.submit(delete_dashboard, token_request,
rendered, folder['id'])
continue
provisioned.append(executor.submit(create_dashboard, token_request,
rendered, folder['id']))
return [r.result() for r in provisioned]
def provision_aggregate(token_request, folder,
dash, ds_name):
name = dash['dashboard_name']
tag = dash['tag']
interfaces = dash['interfaces']
group_field = dash.get('group_by', 'remote')
data = get_aggregate_interface_data(interfaces, name, group_field)
dashboard = get_aggregate_dashboard_data(
f'Aggregate - {name}', data, ds_name, tag)
rendered = render_dashboard(dashboard)
return create_dashboard(token_request, rendered, folder['id'])
def provision_maybe(config):
with open(STATE_PATH, 'r+') as f:
def write_timestamp(timestamp, provisioning):
f.seek(0)
f.write(json.dumps(
{'timestamp': timestamp, 'provisioning': provisioning}))
f.truncate()
try:
# don't conditionally provision in dev
val = os.environ.get('FLASK_ENV') != 'development'
now = datetime.datetime.now()
write_timestamp(now.timestamp(), val)
provision(config)
except Exception as e:
logger.exception('Uncaught Exception:')
raise e
finally:
now = datetime.datetime.now()
write_timestamp(now.timestamp(), False)
def is_excluded_folder(org_config, folder_name):
excluded_folders = org_config.get('excluded_folders', {})
excluded = excluded_folders.get(folder_name, False)
# boolean True means entire folder excluded
# if list, it is specific dashboard names not to provision
# so is handled at provision time.
return isinstance(excluded, bool) and excluded
def excluded_folder_dashboards(org_config, folder_name):
excluded_folders = org_config.get('excluded_folders', {})
excluded = excluded_folders.get(folder_name, [])
if 'NREN Access' in folder_name:
excluded_nrens = org_config.get('excluded_nrens', [])
excluded = list(set(excluded).union(set(excluded_nrens)))
return excluded if isinstance(excluded, list) else []
def _provision_interfaces(config, org_config, ds_name, token):
"""
Provision dashboards, overwriting existing ones.
:param config:
:param org_config:
:param ds_name:
:param token:
:return: yields dashboards that were created
"""
interfaces = get_interfaces(config['inventory_provider'])
excluded_nrens = org_config['excluded_nrens']
def excluded(interface):
desc = interface['description'].lower()
lab = 'lab.office' in interface['router'].lower()
to_exclude = any(nren.lower() in desc for nren in excluded_nrens)
if not (to_exclude or lab):
if 'dashboards_info' not in interface:
to_exclude = True
logger.info(f'No "dashboards_info" for '
f'{interface["router"]}:{interface["name"]}')
return not (to_exclude or lab)
relevant_interfaces = list(filter(excluded, interfaces))
for interface in relevant_interfaces:
interface['dashboards_info'] = list(filter(
lambda x: x['name'] != '',
interface['dashboards_info']
))
# loop over interfaces and add them to the dashboard_name
# -> folder mapping structure `dashboards` above, for convenience.
for dash in DASHBOARDS:
DASHBOARDS[dash]['interfaces'] = []
for dash in AGG_DASHBOARDS:
AGG_DASHBOARDS[dash]['interfaces'] = []
for iface in relevant_interfaces:
for dash_name in iface['dashboards']:
# add interface to matched dashboard
if dash_name in DASHBOARDS:
ifaces = DASHBOARDS[dash_name]['interfaces']
ifaces.append(iface)
# TODO: remove all references to NRENBETA
# when NREN service BETA is over (homedashboard/helpers)
if dash_name == 'NREN':
ifaces = DASHBOARDS['NRENBETA']['interfaces']
ifaces.append(iface)
# add to matched aggregate dashboard
if dash_name in AGG_DASHBOARDS:
ifaces = AGG_DASHBOARDS[dash_name]['interfaces']
ifaces.append(iface)
# provision dashboards and their folders
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
provisioned = []
for folder in DASHBOARDS.values():
folder_name = folder['folder_name']
# boolean True means entire folder excluded
# if list, it is specific dashboard names not to provision
# so is handled at provision time.
if is_excluded_folder(org_config, folder_name):
executor.submit(
delete_folder, token, title=folder_name)
continue
logger.info(
f'Provisioning {org_config["name"]}/{folder_name} dashboards')
res = executor.submit(
provision_folder, token,
folder_name, folder, config, ds_name,
excluded_folder_dashboards(org_config, folder_name))
provisioned.append(res)
for result in provisioned:
folder = result.result()
if folder is None:
continue
yield from folder
def _provision_service_dashboards(config, org_config, ds_name, token):
"""
Fetches service data from Reporting Provider
and creates dashboards for each customer with their services
"""
logger.info('Provisioning Service dashboards')
folder_name = 'Service POC'
# hardcode the org for the POC
if org_config.get('name') != 'GÉANT Staff':
return []
if is_excluded_folder(org_config, folder_name):
# don't provision Services folder
delete_folder(token, title=folder_name)
else:
folder = find_folder(token, folder_name)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
services = fetch_services(config['reporting_provider'])
dashes = create_service_panels(services, ds_name)
for dashboard in dashes:
rendered = render_dashboard(dashboard)
yield executor.submit(create_dashboard,
token,
rendered, folder['id'])
def _provision_gws_indirect(config, org_config, ds_name, token):
# fetch GWS direct data and provision related dashboards
logger.info('Provisioning GWS Indirect dashboards')
folder_name = 'GWS Indirect'
if is_excluded_folder(org_config, folder_name):
# don't provision GWS Direct folder
delete_folder(token, title=folder_name)
else:
folder = find_folder(token, folder_name)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
gws_indirect_data = get_gws_indirect(
config['inventory_provider'])
provisioned = []
dashes = generate_indirect(gws_indirect_data, ds_name)
for dashboard in dashes:
rendered = render_dashboard(dashboard)
provisioned.append(executor.submit(create_dashboard,
token,
rendered, folder['id']))
yield from provisioned
def _provision_gws_direct(config, org_config, ds_name, token):
# fetch GWS direct data and provision related dashboards
logger.info('Provisioning GWS Direct dashboards')
folder_name = 'GWS Direct'
if is_excluded_folder(org_config, folder_name):
# don't provision GWS Direct folder
delete_folder(token, title=folder_name)
else:
folder = find_folder(token, folder_name)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
gws_data = get_gws_direct(config['inventory_provider'])
provisioned = []
for dashboard in generate_gws(gws_data, ds_name):
rendered = render_dashboard(dashboard)
provisioned.append(executor.submit(create_dashboard,
token,
rendered, folder['id']))
yield from provisioned
def _provision_eumetsat_multicast(config, org_config, ds_name, token):
# fetch EUMETSAT multicast provision related dashboards
logger.info('Provisioning EUMETSAT Multicast dashboards')
folder_name = 'EUMETSAT Multicast'
if is_excluded_folder(org_config, folder_name):
# don't provision EUMETSAT Multicast folder
delete_folder(token, title=folder_name)
else:
folder = find_folder(token, folder_name)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
subscriptions = get_eumetsat_multicast_subscriptions(
config['inventory_provider'])
provisioned = []
for dashboard in generate_eumetsat_multicast(
subscriptions, ds_name):
rendered = render_dashboard(dashboard)
provisioned.append(
executor.submit(
create_dashboard,
token,
rendered,
folder['id']))
yield from provisioned
def _provision_aggregates(config, org_config, ds_name, token):
if is_excluded_folder(org_config, 'Aggregates'):
# don't provision aggregate folder
delete_folder(token, title='Aggregates')
else:
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
provisioned = []
agg_folder = find_folder(token, 'Aggregates')
for dash in AGG_DASHBOARDS.values():
excluded_dashboards = excluded_folder_dashboards(
org_config, 'Aggregates')
if dash['dashboard_name'] in excluded_dashboards:
dash_name = {
'title': f'Aggregate - {dash["dashboard_name"]}'}
executor.submit(delete_dashboard,
token, dash_name,
agg_folder['id'])
continue
logger.info(f'Provisioning {org_config["name"]}' +
f'/Aggregate {dash["dashboard_name"]} dashboards')
res = executor.submit(
provision_aggregate, token,
agg_folder, dash, ds_name)
provisioned.append(res)
yield from provisioned
def _provision_static_dashboards(config, org_config, ds_name, token):
# Statically defined dashboards from json files
excluded_dashboards = org_config.get('excluded_dashboards', [])
logger.info('Provisioning static dashboards')
for dashboard in get_dashboard_definitions():
if dashboard['title'] not in excluded_dashboards:
res = create_dashboard(token, dashboard)
if res:
# yield a fake dashboard dict
# ... only the 'uid' element is referenced
yield {'uid': res.get('uid')}
else:
delete_dashboard(token, dashboard)
# Home dashboard is always called "Home"
# Make sure it's set for the organization
logger.info('Configuring Home dashboard')
set_home_dashboard(token, org_config['name'] == 'GÉANT Staff')
yield {'uid': 'home'}
def _get_ignored_dashboards(config, org_config, token):
# get dashboard UIDs from ignored folders
# and make sure we don't touch them
ignored_folders = config.get('ignored_folders', [])
for name in ignored_folders:
logger.info(
'Ignoring dashboards under '
f'the folder {org_config["name"]}/{name}')
folder = find_folder(token, name, create=False)
if folder is None:
continue
to_ignore = find_dashboard(token, folder_id=folder['id'])
if to_ignore is None:
continue
for dash in to_ignore:
# return a hard-coded fake dashboard dict
# ... only the 'uid' element is referenced
yield {'uid': dash['uid']} # could just yield dash
def _delete_unknown_folders(config, token):
all_folders = get_folders(token)
folders_to_keep = [
# General is a base folder present in Grafana
'General',
# other folders, created outside of the DASHBOARDS list
'GWS Indirect',
'GWS Direct',
'Aggregates',
'EUMETSAT Multicast'
]
folders_to_keep.extend([dash['folder_name']
for dash in DASHBOARDS.values()])
ignored_folders = config.get('ignored_folders', [])
folders_to_keep.extend(ignored_folders)
folders_to_keep = set(folders_to_keep) # de-dupe
for folder in all_folders:
if folder['title'] in folders_to_keep:
continue
logger.info(f'Deleting unknown folder: {folder.get("title")}')
delete_folder(token, uid=folder['uid'])
def _provision_datasource(config, token):
# Only provision influxdb datasource for now
datasource = config.get('datasources').get('influxdb')
# Provision missing data sources
if not check_provisioned(token, datasource):
ds = create_datasource(token,
datasource,
config.get('datasources'))
if ds:
logger.info(
f'Provisioned datasource: {datasource["name"]}')
return datasource
def _provision_orgs(config):
request = AdminRequest(**config)
all_orgs = get_organizations(request)
orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS)
missing = (org['name'] for org in orgs_to_provision
if org['name'] not in [org['name'] for org in all_orgs])
for org_name in missing:
org_data = create_organization(request, org_name)
all_orgs.append(org_data)
return all_orgs
def provision(config):
start = time.time()
tokens = []
all_orgs = _provision_orgs(config)
request = AdminRequest(**config)
delete_expired_api_tokens(request)
def _find_org_config(org):
orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS)
try:
return next(
o for o in orgs_to_provision if o['name'] == org['name'])
except StopIteration:
logger.error(
f'Org {org["name"]} does not have valid configuration.')
org['info'] = 'Org exists in grafana but is not configured'
return None
for org in all_orgs:
org_id = org['id']
logger.info(
f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
org_config = _find_org_config(org)
if not org_config:
# message logged from _find_org_config
continue
token = create_api_token(request, org_id)
token_request = TokenRequest(token=token['key'], **config)
tokens.append((org_id, token['id']))
logger.debug(tokens)
all_original_dashboards = find_dashboard(token_request) or []
all_original_dashboard_uids = {
d['uid'] for d in all_original_dashboards}
datasource = _provision_datasource(config, token_request)
ds_name = datasource.get('name', 'PollerInfluxDB')
managed_dashboards = itertools.chain(
_provision_interfaces(
config, org_config, ds_name, token_request),
_provision_gws_indirect(
config, org_config, ds_name, token_request),
_provision_gws_direct(
config, org_config, ds_name, token_request),
_provision_eumetsat_multicast(
config, org_config, ds_name, token_request),
_provision_aggregates(
config, org_config, ds_name, token_request),
_provision_static_dashboards(
config, org_config, ds_name, token_request),
_get_ignored_dashboards(
config, org_config, token_request)
)
managed_dashboard_uids = set()
for dashboard in managed_dashboards:
if isinstance(dashboard, Future):
dashboard = dashboard.result()
if dashboard is None:
continue
managed_dashboard_uids.add(dashboard['uid'])
for uid in all_original_dashboard_uids - managed_dashboard_uids:
logger.info(f'Deleting stale dashboard with UID {uid}')
delete_dashboard(token_request, {'uid': uid})
_delete_unknown_folders(config, token_request)
delete_api_token(request, token['id'], org_id=org_id)
logger.info(f'Time to complete: {time.time() - start}')
return all_orgs