"""
This module is responsible for the
entire provisioning lifecycle.
"""
import logging
import time
import json
import datetime
import os
from functools import reduce
from concurrent.futures import Future
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS, STATE_PATH
from brian_dashboard_manager.grafana.utils.request import \
    AdminRequest, \
    TokenRequest
from brian_dashboard_manager.grafana.organization import \
    get_organizations, create_organization, create_api_token, \
    delete_api_token, delete_expired_api_tokens, set_home_dashboard
from brian_dashboard_manager.grafana.dashboard import find_dashboard, \
    get_dashboard_definitions, create_dashboard, delete_dashboard
from brian_dashboard_manager.grafana.datasource import \
    check_provisioned, create_datasource
from brian_dashboard_manager.grafana.folder import find_folder, \
    delete_folder
from brian_dashboard_manager.inventory_provider.interfaces import \
    get_gws_direct, get_gws_indirect, get_interfaces
from brian_dashboard_manager.templating.nren_access import generate_nrens
from brian_dashboard_manager.templating.helpers import is_re_customer, \
    is_cls_peer, is_cls, is_ias_customer, is_ias_private, is_ias_public, \
    is_ias_upstream, is_ias_peer, is_lag_backbone, is_nren, is_phy_upstream, \
    is_re_peer, is_gcs, is_cae1, is_geantopen, is_l2circuit, is_lhcone_peer, \
    is_lhcone_customer, is_lhcone, is_mdvpn, get_aggregate_dashboard_data, \
    get_interface_data, parse_backbone_name, parse_phy_upstream_name, \
    get_dashboard_data, get_aggregate_interface_data
from brian_dashboard_manager.templating.gws import generate_gws, \
    generate_indirect
from brian_dashboard_manager.templating.render import render_dashboard

logger = logging.getLogger(__name__)
def generate_all_nrens(token_request, nrens, folder_id, datasource_name):
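    """
    Concurrently create dashboards for all NRENs: renders the NREN access
    dashboards via generate_nrens, submits each to create_dashboard, and
    returns the results once every submission has completed.
    """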
    provisioned = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        for dashboard in generate_nrens(nrens, datasource_name):
            res = executor.submit(create_dashboard, token_request,
                                  dashboard, folder_id)
            provisioned.append(res)
    return [r.result() for r in provisioned]


def provision_folder(token_request, folder_name,
                     dash, excluded_interfaces, datasource_name,
                     excluded_dashboards):
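    """
    Create or update all dashboards for one folder.

    `dash` describes the folder's dashboards: a `predicate` selecting the
    relevant interfaces, a Grafana `tag`, an optional `errors` flag
    (include error panels) and an optional `parse_func` that derives
    dashboard names from interface descriptions. Dashboards whose titles
    appear in `excluded_dashboards` are deleted instead of provisioned.
    """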
    folder = find_folder(token_request, folder_name)

    predicate = dash['predicate']
    tag = dash['tag']
    # whether the dashboards should include an error panel
    errors = dash.get('errors', False)
    # custom parsing function for description to dashboard name
    parse_func = dash.get('parse_func')

    relevant_interfaces = filter(predicate, excluded_interfaces)
    data = get_interface_data(relevant_interfaces, parse_func)
    dash_data = get_dashboard_data(data, datasource_name, tag, errors)

    if not isinstance(excluded_dashboards, list):
        excluded_dashboards = []
    else:
        excluded_dashboards = list(
            map(lambda s: s.lower(), excluded_dashboards))

    provisioned = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        for dashboard in dash_data:
            rendered = render_dashboard(dashboard)
            if rendered.get('title').lower() in excluded_dashboards:
                executor.submit(delete_dashboard, token_request,
                                rendered, folder['id'])
                continue
            provisioned.append(executor.submit(create_dashboard,
                                               token_request,
                                               rendered, folder['id']))
    return [r.result() for r in provisioned]


def provision_aggregate(token_request, agg_type, aggregate_folder,
                        dash, excluded_interfaces, datasource_name):
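    """
    Render and create a single aggregate dashboard for `agg_type`, built
    from the interfaces matching `dash['predicate']`.
    """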
    predicate = dash['predicate']
    tag = dash['tag']

    relevant_interfaces = filter(predicate, excluded_interfaces)
    data = get_aggregate_interface_data(relevant_interfaces, agg_type)

    dashboard = get_aggregate_dashboard_data(
        f'Aggregate - {agg_type}', data, datasource_name, tag)
    rendered = render_dashboard(dashboard)
    return create_dashboard(token_request, rendered, aggregate_folder['id'])


def provision_maybe(config):
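    """
    Wrap a provisioning run with state bookkeeping: STATE_PATH records a
    timestamp and whether provisioning is currently in progress, and the
    flag is cleared again even if provisioning raises.
    """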
    with open(STATE_PATH, 'r+') as f:
        def write_timestamp(timestamp, provisioning):
            f.seek(0)
            f.write(json.dumps(
                {'timestamp': timestamp, 'provisioning': provisioning}))
            f.truncate()

        try:
            # don't conditionally provision in dev
            val = os.environ.get('FLASK_ENV') != 'development'
            now = datetime.datetime.now()
            # mark provisioning as in progress (except in dev) and run it
            write_timestamp(now.timestamp(), val)
            provision(config)
        except Exception as e:
            logger.exception('Uncaught Exception:')
            raise e
        finally:
            now = datetime.datetime.now()
            write_timestamp(now.timestamp(), False)
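

# Illustrative `config` shape, inferred from the lookups in this module
# (AdminRequest/TokenRequest consume additional keys not shown here):
# {
#     'inventory_provider': '...',
#     'datasources': {'influxdb': {...}},
#     'organizations': [{
#         'name': '...',
#         'excluded_nrens': [...],
#         'excluded_folders': {...},
#         'excluded_dashboards': [...]
#     }]
# }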
def provision(config):
    start = time.time()
    tokens = []

    request = AdminRequest(**config)
    all_orgs = get_organizations(request)

    orgs_to_provision = config.get('organizations', DEFAULT_ORGANIZATIONS)

    missing = (org['name'] for org in orgs_to_provision
               if org['name'] not in [org['name'] for org in all_orgs])

    for org_name in missing:
        org_data = create_organization(request, org_name)
        all_orgs.append(org_data)

    interfaces = get_interfaces(config['inventory_provider'])
    for org in all_orgs:
        org_id = org['id']
        delete_expired_api_tokens(request, org_id)
        token = create_api_token(request, org_id)
        token_request = TokenRequest(token=token['key'], **config)
        tokens.append((org_id, token['id']))

        logger.info(
            f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')

        try:
            org_config = next(
                o for o in orgs_to_provision if o['name'] == org['name'])
        except StopIteration:
            org_config = None

        if not org_config:
            logger.error(
                f'Org {org["name"]} does not have valid configuration.')
            org['info'] = 'Org exists in grafana but is not configured'
            continue

        # Only provision influxdb datasource for now
        datasource = config.get('datasources').get('influxdb')

        # Provision missing data sources
        if not check_provisioned(token_request, datasource):
            ds = create_datasource(token_request,
                                   datasource,
                                   config.get('datasources'))
            if ds:
                logger.info(
                    f'Provisioned datasource: {datasource["name"]}')

        excluded_nrens = org_config.get('excluded_nrens', [])
        excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens))
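        # note: despite its name, this predicate returns True for
        # interfaces that should be *kept*; lab interfaces and interfaces
        # matching an excluded NREN are dropped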
        def excluded(interface):
            desc = interface.get('description', '').lower()
            lab = 'lab.office' in interface.get('router', '').lower()
            excluded_desc = any(
                nren.lower() in desc for nren in excluded_nrens)
            return not (excluded_desc or lab)
        excluded_interfaces = list(filter(excluded, interfaces))

        dashboards = {
            'CLS': {
                'predicate': is_cls,
                'tag': 'CLS'
            },
            'RE PEER': {
                'predicate': is_re_peer,
                'tag': 'RE_PEER'
            },
            'RE CUST': {
                'predicate': is_re_customer,
                'tag': 'RE_CUST'
            },
            'GEANTOPEN': {
                'predicate': is_geantopen,
                'tag': 'GEANTOPEN'
            },
            'GCS': {
                'predicate': is_gcs,
                'tag': 'AUTOMATED_L2_CIRCUITS'
            },
            'L2 CIRCUIT': {
                'predicate': is_l2circuit,
                'tag': 'L2_CIRCUITS'
            },
            'LHCONE PEER': {
                'predicate': is_lhcone_peer,
                'tag': 'LHCONE_PEER'
            },
            'LHCONE CUST': {
                'predicate': is_lhcone_customer,
                'tag': 'LHCONE_CUST'
            },
            'MDVPN Customers': {
                'predicate': is_mdvpn,
                'tag': 'MDVPN'
            },
            'Infrastructure Backbone': {
                'predicate': is_lag_backbone,
                'tag': 'BACKBONE',
                'errors': True,
                'parse_func': parse_backbone_name
            },
            'IAS PRIVATE': {
                'predicate': is_ias_private,
                'tag': 'IAS_PRIVATE'
            },
            'IAS PUBLIC': {
                'predicate': is_ias_public,
                'tag': 'IAS_PUBLIC'
            },
            'IAS CUSTOMER': {
                'predicate': is_ias_customer,
                'tag': 'IAS_CUSTOMER'
            },
            'IAS UPSTREAM': {
                'predicate': is_ias_upstream,
                'tag': ['IAS_UPSTREAM', 'UPSTREAM']
            },
            'GWS PHY Upstream': {
                'predicate': is_phy_upstream,
                'tag': ['GWS_UPSTREAM', 'UPSTREAM'],
                'errors': True,
                'parse_func': parse_phy_upstream_name
            }
        }
        # Provision dashboards, overwriting existing ones.
        datasource_name = datasource.get('name', 'PollerInfluxDB')
        excluded_folders = org_config.get('excluded_folders', {})

        def get_uid(prev, curr):
            prev[curr.get('uid')] = False
            return prev

        # Map of dashboard UID -> whether it has been updated.
        # This is used to remove stale dashboards at the end.
        dash_list = find_dashboard(token_request) or []
        dash_list = reduce(get_uid, dash_list, {})
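        # e.g. {'uid1': False, 'uid2': False, ...}; entries are flipped to
        # True below as dashboards are (re)provisioned, and any still False
        # at the end are deleted as stale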
        def update_dash_list(dashboards):
            for dashboard in dashboards:
                if isinstance(dashboard, Future):
                    dashboard = dashboard.result()
                if dashboard is None:
                    continue
                dash_list[dashboard.get('uid')] = True
        provisioned = []
        with ProcessPoolExecutor(max_workers=4) as executor:
            for folder_name, dash in dashboards.items():
                exclude = excluded_folders.get(folder_name)
                if exclude:
                    if isinstance(exclude, bool):
                        # boolean True -> entire folder excluded
                        # list -> dashboard names not to provision
                        executor.submit(
                            delete_folder, token_request, folder_name)
                        continue
                logger.info(
                    f'Provisioning {org["name"]}/{folder_name} dashboards')
                res = executor.submit(provision_folder, token_request,
                                      folder_name, dash,
                                      excluded_interfaces, datasource_name,
                                      exclude)
                provisioned.append(res)

        for result in provisioned:
            folder = result.result()
            if folder is None:
                continue
            update_dash_list(folder)
        # fetch GWS indirect data and provision related dashboards
        logger.info('Provisioning GWS Indirect dashboards')
        folder_name = 'GWS Indirect'
        exclude_indirect = excluded_folders.get(folder_name, [])
        if isinstance(exclude_indirect, bool) and exclude_indirect:
            # don't provision GWS Indirect folder
            delete_folder(token_request, folder_name)
        else:
            exclude_indirect = list(
                map(lambda f: f.lower(), exclude_indirect))
            folder = find_folder(token_request, folder_name)
            with ProcessPoolExecutor(max_workers=4) as executor:
                gws_indirect_data = get_gws_indirect(
                    config['inventory_provider'])
                provisioned = []
                dashes = generate_indirect(gws_indirect_data, datasource_name)
                for dashboard in dashes:
                    rendered = render_dashboard(dashboard)
                    if rendered.get('title').lower() in exclude_indirect:
                        executor.submit(delete_dashboard, token_request,
                                        rendered, folder['id'])
                        continue
                    provisioned.append(executor.submit(create_dashboard,
                                                       token_request,
                                                       rendered,
                                                       folder['id']))
            update_dash_list(provisioned)
        # fetch GWS direct data and provision related dashboards
        logger.info('Provisioning GWS Direct dashboards')
        folder_name = 'GWS Direct'
        exclude_gws = excluded_folders.get(folder_name, [])
        if isinstance(exclude_gws, bool) and exclude_gws:
            # don't provision GWS Direct folder
            delete_folder(token_request, folder_name)
        else:
            exclude_gws = list(map(lambda f: f.lower(), exclude_gws))
            folder = find_folder(token_request, folder_name)
            with ProcessPoolExecutor(max_workers=4) as executor:
                gws_data = get_gws_direct(config['inventory_provider'])
                provisioned = []
                for dashboard in generate_gws(gws_data, datasource_name):
                    rendered = render_dashboard(dashboard)
                    if rendered.get('title').lower() in exclude_gws:
                        executor.submit(delete_dashboard, token_request,
                                        rendered, folder['id'])
                        continue
                    provisioned.append(executor.submit(create_dashboard,
                                                       token_request,
                                                       rendered,
                                                       folder['id']))
            # register the created dashboards so they aren't treated
            # as stale and deleted below
            update_dash_list(provisioned)
        aggregate_dashboards = {
            'CLS PEERS': {
                'predicate': is_cls_peer,
                'tag': 'cls_peers',
            },
            'IAS PEERS': {
                'predicate': is_ias_peer,
                'tag': 'ias_peers',
            },
            'GWS UPSTREAMS': {
                'predicate': is_ias_upstream,
                'tag': 'gws_upstreams',
            },
            'LHCONE': {
                'predicate': is_lhcone,
                'tag': 'lhcone',
            },
            'CAE1': {
                'predicate': is_cae1,
                'tag': 'cae',
            }
        }
        exclude_agg = excluded_folders.get('Aggregates', [])
        if isinstance(exclude_agg, bool) and exclude_agg:
            # don't provision aggregate folder
            delete_folder(token_request, 'Aggregates')
        else:
            provisioned = []
            with ProcessPoolExecutor(max_workers=4) as executor:
                agg_folder = find_folder(token_request, 'Aggregates')
                for agg_type, dash in aggregate_dashboards.items():
                    if agg_type in exclude_agg:
                        dash_name = {'title': f'Aggregate - {agg_type}'}
                        executor.submit(delete_dashboard,
                                        token_request, dash_name,
                                        agg_folder['id'])
                        continue
                    logger.info(f'Provisioning {org["name"]}' +
                                f'/Aggregate {agg_type} dashboards')
                    res = executor.submit(provision_aggregate, token_request,
                                          agg_type, agg_folder, dash,
                                          excluded_interfaces,
                                          datasource_name)
                    provisioned.append(res)
            # register the aggregate dashboards so they aren't deleted
            # as stale below
            update_dash_list(provisioned)
        # NREN Access dashboards
        # uses a different template than the above.
        logger.info('Provisioning NREN Access dashboards')
        folder = find_folder(token_request, 'NREN Access')
        nrens = filter(is_nren, excluded_interfaces)
        provisioned = generate_all_nrens(
            token_request, nrens, folder['id'], datasource_name)
        for dashboard in provisioned:
            if dashboard is None:
                continue
            dash_list[dashboard.get('uid')] = True
        # Non-generated dashboards
        excluded_dashboards = org_config.get('excluded_dashboards', [])
        logger.info('Provisioning static dashboards')
        for dashboard in get_dashboard_definitions():
            if dashboard['title'] not in excluded_dashboards:
                res = create_dashboard(token_request, dashboard)
                if res:
                    dash_list[res.get('uid')] = True
            else:
                delete_dashboard(token_request, dashboard)

        # Home dashboard is always called "Home"
        # Make sure it's set for the organization
        logger.info('Configuring Home dashboard')
        is_staff = org['name'] == 'GÉANT Staff'
        set_home_dashboard(token_request, is_staff)
        # just hardcode that we updated the home dashboard
        dash_list['home'] = True
        for dash, provisioned in dash_list.items():
            if not provisioned:
                logger.info(f'Deleting stale dashboard with UID {dash}')
                delete_dashboard(token_request, {'uid': dash})

    logger.info(f'Time to complete: {time.time() - start}')

    for org_id, token in tokens:
        delete_api_token(request, org_id, token)