"""
This module is responsible for the
entire provisioning lifecycle.
"""
import datetime
import json
import logging
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor

from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS, STATE_PATH
from brian_dashboard_manager.grafana.utils.request import AdminRequest, \
    TokenRequest, get_organizations, create_organization, create_api_token, \
    delete_api_token, delete_expired_api_tokens, set_home_dashboard
from brian_dashboard_manager.grafana.dashboard import find_dashboard, \
    get_dashboard_definitions, create_dashboard, delete_dashboard, \
    check_provisioned, create_datasource
from brian_dashboard_manager.grafana.folder import find_folder, \
    delete_folder, get_folders
from brian_dashboard_manager.inventory_provider.interfaces import \
    get_gws_direct, get_gws_indirect, get_interfaces, \
    get_eumetsat_multicast_subscriptions
from brian_dashboard_manager.templating.helpers import \
    get_aggregate_dashboard_data, get_interface_data, \
    get_nren_interface_data, get_dashboard_data, \
    get_nren_dashboard_data, get_aggregate_interface_data
from brian_dashboard_manager.templating.gws import generate_gws, \
    generate_indirect
from brian_dashboard_manager.templating.eumetsat import \
    generate_eumetsat_multicast
from brian_dashboard_manager.templating.render import render_dashboard

logger = logging.getLogger(__name__)
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
MAX_WORKERS = 1
DASHBOARDS = {
'NREN': {
'tag': ['customers'],
'folder_name': 'NREN Access',
'interfaces': []
},
'CLS': {
'tag': 'CLS',
'folder_name': 'CLS',
'interfaces': []
},
'RE_PEER': {
'tag': 'RE_PEER',
'folder_name': 'RE Peer',
'interfaces': []
},
'RE_CUST': {
'tag': 'RE_CUST',
'folder_name': 'RE Customer',
'interfaces': []
},
'GEANTOPEN': {
'tag': 'GEANTOPEN',
'folder_name': 'GEANTOPEN',
'interfaces': []
},
'GCS': {
'tag': 'AUTOMATED_L2_CIRCUITS',
'folder_name': 'GCS',
'interfaces': []
},
'L2_CIRCUIT': {
'tag': 'L2_CIRCUITS',
'folder_name': 'L2 Circuit',
'interfaces': []
},
'LHCONE_PEER': {
'tag': 'LHCONE_PEER',
'folder_name': 'LHCONE Peer',
'interfaces': []
},
'LHCONE_CUST': {
'tag': 'LHCONE_CUST',
'folder_name': 'LHCONE Customer',
'interfaces': []
},
'MDVPN_CUSTOMERS': {
'tag': 'MDVPN',
'folder_name': 'MDVPN Customers',
'interfaces': []
},
'INFRASTRUCTURE_BACKBONE': {
'tag': 'BACKBONE',
'errors': True,
'folder_name': 'Infrastructure Backbone',
'interfaces': []
},
'IAS_PRIVATE': {
'tag': 'IAS_PRIVATE',
'folder_name': 'IAS Private',
'interfaces': []
},
'IAS_PUBLIC': {
'tag': 'IAS_PUBLIC',
'folder_name': 'IAS Public',
'interfaces': []
},
'IAS_CUSTOMER': {
'tag': 'IAS_CUSTOMER',
'folder_name': 'IAS Customer',
'interfaces': []
},
'IAS_UPSTREAM': {
'tag': ['IAS_UPSTREAM', 'UPSTREAM'],
'folder_name': 'IAS Upstream',
'interfaces': []
},
'GWS_PHY_UPSTREAM': {
'tag': ['GWS_UPSTREAM', 'UPSTREAM'],
'errors': True,
'folder_name': 'GWS PHY Upstream',
'interfaces': []
}
}
AGG_DASHBOARDS = {
'CLS_PEERS': {
'tag': 'cls_peers',
'dashboard_name': 'CLS Peers',
'interfaces': []
},
'IAS_PEERS': {
'tag': 'ias_peers',
'dashboard_name': 'IAS Peers',
'interfaces': []
},
'IAS_UPSTREAM': {
'tag': 'gws_upstreams',
'dashboard_name': 'GWS Upstreams',
'interfaces': []
},
'LHCONE': {
'tag': 'lhcone',
'dashboard_name': 'LHCONE',
'interfaces': []
},
'CAE1': {
'tag': 'cae',
'dashboard_name': 'CAE1',
'interfaces': []
}
}
def provision_folder(token_request, folder_name, dash,
                     ds_name, excluded_dashboards):
    """
    Provision the dashboards belonging to a single folder.

    :param token_request: org-scoped Grafana API request object
    :param folder_name: title of the Grafana folder to provision into
    :param dash: one entry from DASHBOARDS (tag/folder_name/interfaces)
    :param ds_name: name of the datasource the panels should use
    :param excluded_dashboards: dashboard titles to delete rather than
        provision; any non-list value is treated as "nothing excluded"
    :return: list of results from `create_dashboard` for each dashboard
    """
    def _check_valid(interface):
        # interfaces without a dashboard name can't be provisioned
        return interface['dashboard_info']['name'] != ''

    folder = find_folder(token_request, folder_name)
    tag = dash['tag']
    interfaces = list(filter(_check_valid, dash['interfaces']))

    # dashboard should include error panels when the group requests them
    errors = dash.get('errors', False)
    is_nren = folder_name == 'NREN Access'
    if is_nren:
        data = get_nren_interface_data(interfaces)
        dash_data = get_nren_dashboard_data(data, ds_name, tag)
    else:
        data = get_interface_data(interfaces)
        dash_data = get_dashboard_data(
            data=data,
            datasource=ds_name,
            tag=tag,
            errors=errors)

    if not isinstance(excluded_dashboards, list):
        excluded_dashboards = []
    else:
        # compare titles case-insensitively
        excluded_dashboards = [s.lower() for s in excluded_dashboards]

    provisioned = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for dashboard in dash_data:
            rendered = render_dashboard(dashboard, nren=is_nren)
            if rendered.get('title').lower() in excluded_dashboards:
                # excluded for this org: remove it instead of provisioning
                executor.submit(delete_dashboard, token_request,
                                rendered, folder['id'])
                continue
            provisioned.append(executor.submit(create_dashboard, token_request,
                                               rendered, folder['id']))
    return [r.result() for r in provisioned]
def provision_aggregate(token_request, folder,
                        dash, ds_name):
    """
    Provision a single aggregate dashboard into `folder`.

    :param token_request: org-scoped Grafana API request object
    :param folder: Grafana folder object (must have an 'id')
    :param dash: one entry from AGG_DASHBOARDS
    :param ds_name: name of the datasource the panels should use
    :return: result of `create_dashboard` for the rendered dashboard
    """
    name = dash['dashboard_name']
    agg_data = get_aggregate_interface_data(dash['interfaces'], name)
    dashboard = get_aggregate_dashboard_data(
        f'Aggregate - {name}', agg_data, ds_name, dash['tag'])
    rendered = render_dashboard(dashboard)
    return create_dashboard(token_request, rendered, folder['id'])
def provision_maybe(config):
    """
    Run a guarded provisioning attempt, recording state in STATE_PATH.

    NOTE(review): the try body computes `val` (dev-mode check) and `now`
    but never uses either, and no provisioning call is visible here —
    this copy of the function appears truncated; confirm against the
    upstream source before relying on it.
    """
    with open(STATE_PATH, 'r+') as f:
        def write_timestamp(timestamp, provisioning):
            # overwrite the state file in place with the latest state
            f.seek(0)
            f.write(json.dumps(
                {'timestamp': timestamp, 'provisioning': provisioning}))
            f.truncate()

        try:
            # don't conditionally provision in dev
            val = os.environ.get('FLASK_ENV') != 'development'
            now = datetime.datetime.now()
        except Exception as e:
            logger.exception('Uncaught Exception:')
            raise e
        finally:
            # always record that provisioning is no longer in progress
            now = datetime.datetime.now()
            write_timestamp(now.timestamp(), False)
def is_excluded_folder(org_config, folder_name):
    """
    Return True when the entire folder is excluded for this organization.

    In `excluded_folders`, a boolean True excludes the whole folder;
    a list names individual dashboards not to provision, which is
    handled at provision time instead.
    """
    exclusions = org_config.get('excluded_folders', {})
    entry = exclusions.get(folder_name, False)
    return entry is True
def excluded_folder_dashboards(org_config, folder_name):
    """
    Return the list of dashboard names excluded within a folder for
    this organization, or an empty list when the `excluded_folders`
    entry is not a list (e.g. a boolean whole-folder exclusion).
    """
    entry = org_config.get('excluded_folders', {}).get(folder_name, [])
    if isinstance(entry, list):
        return entry
    return []
class DashboardChanges(object):
    """Tracks which known dashboards were touched during a provision run."""

    def __init__(self, token):
        # Map of dashboard UID -> whether it has been updated.
        # This is used to remove stale dashboards at the end.
        existing = find_dashboard(token) or []
        self.updated = {dash['uid']: False for dash in existing}

    def update_dash_list(self, dashboards):
        """Mark each dashboard (or Future resolving to one) as updated."""
        for dash in dashboards:
            if isinstance(dash, Future):
                # blocks until the submitted provisioning work finishes
                dash = dash.result()
            if dash is None:
                continue
            self.updated[dash.get('uid')] = True

    def delete_untouched(self, token):
        """Delete every known dashboard that was not touched this run."""
        for uid, touched in self.updated.items():
            if touched:
                continue
            logger.info(f'Deleting stale dashboard with UID {uid}')
            delete_dashboard(token, {'uid': uid})


DASHBOARD_CHANGES = None  # will be an instance of DashboardChanges
def _provision_interfaces(config, org_config, ds_name, token):
    """
    Provision the per-interface dashboard folders for one organization.

    Fetches interfaces from the inventory provider, filters out lab and
    org-excluded NREN interfaces, groups the rest into DASHBOARDS /
    AGG_DASHBOARDS, and provisions (or deletes) each folder.
    """
    interfaces = get_interfaces(config['inventory_provider'])
    excluded_nrens = org_config['excluded_nrens']

    def _relevant(interface):
        # keep interfaces that are neither lab equipment nor excluded NRENs
        desc = interface['description'].lower()
        lab = 'lab.office' in interface['router'].lower()
        to_exclude = any(nren.lower() in desc for nren in excluded_nrens)
        return not (to_exclude or lab)

    relevant_interfaces = list(filter(_relevant, interfaces))

    # Reset the module-level interface lists: this function runs once per
    # organization, and without a reset the appends below would accumulate
    # duplicate interfaces across organizations.
    for dash in DASHBOARDS.values():
        dash['interfaces'] = []
    for dash in AGG_DASHBOARDS.values():
        dash['interfaces'] = []

    # Provision dashboards, overwriting existing ones.
    # loop over interfaces and add them to the dashboard_name
    # -> folder mapping structure `DASHBOARDS` above, for convenience.
    for iface in relevant_interfaces:
        for dash_name in iface['dashboards']:
            # add interface to matched dashboard
            if dash_name in DASHBOARDS:
                DASHBOARDS[dash_name]['interfaces'].append(iface)
            # add to matched aggregate dashboard
            if dash_name in AGG_DASHBOARDS:
                AGG_DASHBOARDS[dash_name]['interfaces'].append(iface)

    # provision dashboards and their folders
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        provisioned = []
        for folder in DASHBOARDS.values():
            folder_name = folder['folder_name']
            # boolean True means entire folder excluded
            # if list, it is specific dashboard names not to provision
            # so is handled at provision time.
            if is_excluded_folder(org_config, folder_name):
                executor.submit(
                    delete_folder, token, title=folder_name)
                continue
            logger.info(
                f'Provisioning {org_config["name"]}/{folder_name} dashboards')
            res = executor.submit(provision_folder, token,
                                  folder_name, folder, ds_name,
                                  excluded_folder_dashboards(
                                      org_config, folder_name))
            provisioned.append(res)

        for result in provisioned:
            folder = result.result()
            if folder is None:
                continue
            DASHBOARD_CHANGES.update_dash_list(folder)
def _provision_gws_indirect(config, org_config, ds_name, token):
    """
    Fetch GWS Indirect data and provision the related dashboards,
    or delete the folder when it is excluded for this organization.
    """
    logger.info('Provisioning GWS Indirect dashboards')
    folder_name = 'GWS Indirect'
    if is_excluded_folder(org_config, folder_name):
        # the whole folder is excluded for this org
        delete_folder(token, title=folder_name)
        return

    folder = find_folder(token, folder_name)
    indirect_data = get_gws_indirect(config['inventory_provider'])
    pending = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for dashboard in generate_indirect(indirect_data, ds_name):
            rendered = render_dashboard(dashboard)
            pending.append(executor.submit(create_dashboard,
                                           token,
                                           rendered, folder['id']))
        DASHBOARD_CHANGES.update_dash_list(pending)
def _provision_gws_direct(config, org_config, ds_name, token):
    """
    Fetch GWS Direct data and provision the related dashboards,
    or delete the folder when it is excluded for this organization.
    """
    logger.info('Provisioning GWS Direct dashboards')
    folder_name = 'GWS Direct'
    if is_excluded_folder(org_config, folder_name):
        # don't provision GWS Direct folder
        delete_folder(token, title=folder_name)
    else:
        folder = find_folder(token, folder_name)
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            gws_data = get_gws_direct(config['inventory_provider'])
            provisioned = []
            for dashboard in generate_gws(gws_data, ds_name):
                rendered = render_dashboard(dashboard)
                provisioned.append(executor.submit(create_dashboard,
                                                   token,
                                                   rendered, folder['id']))
            DASHBOARD_CHANGES.update_dash_list(provisioned)
def _provision_eumetsat_multicast(config, org_config, ds_name, token):
    """
    Fetch EUMETSAT multicast subscriptions and provision the related
    dashboards, or delete the folder when it is excluded for this org.
    """
    logger.info('Provisioning EUMETSAT Multicast dashboards')
    folder_name = 'EUMETSAT Multicast'
    if is_excluded_folder(org_config, folder_name):
        # the whole folder is excluded for this org
        delete_folder(token, title=folder_name)
        return

    folder = find_folder(token, folder_name)
    subscriptions = get_eumetsat_multicast_subscriptions(
        config['inventory_provider'])
    pending = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        for dashboard in generate_eumetsat_multicast(subscriptions, ds_name):
            rendered = render_dashboard(dashboard)
            pending.append(executor.submit(create_dashboard,
                                           token,
                                           rendered, folder['id']))
        DASHBOARD_CHANGES.update_dash_list(pending)
def _provision_aggregates(config, org_config, ds_name, token):
    """
    Provision the aggregate dashboards into the 'Aggregates' folder,
    or delete the folder when it is excluded for this organization.
    """
    if is_excluded_folder(org_config, 'Aggregates'):
        # don't provision aggregate folder
        delete_folder(token, title='Aggregates')
        return

    # loop-invariant: the exclusion list is per-folder, not per-dashboard
    excluded_dashboards = excluded_folder_dashboards(org_config, 'Aggregates')
    with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        provisioned = []
        agg_folder = find_folder(token, 'Aggregates')
        for dash in AGG_DASHBOARDS.values():
            if dash['dashboard_name'] in excluded_dashboards:
                dash_name = {
                    'title': f'Aggregate - {dash["dashboard_name"]}'}
                executor.submit(delete_dashboard,
                                token, dash_name,
                                agg_folder['id'])
                # excluded: deleted above, must not be re-provisioned below
                continue
            logger.info(f'Provisioning {org_config["name"]}' +
                        f'/Aggregate {dash["dashboard_name"]} dashboards')  # noqa: E501
            res = executor.submit(
                provision_aggregate, token,
                agg_folder, dash, ds_name)
            provisioned.append(res)
        DASHBOARD_CHANGES.update_dash_list(provisioned)
def _provision_static_dashboards(config, org_config, ds_name, token):
    """
    Provision the statically defined dashboards from json files, and
    configure the org's Home dashboard.
    """
    excluded_dashboards = org_config.get('excluded_dashboards', [])
    logger.info('Provisioning static dashboards')
    for dashboard in get_dashboard_definitions():
        if dashboard['title'] not in excluded_dashboards:
            res = create_dashboard(token, dashboard)
            if res:
                DASHBOARD_CHANGES.updated[res.get('uid')] = True
        else:
            # excluded for this org: delete instead of provisioning.
            # (must be the else branch — an unconditional delete would
            # remove every static dashboard right after creating it)
            delete_dashboard(token, dashboard)

    # Home dashboard is always called "Home"
    # Make sure it's set for the organization
    logger.info('Configuring Home dashboard')
    set_home_dashboard(token, org_config['name'] == 'GÉANT Staff')
    # just hardcode that we updated home dashboard
    DASHBOARD_CHANGES.updated['home'] = True
def _set_ignored_folders_as_updated(config, org_config, token):
    """
    Mark every dashboard under the configured ignored folders as updated,
    so the stale-dashboard cleanup never deletes or modifies them.
    """
    # get dashboard UIDs from ignored folders
    # and make sure we don't touch them
    ignored_folders = config.get('ignored_folders', [])
    for name in ignored_folders:
        logger.info(
            f'Ignoring dashboards under the folder {org_config["name"]}/{name}')
        folder = find_folder(token, name, create=False)
        if folder is None:
            continue
        to_ignore = find_dashboard(token, folder_id=folder['id'])
        if to_ignore is None:
            continue
        for dash in to_ignore:
            # mark it updated, so we don't modify it.
            DASHBOARD_CHANGES.updated[dash['uid']] = True
def _delete_unknown_folders(config, token):
    """
    Delete any Grafana folder that is neither managed by this tool
    nor listed in the configured ignored folders.
    """
    # 'General' is a base folder always present in Grafana
    keep = {'General', 'GWS Indirect', 'GWS Direct', 'Aggregates'}
    keep.update(dash['folder_name'] for dash in DASHBOARDS.values())
    keep.update(config.get('ignored_folders', []))

    for folder in get_folders(token):
        if folder['title'] in keep:
            continue
        delete_folder(token, uid=folder['uid'])
def _provision_datasource(config, token):
    """
    Ensure the influxdb datasource exists for the org and return its
    configuration dict (the caller reads its 'name').
    """
    # Only provision influxdb datasource for now
    datasource = config.get('datasources').get('influxdb')

    # Provision missing data sources
    if not check_provisioned(token, datasource):
        ds = create_datasource(token,
                               datasource,
                               config.get('datasources'))
        if ds:
            logger.info(
                f'Provisioned datasource: {datasource["name"]}')
    # the caller uses the returned dict to look up the datasource name
    return datasource
def _provision_orgs(config):
    """
    Ensure every configured organization exists in Grafana, creating
    any that are missing, and return the full list of org objects.
    """
    request = AdminRequest(**config)
    all_orgs = get_organizations(request)

    orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS)

    # build the existing-name set once instead of rescanning per org
    existing_names = {org['name'] for org in all_orgs}
    missing = (org['name'] for org in orgs_to_provision
               if org['name'] not in existing_names)

    for org_name in missing:
        org_data = create_organization(request, org_name)
        all_orgs.append(org_data)
    return all_orgs
def provision(config):
    """
    Entry point for the full provisioning lifecycle: ensure orgs exist,
    then provision datasources, folders and dashboards for each org,
    and finally clean up stale dashboards, folders and API tokens.
    """
    global DASHBOARD_CHANGES
    start = time.time()
    tokens = []
    all_orgs = _provision_orgs(config)
    request = AdminRequest(**config)

    def _find_org_config(org):
        # look up this org's configuration by name; None when unconfigured
        orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS)
        try:
            return next(
                o for o in orgs_to_provision if o['name'] == org['name'])
        except StopIteration:
            logger.error(
                f'Org {org["name"]} does not have valid configuration.')
            org['info'] = 'Org exists in grafana but is not configured'
            return None

    for org in all_orgs:
        org_id = org['id']
        # fresh short-lived API token per org; recorded for later cleanup
        delete_expired_api_tokens(request, org_id)
        token = create_api_token(request, org_id)
        token_request = TokenRequest(token=token['key'], **config)
        tokens.append((org_id, token['id']))
        # track which dashboards this run touches, per org
        DASHBOARD_CHANGES = DashboardChanges(token_request)

        logger.info(
            f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
        org_config = _find_org_config(org)
        if not org_config:
            # message logged from _find_org_config
            continue

        datasource = _provision_datasource(config, token_request)
        ds_name = datasource.get('name', 'PollerInfluxDB')

        # provision each dashboard family for this org
        _provision_interfaces(config, org_config, ds_name, token_request)
        _provision_gws_indirect(config, org_config, ds_name, token_request)
        _provision_gws_direct(config, org_config, ds_name, token_request)
        _provision_eumetsat_multicast(config, org_config, ds_name, token_request)
        _provision_aggregates(config, org_config, ds_name, token_request)
        _provision_static_dashboards(config, org_config, ds_name, token_request)
        _set_ignored_folders_as_updated(config, org_config, token_request)

        # remove dashboards/folders that this run did not touch
        DASHBOARD_CHANGES.delete_untouched(token_request)
        _delete_unknown_folders(config, token_request)

    # remove the per-org API tokens created above
    for org_id, token in tokens:
        delete_api_token(request, org_id, token)
    logger.info(f'Time to complete: {time.time() - start}')