Skip to content
Snippets Groups Projects
Commit e235e350 authored by Bjarke Madsen's avatar Bjarke Madsen
Browse files

Only parallelize within orgs.

Process dashboards fast, but block until org is configured.
parent 4b268d46
Branches
Tags
No related merge requests found
......@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
def generate_all_nrens(token_request, nrens, folder_id, datasource_name):
with ThreadPoolExecutor(max_workers=3) as executor:
with ThreadPoolExecutor(max_workers=8) as executor:
for dashboard in generate_nrens(nrens, datasource_name):
executor.submit(create_dashboard, token_request,
dashboard, folder_id)
......@@ -45,7 +45,7 @@ def provision_folder(token_request, folder_name,
tag = dash['tag']
# dashboard will include error panel
errors = dash.get('errors')
errors = dash.get('errors', False)
# custom parsing function for description to dashboard name
parse_func = dash.get('parse_func')
......@@ -54,7 +54,7 @@ def provision_folder(token_request, folder_name,
data = get_interface_data(relevant_interfaces, parse_func)
dash_data = get_dashboard_data(data, datasource_name, tag, errors)
with ThreadPoolExecutor(max_workers=3) as executor:
with ThreadPoolExecutor(max_workers=4) as executor:
for dashboard in dash_data:
rendered = render_dashboard(dashboard)
executor.submit(create_dashboard, token_request,
......@@ -80,149 +80,149 @@ def provision(config):
start = time.time()
with ProcessPoolExecutor(max_workers=3) as org_executor, \
ThreadPoolExecutor(max_workers=3) as thread_executor:
for org in all_orgs:
org_id = org['id']
delete_expired_api_tokens(request, org_id)
token = create_api_token(request, org_id)
token_request = TokenRequest(token=token['key'], **config)
tokens.append((org_id, token['id']))
logger.info(
f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
try:
org_config = next(
o for o in orgs_to_provision if o['name'] == org['name'])
except StopIteration:
org_config = None
if not org_config:
logger.error(
f'Org {org["name"]} does not have valid configuration.')
org['info'] = 'Org exists in grafana but is not configured'
continue
# Only provision influxdb datasource for now
datasource = config.get('datasources').get('influxdb')
# Provision missing data sources
if not check_provisioned(token_request, datasource):
ds = create_datasource(token_request,
datasource,
config.get('datasources'))
if ds:
logger.info(
f'Provisioned datasource: {datasource["name"]}')
excluded_nrens = org_config.get('excluded_nrens', [])
excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens))
def excluded(interface):
desc = interface.get('description', '').lower()
return not any(nren.lower() in desc for nren in excluded_nrens)
excluded_interfaces = list(filter(excluded, interfaces))
dashboards = {
'CLS': {
'predicate': is_cls,
'tag': 'CLS'
},
'RE PEER': {
'predicate': is_re_peer,
'tag': 'RE_PEER'
},
'RE CUST': {
'predicate': is_re_customer,
'tag': 'RE_CUST'
},
'GEANTOPEN': {
'predicate': is_geantopen,
'tag': 'GEANTOPEN'
},
'GCS': {
'predicate': is_gcs,
'tag': 'AUTOMATED_L2_CIRCUITS'
},
'L2 CIRCUIT': {
'predicate': is_l2circuit,
'tag': 'L2_CIRCUITS'
},
'LHCONE PEER': {
'predicate': is_lhcone_peer,
'tag': 'LHCONE_PEER'
},
'LHCONE CUST': {
'predicate': is_lhcone_customer,
'tag': 'LHCONE_CUST'
},
'MDVPN Customers': {
'predicate': is_mdvpn,
'tag': 'MDVPN'
},
'Infrastructure Backbone': {
'predicate': is_lag_backbone,
'tag': 'BACKBONE',
'errors': True,
'parse_func': parse_backbone_name
},
'IAS PRIVATE': {
'predicate': is_ias_private,
'tag': 'IAS_PRIVATE'
},
'IAS PUBLIC': {
'predicate': is_ias_public,
'tag': 'IAS_PUBLIC'
},
'IAS CUSTOMER': {
'predicate': is_ias_customer,
'tag': 'IAS_CUSTOMER'
},
'IAS UPSTREAM': {
'predicate': is_ias_upstream,
'tag': 'IAS_UPSTREAM'
},
'GWS PHY Upstream': {
'predicate': is_phy_upstream,
'tag': 'GWS_UPSTREAM',
'errors': True,
'parse_func': parse_phy_upstream_name
}
for org in all_orgs:
org_id = org['id']
delete_expired_api_tokens(request, org_id)
token = create_api_token(request, org_id)
token_request = TokenRequest(token=token['key'], **config)
tokens.append((org_id, token['id']))
logger.info(
f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
try:
org_config = next(
o for o in orgs_to_provision if o['name'] == org['name'])
except StopIteration:
org_config = None
if not org_config:
logger.error(
f'Org {org["name"]} does not have valid configuration.')
org['info'] = 'Org exists in grafana but is not configured'
continue
# Only provision influxdb datasource for now
datasource = config.get('datasources').get('influxdb')
# Provision missing data sources
if not check_provisioned(token_request, datasource):
ds = create_datasource(token_request,
datasource,
config.get('datasources'))
if ds:
logger.info(
f'Provisioned datasource: {datasource["name"]}')
excluded_nrens = org_config.get('excluded_nrens', [])
excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens))
def excluded(interface):
desc = interface.get('description', '').lower()
return not any(nren.lower() in desc for nren in excluded_nrens)
excluded_interfaces = list(filter(excluded, interfaces))
dashboards = {
'CLS': {
'predicate': is_cls,
'tag': 'CLS'
},
'RE PEER': {
'predicate': is_re_peer,
'tag': 'RE_PEER'
},
'RE CUST': {
'predicate': is_re_customer,
'tag': 'RE_CUST'
},
'GEANTOPEN': {
'predicate': is_geantopen,
'tag': 'GEANTOPEN'
},
'GCS': {
'predicate': is_gcs,
'tag': 'AUTOMATED_L2_CIRCUITS'
},
'L2 CIRCUIT': {
'predicate': is_l2circuit,
'tag': 'L2_CIRCUITS'
},
'LHCONE PEER': {
'predicate': is_lhcone_peer,
'tag': 'LHCONE_PEER'
},
'LHCONE CUST': {
'predicate': is_lhcone_customer,
'tag': 'LHCONE_CUST'
},
'MDVPN Customers': {
'predicate': is_mdvpn,
'tag': 'MDVPN'
},
'Infrastructure Backbone': {
'predicate': is_lag_backbone,
'tag': 'BACKBONE',
'errors': True,
'parse_func': parse_backbone_name
},
'IAS PRIVATE': {
'predicate': is_ias_private,
'tag': 'IAS_PRIVATE'
},
'IAS PUBLIC': {
'predicate': is_ias_public,
'tag': 'IAS_PUBLIC'
},
'IAS CUSTOMER': {
'predicate': is_ias_customer,
'tag': 'IAS_CUSTOMER'
},
'IAS UPSTREAM': {
'predicate': is_ias_upstream,
'tag': 'IAS_UPSTREAM'
},
'GWS PHY Upstream': {
'predicate': is_phy_upstream,
'tag': 'GWS_UPSTREAM',
'errors': True,
'parse_func': parse_phy_upstream_name
}
# Provision dashboards, overwriting existing ones.
datasource_name = datasource.get('name', 'PollerInfluxDB')
}
# Provision dashboards, overwriting existing ones.
datasource_name = datasource.get('name', 'PollerInfluxDB')
with ProcessPoolExecutor(max_workers=4) as executor:
for folder_name, dash in dashboards.items():
logger.info(
f'Provisioning {org["name"]}/{folder_name} dashboards')
org_executor.submit(provision_folder, token_request,
folder_name, dash,
excluded_interfaces, datasource_name)
# NREN Access dashboards
# uses a different template than the above.
logger.info('Provisioning NREN Access dashboards')
folder = find_folder(token_request, 'NREN Access')
nrens = filter(is_nren, excluded_interfaces)
org_executor.submit(generate_all_nrens, token_request,
nrens, folder['id'], datasource_name)
# Non-generated dashboards
excluded_dashboards = org_config.get('excluded_dashboards', [])
logger.info('Provisioning static dashboards')
for dashboard in get_dashboard_definitions():
if dashboard['title'] not in excluded_dashboards:
if dashboard['title'].lower() == 'home':
dashboard['uid'] = 'home'
thread_executor.submit(
create_dashboard, token_request, dashboard)
# Home dashboard is always called "Home"
# Make sure it's set for the organization
home_dashboard = find_dashboard(token_request, 'Home')
if home_dashboard:
set_home_dashboard(token_request, home_dashboard['id'])
executor.submit(provision_folder, token_request,
folder_name, dash,
excluded_interfaces, datasource_name)
# NREN Access dashboards
# uses a different template than the above.
logger.info('Provisioning NREN Access dashboards')
folder = find_folder(token_request, 'NREN Access')
nrens = filter(is_nren, excluded_interfaces)
generate_all_nrens(token_request,
nrens, folder['id'], datasource_name)
# Non-generated dashboards
excluded_dashboards = org_config.get('excluded_dashboards', [])
logger.info('Provisioning static dashboards')
for dashboard in get_dashboard_definitions():
if dashboard['title'] not in excluded_dashboards:
if dashboard['title'].lower() == 'home':
dashboard['uid'] = 'home'
create_dashboard(token_request, dashboard)
# Home dashboard is always called "Home"
# Make sure it's set for the organization
logger.info('Configuring Home dashboard')
home_dashboard = find_dashboard(token_request, 'Home')
if home_dashboard:
set_home_dashboard(token_request, home_dashboard['id'])
logger.info(f'Time to complete: {time.time() - start}')
for org_id, token in tokens:
......
import re
import logging
from multiprocessing.pool import Pool
from itertools import product
from string import ascii_uppercase
from brian_dashboard_manager.templating.render import create_panel
......@@ -221,36 +220,27 @@ def get_panel_fields(panel, panel_type, datasource):
})
def get_panel_definitions(panel, datasource, errors=False):
result = []
result.append(get_panel_fields(
panel, 'traffic', datasource))
result.append(get_panel_fields(
panel, 'IPv6', datasource))
if errors:
result.append(get_panel_fields(
panel, 'errors', datasource))
return result
def flatten(t): return [item for sublist in t for item in sublist]
def get_dashboard_data(data, datasource, tag, errors=False):
id_gen = num_generator()
gridPos = gridPos_generator(id_gen)
pool = Pool(processes=2)
for peer, panels in data.items():
otherpanels = [({**panel, **next(gridPos)}, datasource, errors)
for panel in panels]
def get_panel_definitions(panels, datasource):
result = []
for panel in panels:
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'traffic', datasource))
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'IPv6', datasource))
if errors:
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'errors', datasource))
return result
all_panels = flatten(pool.starmap(get_panel_definitions, otherpanels))
for peer, panels in data.items():
yield {
'title': peer,
'datasource': datasource,
'panels': all_panels,
'panels': get_panel_definitions(panels, datasource),
'tag': tag
}
......@@ -7,7 +7,7 @@ from brian_dashboard_manager.templating.render import create_dropdown_panel, \
from brian_dashboard_manager.templating.helpers import \
is_aggregate_interface, is_logical_interface, is_physical_interface, \
num_generator, gridPos_generator, letter_generator, \
get_panel_definitions, flatten
get_panel_fields
def get_nrens(interfaces):
......@@ -97,48 +97,51 @@ def get_aggregate_targets(aggregates):
return ingress, egress
def get_dashboard_data(interfaces, datasource):
with ProcessPoolExecutor(max_workers=4) as executor:
def get_panel_definitions(panels, datasource, errors=False):
result = []
for panel in panels:
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'traffic', datasource))
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'IPv6', datasource))
if errors:
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'errors', datasource))
return result
def get_panels(data):
for panel in data:
yield executor.submit(
get_panel_definitions,
{
**panel,
**next(gridPos)
},
datasource
)
nren_data = executor.submit(get_nrens, interfaces).result()
for nren, data in nren_data.items():
services_dropdown = create_dropdown_panel(
'Services', **next(gridPos))
services = get_panels(data['SERVICES'])
iface_dropdown = create_dropdown_panel(
'Interfaces', **next(gridPos))
physical = get_panels(data['PHYSICAL'])
agg_ingress, agg_egress = executor.submit(
get_aggregate_targets, data['AGGREGATES']).result()
yield {
'nren_name': nren,
'datasource': datasource,
'ingress_aggregate_targets': agg_ingress,
'egress_aggregate_targets': agg_egress,
'dropdown_groups': [
{
'dropdown': services_dropdown,
'panels': flatten([p.result() for p in services]),
},
{
'dropdown': iface_dropdown,
'panels': flatten([p.result() for p in physical]),
}
]
def build_data(nren, data, datasource):
agg_ingress, agg_egress = get_aggregate_targets(data['AGGREGATES'])
services_dropdown = create_dropdown_panel('Services', **next(gridPos))
service_panels = get_panel_definitions(data['SERVICES'], datasource)
iface_dropdown = create_dropdown_panel('Interfaces', **next(gridPos))
phys_panels = get_panel_definitions(data['PHYSICAL'], datasource, True)
return {
'nren_name': nren,
'datasource': datasource,
'ingress_aggregate_targets': agg_ingress,
'egress_aggregate_targets': agg_egress,
'dropdown_groups': [
{
'dropdown': services_dropdown,
'panels': service_panels,
},
{
'dropdown': iface_dropdown,
'panels': phys_panels,
}
]
}
def get_dashboard_data(interfaces, datasource):
nren_data = get_nrens(interfaces)
result = []
with ProcessPoolExecutor(max_workers=4) as executor:
for nren, data in nren_data.items():
result.append(executor.submit(build_data, nren, data, datasource))
return [r.result() for r in result]
def generate_nrens(interfaces, datasource):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment