Skip to content
Snippets Groups Projects
Commit e9dfae0f authored by Bjarke Madsen's avatar Bjarke Madsen
Browse files

Merge branch 'feature/concurrent-requests' into 'develop'

Only parallelize within orgs.

See merge request live-projects/brian-dashboard-manager!3
parents ce54fcaa e235e350
No related branches found
No related tags found
No related merge requests found
...@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) ...@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
def generate_all_nrens(token_request, nrens, folder_id, datasource_name): def generate_all_nrens(token_request, nrens, folder_id, datasource_name):
with ThreadPoolExecutor(max_workers=3) as executor: with ThreadPoolExecutor(max_workers=8) as executor:
for dashboard in generate_nrens(nrens, datasource_name): for dashboard in generate_nrens(nrens, datasource_name):
executor.submit(create_dashboard, token_request, executor.submit(create_dashboard, token_request,
dashboard, folder_id) dashboard, folder_id)
...@@ -45,7 +45,7 @@ def provision_folder(token_request, folder_name, ...@@ -45,7 +45,7 @@ def provision_folder(token_request, folder_name,
tag = dash['tag'] tag = dash['tag']
# dashboard will include error panel # dashboard will include error panel
errors = dash.get('errors') errors = dash.get('errors', False)
# custom parsing function for description to dashboard name # custom parsing function for description to dashboard name
parse_func = dash.get('parse_func') parse_func = dash.get('parse_func')
...@@ -54,7 +54,7 @@ def provision_folder(token_request, folder_name, ...@@ -54,7 +54,7 @@ def provision_folder(token_request, folder_name,
data = get_interface_data(relevant_interfaces, parse_func) data = get_interface_data(relevant_interfaces, parse_func)
dash_data = get_dashboard_data(data, datasource_name, tag, errors) dash_data = get_dashboard_data(data, datasource_name, tag, errors)
with ThreadPoolExecutor(max_workers=3) as executor: with ThreadPoolExecutor(max_workers=4) as executor:
for dashboard in dash_data: for dashboard in dash_data:
rendered = render_dashboard(dashboard) rendered = render_dashboard(dashboard)
executor.submit(create_dashboard, token_request, executor.submit(create_dashboard, token_request,
...@@ -80,149 +80,149 @@ def provision(config): ...@@ -80,149 +80,149 @@ def provision(config):
start = time.time() start = time.time()
with ProcessPoolExecutor(max_workers=3) as org_executor, \ for org in all_orgs:
ThreadPoolExecutor(max_workers=3) as thread_executor: org_id = org['id']
for org in all_orgs: delete_expired_api_tokens(request, org_id)
org_id = org['id'] token = create_api_token(request, org_id)
delete_expired_api_tokens(request, org_id) token_request = TokenRequest(token=token['key'], **config)
token = create_api_token(request, org_id) tokens.append((org_id, token['id']))
token_request = TokenRequest(token=token['key'], **config)
tokens.append((org_id, token['id'])) logger.info(
f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
logger.info(
f'--- Provisioning org {org["name"]} (ID #{org_id}) ---') try:
org_config = next(
try: o for o in orgs_to_provision if o['name'] == org['name'])
org_config = next( except StopIteration:
o for o in orgs_to_provision if o['name'] == org['name']) org_config = None
except StopIteration:
org_config = None if not org_config:
logger.error(
if not org_config: f'Org {org["name"]} does not have valid configuration.')
logger.error( org['info'] = 'Org exists in grafana but is not configured'
f'Org {org["name"]} does not have valid configuration.') continue
org['info'] = 'Org exists in grafana but is not configured'
continue # Only provision influxdb datasource for now
datasource = config.get('datasources').get('influxdb')
# Only provision influxdb datasource for now
datasource = config.get('datasources').get('influxdb') # Provision missing data sources
if not check_provisioned(token_request, datasource):
# Provision missing data sources ds = create_datasource(token_request,
if not check_provisioned(token_request, datasource): datasource,
ds = create_datasource(token_request, config.get('datasources'))
datasource, if ds:
config.get('datasources')) logger.info(
if ds: f'Provisioned datasource: {datasource["name"]}')
logger.info(
f'Provisioned datasource: {datasource["name"]}') excluded_nrens = org_config.get('excluded_nrens', [])
excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens))
excluded_nrens = org_config.get('excluded_nrens', [])
excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens)) def excluded(interface):
desc = interface.get('description', '').lower()
def excluded(interface): return not any(nren.lower() in desc for nren in excluded_nrens)
desc = interface.get('description', '').lower()
return not any(nren.lower() in desc for nren in excluded_nrens) excluded_interfaces = list(filter(excluded, interfaces))
excluded_interfaces = list(filter(excluded, interfaces)) dashboards = {
'CLS': {
dashboards = { 'predicate': is_cls,
'CLS': { 'tag': 'CLS'
'predicate': is_cls, },
'tag': 'CLS' 'RE PEER': {
}, 'predicate': is_re_peer,
'RE PEER': { 'tag': 'RE_PEER'
'predicate': is_re_peer, },
'tag': 'RE_PEER' 'RE CUST': {
}, 'predicate': is_re_customer,
'RE CUST': { 'tag': 'RE_CUST'
'predicate': is_re_customer, },
'tag': 'RE_CUST' 'GEANTOPEN': {
}, 'predicate': is_geantopen,
'GEANTOPEN': { 'tag': 'GEANTOPEN'
'predicate': is_geantopen, },
'tag': 'GEANTOPEN' 'GCS': {
}, 'predicate': is_gcs,
'GCS': { 'tag': 'AUTOMATED_L2_CIRCUITS'
'predicate': is_gcs, },
'tag': 'AUTOMATED_L2_CIRCUITS' 'L2 CIRCUIT': {
}, 'predicate': is_l2circuit,
'L2 CIRCUIT': { 'tag': 'L2_CIRCUITS'
'predicate': is_l2circuit, },
'tag': 'L2_CIRCUITS' 'LHCONE PEER': {
}, 'predicate': is_lhcone_peer,
'LHCONE PEER': { 'tag': 'LHCONE_PEER'
'predicate': is_lhcone_peer, },
'tag': 'LHCONE_PEER' 'LHCONE CUST': {
}, 'predicate': is_lhcone_customer,
'LHCONE CUST': { 'tag': 'LHCONE_CUST'
'predicate': is_lhcone_customer, },
'tag': 'LHCONE_CUST' 'MDVPN Customers': {
}, 'predicate': is_mdvpn,
'MDVPN Customers': { 'tag': 'MDVPN'
'predicate': is_mdvpn, },
'tag': 'MDVPN' 'Infrastructure Backbone': {
}, 'predicate': is_lag_backbone,
'Infrastructure Backbone': { 'tag': 'BACKBONE',
'predicate': is_lag_backbone, 'errors': True,
'tag': 'BACKBONE', 'parse_func': parse_backbone_name
'errors': True, },
'parse_func': parse_backbone_name 'IAS PRIVATE': {
}, 'predicate': is_ias_private,
'IAS PRIVATE': { 'tag': 'IAS_PRIVATE'
'predicate': is_ias_private, },
'tag': 'IAS_PRIVATE' 'IAS PUBLIC': {
}, 'predicate': is_ias_public,
'IAS PUBLIC': { 'tag': 'IAS_PUBLIC'
'predicate': is_ias_public, },
'tag': 'IAS_PUBLIC' 'IAS CUSTOMER': {
}, 'predicate': is_ias_customer,
'IAS CUSTOMER': { 'tag': 'IAS_CUSTOMER'
'predicate': is_ias_customer, },
'tag': 'IAS_CUSTOMER' 'IAS UPSTREAM': {
}, 'predicate': is_ias_upstream,
'IAS UPSTREAM': { 'tag': 'IAS_UPSTREAM'
'predicate': is_ias_upstream, },
'tag': 'IAS_UPSTREAM' 'GWS PHY Upstream': {
}, 'predicate': is_phy_upstream,
'GWS PHY Upstream': { 'tag': 'GWS_UPSTREAM',
'predicate': is_phy_upstream, 'errors': True,
'tag': 'GWS_UPSTREAM', 'parse_func': parse_phy_upstream_name
'errors': True,
'parse_func': parse_phy_upstream_name
}
} }
# Provision dashboards, overwriting existing ones. }
datasource_name = datasource.get('name', 'PollerInfluxDB') # Provision dashboards, overwriting existing ones.
datasource_name = datasource.get('name', 'PollerInfluxDB')
with ProcessPoolExecutor(max_workers=4) as executor:
for folder_name, dash in dashboards.items(): for folder_name, dash in dashboards.items():
logger.info( logger.info(
f'Provisioning {org["name"]}/{folder_name} dashboards') f'Provisioning {org["name"]}/{folder_name} dashboards')
org_executor.submit(provision_folder, token_request, executor.submit(provision_folder, token_request,
folder_name, dash, folder_name, dash,
excluded_interfaces, datasource_name) excluded_interfaces, datasource_name)
# NREN Access dashboards # NREN Access dashboards
# uses a different template than the above. # uses a different template than the above.
logger.info('Provisioning NREN Access dashboards') logger.info('Provisioning NREN Access dashboards')
folder = find_folder(token_request, 'NREN Access') folder = find_folder(token_request, 'NREN Access')
nrens = filter(is_nren, excluded_interfaces) nrens = filter(is_nren, excluded_interfaces)
org_executor.submit(generate_all_nrens, token_request, generate_all_nrens(token_request,
nrens, folder['id'], datasource_name) nrens, folder['id'], datasource_name)
# Non-generated dashboards # Non-generated dashboards
excluded_dashboards = org_config.get('excluded_dashboards', []) excluded_dashboards = org_config.get('excluded_dashboards', [])
logger.info('Provisioning static dashboards') logger.info('Provisioning static dashboards')
for dashboard in get_dashboard_definitions(): for dashboard in get_dashboard_definitions():
if dashboard['title'] not in excluded_dashboards: if dashboard['title'] not in excluded_dashboards:
if dashboard['title'].lower() == 'home': if dashboard['title'].lower() == 'home':
dashboard['uid'] = 'home' dashboard['uid'] = 'home'
thread_executor.submit( create_dashboard(token_request, dashboard)
create_dashboard, token_request, dashboard)
# Home dashboard is always called "Home"
# Home dashboard is always called "Home" # Make sure it's set for the organization
# Make sure it's set for the organization logger.info('Configuring Home dashboard')
home_dashboard = find_dashboard(token_request, 'Home') home_dashboard = find_dashboard(token_request, 'Home')
if home_dashboard: if home_dashboard:
set_home_dashboard(token_request, home_dashboard['id']) set_home_dashboard(token_request, home_dashboard['id'])
logger.info(f'Time to complete: {time.time() - start}') logger.info(f'Time to complete: {time.time() - start}')
for org_id, token in tokens: for org_id, token in tokens:
......
import re import re
import logging import logging
from multiprocessing.pool import Pool
from itertools import product from itertools import product
from string import ascii_uppercase from string import ascii_uppercase
from brian_dashboard_manager.templating.render import create_panel from brian_dashboard_manager.templating.render import create_panel
...@@ -221,36 +220,27 @@ def get_panel_fields(panel, panel_type, datasource): ...@@ -221,36 +220,27 @@ def get_panel_fields(panel, panel_type, datasource):
}) })
def get_panel_definitions(panel, datasource, errors=False):
    """Build the panel-field definitions for a single interface panel.

    Always emits a 'traffic' and an 'IPv6' panel definition; an
    'errors' panel is appended only when *errors* is truthy.
    """
    definitions = [
        get_panel_fields(panel, 'traffic', datasource),
        get_panel_fields(panel, 'IPv6', datasource),
    ]
    if errors:
        definitions.append(get_panel_fields(panel, 'errors', datasource))
    return definitions
def flatten(t): return [item for sublist in t for item in sublist]
def get_dashboard_data(data, datasource, tag, errors=False): def get_dashboard_data(data, datasource, tag, errors=False):
id_gen = num_generator() id_gen = num_generator()
gridPos = gridPos_generator(id_gen) gridPos = gridPos_generator(id_gen)
pool = Pool(processes=2)
for peer, panels in data.items(): def get_panel_definitions(panels, datasource):
otherpanels = [({**panel, **next(gridPos)}, datasource, errors) result = []
for panel in panels] for panel in panels:
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'traffic', datasource))
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'IPv6', datasource))
if errors:
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'errors', datasource))
return result
all_panels = flatten(pool.starmap(get_panel_definitions, otherpanels)) for peer, panels in data.items():
yield { yield {
'title': peer, 'title': peer,
'datasource': datasource, 'datasource': datasource,
'panels': all_panels, 'panels': get_panel_definitions(panels, datasource),
'tag': tag 'tag': tag
} }
...@@ -7,7 +7,7 @@ from brian_dashboard_manager.templating.render import create_dropdown_panel, \ ...@@ -7,7 +7,7 @@ from brian_dashboard_manager.templating.render import create_dropdown_panel, \
from brian_dashboard_manager.templating.helpers import \ from brian_dashboard_manager.templating.helpers import \
is_aggregate_interface, is_logical_interface, is_physical_interface, \ is_aggregate_interface, is_logical_interface, is_physical_interface, \
num_generator, gridPos_generator, letter_generator, \ num_generator, gridPos_generator, letter_generator, \
get_panel_definitions, flatten get_panel_fields
def get_nrens(interfaces): def get_nrens(interfaces):
...@@ -97,48 +97,51 @@ def get_aggregate_targets(aggregates): ...@@ -97,48 +97,51 @@ def get_aggregate_targets(aggregates):
return ingress, egress return ingress, egress
def get_dashboard_data(interfaces, datasource): def get_panel_definitions(panels, datasource, errors=False):
with ProcessPoolExecutor(max_workers=4) as executor: result = []
for panel in panels:
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'traffic', datasource))
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'IPv6', datasource))
if errors:
result.append(get_panel_fields(
{**panel, **next(gridPos)}, 'errors', datasource))
return result
def get_panels(data):
for panel in data:
yield executor.submit(
get_panel_definitions,
{
**panel,
**next(gridPos)
},
datasource
)
nren_data = executor.submit(get_nrens, interfaces).result()
for nren, data in nren_data.items():
services_dropdown = create_dropdown_panel( def build_data(nren, data, datasource):
'Services', **next(gridPos)) agg_ingress, agg_egress = get_aggregate_targets(data['AGGREGATES'])
services = get_panels(data['SERVICES']) services_dropdown = create_dropdown_panel('Services', **next(gridPos))
iface_dropdown = create_dropdown_panel( service_panels = get_panel_definitions(data['SERVICES'], datasource)
'Interfaces', **next(gridPos)) iface_dropdown = create_dropdown_panel('Interfaces', **next(gridPos))
physical = get_panels(data['PHYSICAL']) phys_panels = get_panel_definitions(data['PHYSICAL'], datasource, True)
agg_ingress, agg_egress = executor.submit(
get_aggregate_targets, data['AGGREGATES']).result() return {
'nren_name': nren,
yield { 'datasource': datasource,
'nren_name': nren, 'ingress_aggregate_targets': agg_ingress,
'datasource': datasource, 'egress_aggregate_targets': agg_egress,
'ingress_aggregate_targets': agg_ingress, 'dropdown_groups': [
'egress_aggregate_targets': agg_egress, {
'dropdown_groups': [ 'dropdown': services_dropdown,
{ 'panels': service_panels,
'dropdown': services_dropdown, },
'panels': flatten([p.result() for p in services]), {
}, 'dropdown': iface_dropdown,
{ 'panels': phys_panels,
'dropdown': iface_dropdown,
'panels': flatten([p.result() for p in physical]),
}
]
} }
]
}
def get_dashboard_data(interfaces, datasource):
    """Build one dashboard-data dict per NREN, fanning out across processes.

    Groups *interfaces* by NREN, submits one build_data job per NREN to a
    small process pool, and returns the results in submission order.
    Future.result() re-raises any exception from a worker.
    """
    grouped = get_nrens(interfaces)
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(build_data, nren, data, datasource)
            for nren, data in grouped.items()
        ]
    # Pool shutdown above already waited for completion; collect in order.
    return [future.result() for future in futures]
def generate_nrens(interfaces, datasource): def generate_nrens(interfaces, datasource):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment