Commit 3960b660 authored by Bjarke Madsen

use multiprocessing for generating dashboards

parent 8e2323e6
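The commit applies one pattern throughout (the three files touched appear to be brian_dashboard_manager/grafana/provision.py, brian_dashboard_manager/templating/helpers.py and brian_dashboard_manager/templating/nren_access.py): a ProcessPoolExecutor fans the CPU-bound dashboard generation out to worker processes, and each worker uses a ThreadPoolExecutor for the IO-bound Grafana API calls, so rendering and uploading overlap. A minimal runnable sketch of that shape, with hypothetical provision_one/upload stand-ins rather than the repository's real functions:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def upload(item):
    # stand-in for an IO-bound Grafana API call
    print('uploading', item)


def provision_one(name):
    # runs in a worker process: build the dashboards (CPU-bound),
    # then fan the uploads out to threads (IO-bound)
    rendered = [f'{name}-{i}' for i in range(3)]
    with ThreadPoolExecutor(max_workers=12) as threads:
        for item in rendered:
            threads.submit(upload, item)


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as procs:
        for name in ('CLS', 'RE PEER', 'NREN Access'):
            procs.submit(provision_one, name)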
 import logging
+import time
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS
 from brian_dashboard_manager.grafana.utils.request import \
     AdminRequest, \
@@ -10,8 +12,7 @@ from brian_dashboard_manager.grafana.dashboard import \
     get_dashboard_definitions, create_dashboard, find_dashboard
 from brian_dashboard_manager.grafana.datasource import \
     check_provisioned, create_datasource
-from brian_dashboard_manager.grafana.folder import \
-    get_folders, create_folder
+from brian_dashboard_manager.grafana.folder import find_folder
 from brian_dashboard_manager.inventory_provider.interfaces import \
     get_interfaces
 from brian_dashboard_manager.templating.nren_access import generate_nrens
@@ -28,6 +29,38 @@ from brian_dashboard_manager.templating.render import render_dashboard

 logger = logging.getLogger(__name__)


+def generate_all_nrens(token_request, nrens, folder_id, datasource_name):
+    with ThreadPoolExecutor(max_workers=12) as executor:
+        for dashboard in generate_nrens(nrens, datasource_name):
+            executor.submit(create_dashboard, token_request,
+                            dashboard, folder_id)
+
+
+def provision_folder(token_request, folder_name,
+                     dash, excluded_interfaces, datasource_name):
+    folder = find_folder(token_request, folder_name)
+    predicate = dash['predicate']
+    tag = dash['tag']
+    # dashboard will include error panel
+    errors = dash.get('errors')
+    # custom parsing function for description to dashboard name
+    parse_func = dash.get('parse_func')
+    relevant_interfaces = filter(predicate, excluded_interfaces)
+    data = get_interface_data(relevant_interfaces, parse_func)
+    dash_data = get_dashboard_data(data, datasource_name, tag, errors)
+    with ThreadPoolExecutor(max_workers=12) as executor:
+        for dashboard in dash_data:
+            rendered = render_dashboard(dashboard)
+            executor.submit(create_dashboard, token_request,
+                            rendered, folder['id'])
+
+
 def provision(config):
     request = AdminRequest(**config)
@@ -43,147 +76,156 @@ def provision(config):
         all_orgs.append(org_data)

     interfaces = get_interfaces(config['inventory_provider'])
-    for org in all_orgs:
-        org_id = org['id']
-        delete_expired_api_tokens(request, org_id)
-        token = create_api_token(request, org_id)
-        token_request = TokenRequest(token=token['key'], **config)
-        folders = get_folders(token_request)
-
-        def find_folder(title):
-            try:
-                folder = next(
-                    f for f in folders if f['title'].lower() == title.lower())
-            except StopIteration:
-                folder = None
-            if not folder:
-                logger.info(f'Created folder: {title}')
-                folder = create_folder(token_request, title)
-                folders.append(folder)
-            return folder
-
-        logger.info(
-            f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
-        try:
-            org_config = next(
-                o for o in orgs_to_provision if o['name'] == org['name'])
-        except StopIteration:
-            org_config = None
-        if not org_config:
-            logger.error(
-                f'Org {org["name"]} does not have valid configuration.')
-            org['info'] = 'Org exists in grafana but is not configured'
-            continue
-        # Only provision influxdb datasource for now
-        datasource = config.get('datasources').get('influxdb')
-        # Provision missing data sources
-        if not check_provisioned(token_request, datasource):
-            ds = create_datasource(token_request,
-                                   datasource,
-                                   config.get('datasources'))
-            if ds:
-                logger.info(
-                    f'Provisioned datasource: {datasource["name"]}')
-        excluded_nrens = org_config.get('excluded_nrens', [])
-        excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens))
-
-        def excluded(interface):
-            desc = interface.get('description', '').lower()
-            return not any(nren.lower() in desc for nren in excluded_nrens)
-
-        excluded_interfaces = list(filter(excluded, interfaces))
-        dashboards = {
-            'CLS': {'predicate': is_cls, 'tag': 'CLS'},
-            'RE PEER': {'predicate': is_re_peer, 'tag': 'RE_PEER'},
-            'RE CUST': {'predicate': is_re_customer, 'tag': 'RE_CUST'},
-            'GEANTOPEN': {'predicate': is_geantopen, 'tag': 'GEANTOPEN'},
-            'GCS': {'predicate': is_gcs, 'tag': 'AUTOMATED_L2_CIRCUITS'},
-            'L2 CIRCUIT': {'predicate': is_l2circuit, 'tag': 'L2_CIRCUITS'},
-            'LHCONE PEER': {'predicate': is_lhcone_peer, 'tag': 'LHCONE_PEER'},
-            'LHCONE CUST': {
-                'predicate': is_lhcone_customer,
-                'tag': 'LHCONE_CUST'
-            },
-            'MDVPN Customers': {'predicate': is_mdvpn, 'tag': 'MDVPN'},
-            'Infrastructure Backbone': {
-                'predicate': is_lag_backbone,
-                'tag': 'BACKBONE',
-                'errors': True,
-                'parse_func': parse_backbone_name
-            },
-            'IAS PRIVATE': {'predicate': is_ias_private, 'tag': 'IAS_PRIVATE'},
-            'IAS PUBLIC': {'predicate': is_ias_public, 'tag': 'IAS_PUBLIC'},
-            'IAS CUSTOMER': {
-                'predicate': is_ias_customer,
-                'tag': 'IAS_CUSTOMER'
-            },
-            'IAS UPSTREAM': {
-                'predicate': is_ias_upstream,
-                'tag': 'IAS_UPSTREAM'
-            },
-            'GWS PHY Upstream': {
-                'predicate': is_phy_upstream,
-                'tag': 'GWS_UPSTREAM',
-                'errors': True,
-                'parse_func': parse_phy_upstream_name
-            }
-        }
-
-        # Provision dashboards, overwriting existing ones.
-        datasource_name = datasource.get('name', 'PollerInfluxDB')
-        for folder_name, dash in dashboards.items():
-            folder = find_folder(folder_name)
-            predicate = dash['predicate']
-            tag = dash['tag']
-            # dashboard will include error panel
-            errors = dash.get('errors')
-            # custom parsing function for description to dashboard name
-            parse_func = dash.get('parse_func')
-            logger.info(f'Provisioning {folder_name} dashboards')
-            relevant_interfaces = filter(predicate, excluded_interfaces)
-            data = get_interface_data(relevant_interfaces, parse_func)
-            dash_data = get_dashboard_data(data, datasource_name, tag, errors)
-            for dashboard in dash_data:
-                rendered = render_dashboard(dashboard)
-                create_dashboard(token_request, rendered, folder['id'])
-
-        # NREN Access dashboards
-        # uses a different template than the above.
-        logger.info('Provisioning NREN Access dashboards')
-        folder = find_folder('NREN Access')
-        nrens = filter(is_nren, excluded_interfaces)
-        for dashboard in generate_nrens(nrens, datasource_name):
-            create_dashboard(token_request, dashboard, folder['id'])
-
-        # Non-generated dashboards
-        excluded_dashboards = org_config.get('excluded_dashboards', [])
-        logger.info('Provisioning static dashboards')
-        for dashboard in get_dashboard_definitions():
-            if dashboard['title'] not in excluded_dashboards:
-                if dashboard['title'].lower() == 'home':
-                    dashboard['uid'] = 'home'
-                create_dashboard(token_request, dashboard)
-
-        # Home dashboard is always called "Home"
-        # Make sure it's set for the organization
-        home_dashboard = find_dashboard(token_request, 'Home')
-        if home_dashboard:
-            set_home_dashboard(token_request, home_dashboard['id'])
-        delete_api_token(request, org_id, token['id'])
+    tokens = []
+    start = time.time()
+    with ProcessPoolExecutor(max_workers=4) as org_executor, \
+            ThreadPoolExecutor(max_workers=12) as thread_executor:
+        for org in all_orgs:
+            org_id = org['id']
+            delete_expired_api_tokens(request, org_id)
+            token = create_api_token(request, org_id)
+            token_request = TokenRequest(token=token['key'], **config)
+            tokens.append((org_id, token['id']))
+            logger.info(
+                f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
+            try:
+                org_config = next(
+                    o for o in orgs_to_provision if o['name'] == org['name'])
+            except StopIteration:
+                org_config = None
+            if not org_config:
+                logger.error(
+                    f'Org {org["name"]} does not have valid configuration.')
+                org['info'] = 'Org exists in grafana but is not configured'
+                continue
+            # Only provision influxdb datasource for now
+            datasource = config.get('datasources').get('influxdb')
+            # Provision missing data sources
+            if not check_provisioned(token_request, datasource):
+                ds = create_datasource(token_request,
+                                       datasource,
+                                       config.get('datasources'))
+                if ds:
+                    logger.info(
+                        f'Provisioned datasource: {datasource["name"]}')
+            excluded_nrens = org_config.get('excluded_nrens', [])
+            excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens))
+
+            def excluded(interface):
+                desc = interface.get('description', '').lower()
+                return not any(nren.lower() in desc for nren in excluded_nrens)
+
+            excluded_interfaces = list(filter(excluded, interfaces))
+            dashboards = {
+                'CLS': {
+                    'predicate': is_cls,
+                    'tag': 'CLS'
+                },
+                'RE PEER': {
+                    'predicate': is_re_peer,
+                    'tag': 'RE_PEER'
+                },
+                'RE CUST': {
+                    'predicate': is_re_customer,
+                    'tag': 'RE_CUST'
+                },
+                'GEANTOPEN': {
+                    'predicate': is_geantopen,
+                    'tag': 'GEANTOPEN'
+                },
+                'GCS': {
+                    'predicate': is_gcs,
+                    'tag': 'AUTOMATED_L2_CIRCUITS'
+                },
+                'L2 CIRCUIT': {
+                    'predicate': is_l2circuit,
+                    'tag': 'L2_CIRCUITS'
+                },
+                'LHCONE PEER': {
+                    'predicate': is_lhcone_peer,
+                    'tag': 'LHCONE_PEER'
+                },
+                'LHCONE CUST': {
+                    'predicate': is_lhcone_customer,
+                    'tag': 'LHCONE_CUST'
+                },
+                'MDVPN Customers': {
+                    'predicate': is_mdvpn,
+                    'tag': 'MDVPN'
+                },
+                'Infrastructure Backbone': {
+                    'predicate': is_lag_backbone,
+                    'tag': 'BACKBONE',
+                    'errors': True,
+                    'parse_func': parse_backbone_name
+                },
+                'IAS PRIVATE': {
+                    'predicate': is_ias_private,
+                    'tag': 'IAS_PRIVATE'
+                },
+                'IAS PUBLIC': {
+                    'predicate': is_ias_public,
+                    'tag': 'IAS_PUBLIC'
+                },
+                'IAS CUSTOMER': {
+                    'predicate': is_ias_customer,
+                    'tag': 'IAS_CUSTOMER'
+                },
+                'IAS UPSTREAM': {
+                    'predicate': is_ias_upstream,
+                    'tag': 'IAS_UPSTREAM'
+                },
+                'GWS PHY Upstream': {
+                    'predicate': is_phy_upstream,
+                    'tag': 'GWS_UPSTREAM',
+                    'errors': True,
+                    'parse_func': parse_phy_upstream_name
+                }
+            }
+
+            # Provision dashboards, overwriting existing ones.
+            datasource_name = datasource.get('name', 'PollerInfluxDB')
+            for folder_name, dash in dashboards.items():
+                logger.info(
+                    f'Provisioning {org["name"]}/{folder_name} dashboards')
+                org_executor.submit(provision_folder, token_request,
+                                    folder_name, dash,
+                                    excluded_interfaces, datasource_name)
+
+            # NREN Access dashboards
+            # uses a different template than the above.
+            logger.info('Provisioning NREN Access dashboards')
+            folder = find_folder(token_request, 'NREN Access')
+            nrens = filter(is_nren, excluded_interfaces)
+            org_executor.submit(generate_all_nrens, token_request,
+                                nrens, folder['id'], datasource_name)
+
+            # Non-generated dashboards
+            excluded_dashboards = org_config.get('excluded_dashboards', [])
+            logger.info('Provisioning static dashboards')
+            for dashboard in get_dashboard_definitions():
+                if dashboard['title'] not in excluded_dashboards:
+                    if dashboard['title'].lower() == 'home':
+                        dashboard['uid'] = 'home'
+                    thread_executor.submit(
+                        create_dashboard, token_request, dashboard)
+
+            # Home dashboard is always called "Home"
+            # Make sure it's set for the organization
+            home_dashboard = find_dashboard(token_request, 'Home')
+            if home_dashboard:
+                set_home_dashboard(token_request, home_dashboard['id'])
+
+    logger.info(f'Time to complete: {time.time() - start}')
+    for org_id, token in tokens:
+        delete_api_token(request, org_id, token)

     return all_orgs
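The next file in the diff (apparently brian_dashboard_manager/templating/helpers.py, inferred from the imports elsewhere in this commit) moves the per-panel work from a closure nested inside get_dashboard_data to a module-level get_panel_definitions. That move is what makes the Pool usable: multiprocessing pickles callables by reference, which fails for nested functions, so the previously captured errors flag is instead threaded through each argument tuple, and Pool.starmap unpacks every (panel, datasource, errors) tuple into positional arguments. A toy illustration of the starmap call shape (add is hypothetical):

from multiprocessing import Pool


def add(a, b):
    # module-level, therefore picklable by worker processes
    return a + b


if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # each tuple is unpacked into add's positional arguments
        print(pool.starmap(add, [(1, 2), (3, 4)]))  # prints [3, 7]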
 import re
+import logging
+from multiprocessing.pool import Pool
 from itertools import product
 from string import ascii_uppercase
 from brian_dashboard_manager.templating.render import create_panel
@@ -6,6 +8,8 @@ from brian_dashboard_manager.templating.render import create_panel
 PANEL_HEIGHT = 12
 PANEL_WIDTH = 24

+logger = logging.getLogger(__file__)
+

 def get_description(interface):
     return interface.get('description', '').strip()
@@ -217,26 +221,36 @@ def get_panel_fields(panel, panel_type, datasource):
     })


+def get_panel_definitions(panel, datasource, errors=False):
+    result = []
+    result.append(get_panel_fields(
+        panel, 'traffic', datasource))
+    result.append(get_panel_fields(
+        panel, 'IPv6', datasource))
+    if errors:
+        result.append(get_panel_fields(
+            panel, 'errors', datasource))
+    return result
+
+
+def flatten(t): return [item for sublist in t for item in sublist]
+
+
 def get_dashboard_data(data, datasource, tag, errors=False):
     id_gen = num_generator()
     gridPos = gridPos_generator(id_gen)

-    def get_panel_definitions(panels, datasource):
-        result = []
-        for panel in panels:
-            result.append(get_panel_fields(
-                {**panel, **next(gridPos)}, 'traffic', datasource))
-            result.append(get_panel_fields(
-                {**panel, **next(gridPos)}, 'IPv6', datasource))
-            if errors:
-                result.append(get_panel_fields(
-                    {**panel, **next(gridPos)}, 'errors', datasource))
-        return result
-
+    pool = Pool(processes=2)
     for peer, panels in data.items():
+        otherpanels = [({**panel, **next(gridPos)}, datasource, errors)
+                       for panel in panels]
+        all_panels = flatten(pool.starmap(get_panel_definitions, otherpanels))
         yield {
             'title': peer,
             'datasource': datasource,
-            'panels': get_panel_definitions(panels, datasource),
+            'panels': all_panels,
             'tag': tag
         }
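The NREN Access template (apparently brian_dashboard_manager/templating/nren_access.py) takes the futures route instead: executor.submit returns a Future immediately, and .result() blocks until a worker finishes. A toy version of the submit-then-gather shape (square is hypothetical):

from concurrent.futures import ProcessPoolExecutor


def square(n):
    return n * n


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit everything first so the tasks can run concurrently...
        futures = [executor.submit(square, n) for n in range(5)]
        # ...then gather; result() blocks until each task completes
        print([f.result() for f in futures])  # prints [0, 1, 4, 9, 16]

One caveat worth noting about the diff below: because get_panels is a generator, each Future is only created when flatten's list comprehension pulls it, and its result() is awaited before the next panel is submitted, which limits the overlap between panel tasks; materializing the futures into a list first, as in the toy above, is what lets them all run in parallel.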
 import json
 import os
 import jinja2
+from concurrent.futures import ProcessPoolExecutor
 from brian_dashboard_manager.templating.render import create_dropdown_panel, \
     create_panel_target
 from brian_dashboard_manager.templating.helpers import \
     is_aggregate_interface, is_logical_interface, is_physical_interface, \
-    num_generator, gridPos_generator, letter_generator, get_panel_fields
+    num_generator, gridPos_generator, letter_generator, \
+    get_panel_definitions, flatten


 def get_nrens(interfaces):
@@ -95,46 +97,48 @@ def get_aggregate_targets(aggregates):
     return ingress, egress


-def get_panel_definitions(panels, datasource, errors=False):
-    result = []
-    for panel in panels:
-        result.append(get_panel_fields(
-            {**panel, **next(gridPos)}, 'traffic', datasource))
-        result.append(get_panel_fields(
-            {**panel, **next(gridPos)}, 'IPv6', datasource))
-        if errors:
-            result.append(get_panel_fields(
-                {**panel, **next(gridPos)}, 'errors', datasource))
-    return result
-
-
 def get_dashboard_data(interfaces, datasource):
-    nren_data = get_nrens(interfaces)
-    for nren, data in nren_data.items():
-        agg_ingress, agg_egress = get_aggregate_targets(data['AGGREGATES'])
-        services_dropdown = create_dropdown_panel('Services', **next(gridPos))
-        service_panels = get_panel_definitions(data['SERVICES'], datasource)
-        iface_dropdown = create_dropdown_panel('Interfaces', **next(gridPos))
-        phys_panels = get_panel_definitions(data['PHYSICAL'], datasource, True)
-        yield {
-            'nren_name': nren,
-            'datasource': datasource,
-            'ingress_aggregate_targets': agg_ingress,
-            'egress_aggregate_targets': agg_egress,
-            'dropdown_groups': [
-                {
-                    'dropdown': services_dropdown,
-                    'panels': service_panels,
-                },
-                {
-                    'dropdown': iface_dropdown,
-                    'panels': phys_panels,
-                }
-            ]
-        }
+    with ProcessPoolExecutor(max_workers=4) as executor:
+        def get_panels(data):
+            for panel in data:
+                yield executor.submit(
+                    get_panel_definitions,
+                    {
+                        **panel,
+                        **next(gridPos)
+                    },
+                    datasource
+                )
+
+        nren_data = executor.submit(get_nrens, interfaces).result()
+        for nren, data in nren_data.items():
+            services_dropdown = create_dropdown_panel(
+                'Services', **next(gridPos))
+            services = get_panels(data['SERVICES'])
+            iface_dropdown = create_dropdown_panel(
+                'Interfaces', **next(gridPos))
+            physical = get_panels(data['PHYSICAL'])
+            agg_ingress, agg_egress = executor.submit(
+                get_aggregate_targets, data['AGGREGATES']).result()
+            yield {
+                'nren_name': nren,
+                'datasource': datasource,
+                'ingress_aggregate_targets': agg_ingress,
+                'egress_aggregate_targets': agg_egress,
+                'dropdown_groups': [
+                    {
+                        'dropdown': services_dropdown,
+                        'panels': flatten([p.result() for p in services]),
+                    },
+                    {
+                        'dropdown': iface_dropdown,
+                        'panels': flatten([p.result() for p in physical]),
+                    }
+                ]
+            }


 def generate_nrens(interfaces, datasource):