Commit c56f39d3 authored by Bjarke Madsen

Add support for service-based aggregate dashboards & fix handling of region for EAP

parent a7a1d453
@@ -5,6 +5,7 @@ entire provisioning lifecycle.
import itertools
import logging
import time
from enum import Enum
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS
@@ -32,7 +33,7 @@ from brian_dashboard_manager.templating.helpers import \
get_nren_interface_data, get_dashboard_data, \
get_nren_dashboard_data, get_aggregate_interface_data, \
get_nren_interface_data_old, get_re_peer_dashboard_data, get_re_peer_interface_data, get_service_data, \
get_service_dashboard_data
get_service_dashboard_data, get_aggregate_service_data
from brian_dashboard_manager.templating.gws import generate_gws, generate_indirect
from brian_dashboard_manager.templating.eumetsat import generate_eumetsat_multicast
@@ -43,6 +44,11 @@ from brian_dashboard_manager.templating.render import (
logger = logging.getLogger(__name__)
class REGIONS(Enum):
EAP = 'EAP'
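# REGIONS enumerates the region identifiers used to scope dashboards; EAP is the only member for now.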
DASHBOARDS = {
'NRENLEGACY': {
'tag': ['customerslegacy'],
@@ -55,6 +61,7 @@ DASHBOARDS = {
'interfaces': []
},
'EAP': {
'region': REGIONS.EAP.value,
'tag': ['eap'],
'folder_name': 'EAP NREN Access',
'interfaces': []
@@ -140,7 +147,8 @@ DASHBOARDS = {
}
SERVICE_DASHBOARDS = {
'MWS': {
# service-based dashboards; keys should be valid service types/products
'GEANT MANAGED WAVELENGTH SERVICE': {
'tag': ['mws'],
'service_type': 'GEANT MANAGED WAVELENGTH SERVICE',
'folder_name': 'Managed Wavelength Service',
@@ -150,11 +158,6 @@ SERVICE_DASHBOARDS = {
}
AGG_DASHBOARDS = {
'CLS_PEERS': {
'tag': 'cls_peers',
'dashboard_name': 'CLS Peers',
'interfaces': []
},
'IAS_PEERS': {
'tag': 'ias_peers',
'dashboard_name': 'IAS Peers',
@@ -191,14 +194,37 @@ AGG_DASHBOARDS = {
'dashboard_name': 'ANA',
'interfaces': []
},
}
SERVICE_AGG_DASHBOARDS = {
'EAP': {
'region': REGIONS.EAP.value,
'tag': 'eap',
'dashboard_name': 'EAP Aggregate',
'interfaces': []
'services': []
}
}
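# Note: service-based aggregate dashboards collect whole services under a
# 'services' key instead of raw 'interfaces'; provision_aggregate below
# branches on the presence of that key.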
def get_service_region(service, regions):
for customer in service['customers']:
if customer in regions:
yield regions[customer].upper()
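# Illustrative behaviour (hypothetical data): with regions == {'AARNET': 'eap'},
# a service whose 'customers' list contains 'AARNET' yields 'EAP'.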
def get_customers_for_region(services, regions, region=None):
if not region:
return []
customers = []
for service in services:
service_customers = service.get('customers', [])
for cust in service_customers:
cust_region = regions.get(cust)
if cust_region == region:
customers.append(cust)
return list(set(customers))
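# Illustrative usage (hypothetical data; regions maps customer -> region):
#
#     services = [{'customers': ['AARNET']}, {'customers': ['GEANT']}]
#     regions = {'AARNET': 'EAP'}
#     get_customers_for_region(services, regions, 'EAP')  # -> ['AARNET']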
def provision_folder(thread_executor: ThreadPoolExecutor, token_request, folder_name, dash, services, regions,
ds_name, excluded_dashboards):
"""
@@ -236,17 +262,6 @@ def provision_folder(thread_executor: ThreadPoolExecutor, token_request, folder_
)
)
def _get_customers_for_region(region=None):
customers = []
region_lookup = {region['nren']: region['region'] for region in regions}
for service in services:
service_customers = service.get('customers', [])
for cust in service_customers:
cust_region = region_lookup.get(cust)
if cust_region == region:
customers.append(cust)
return customers
# dashboard should include error panels
errors = dash.get('errors', False)
@@ -261,7 +276,12 @@ def provision_folder(thread_executor: ThreadPoolExecutor, token_request, folder_
data = get_nren_interface_data_old(interfaces)
dash_data = get_nren_dashboard_data(data, ds_name, tag)
elif is_nren or is_eap:
region_customers = _get_customers_for_region("EAP" if is_eap else None)
dash_regions = dash.get('region')
region_customers = get_customers_for_region(services, regions, dash_regions)
if is_eap and not region_customers:
logger.info(f'No customers for region {dash_regions}, skipping EAP NREN Access dashboards')
delete_folder(token_request, uid=folder['uid'])
return
data = get_nren_interface_data(services, interfaces, excluded_dashboards, region_customers)
dash_data = get_nren_dashboard_data(data, ds_name, tag)
elif is_re_peer:
@@ -304,9 +324,15 @@ def provision_aggregate(token_request, folder,
name = dash['dashboard_name']
tag = dash['tag']
interfaces = dash['interfaces']
group_field = dash.get('group_by', 'remote')
data = get_aggregate_interface_data(interfaces, name, group_field)
service_based = 'services' in dash
if not service_based:
interfaces = dash['interfaces']
group_field = dash.get('group_by', 'remote')
data = get_aggregate_interface_data(interfaces, group_field)
else:
services = dash['services']
data = get_aggregate_service_data(services)
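# both branches produce the grouped targets consumed by
# get_aggregate_dashboard_data below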
dashboard = get_aggregate_dashboard_data(
f'Aggregate - {name}', data, ds_name, tag)
@@ -442,7 +468,7 @@ def _provision_interfaces(thread_executor: ThreadPoolExecutor, config,
ifaces.append(iface)
# provision dashboards and their folders
for folder in DASHBOARDS.values():
for folder in itertools.chain(DASHBOARDS.values(), SERVICE_DASHBOARDS.values()):
folder_name = folder['folder_name']
# boolean True means entire folder excluded
@@ -600,14 +626,19 @@ def _provision_aggregates(thread_executor: ThreadPoolExecutor, config, org_confi
folder_dashboards_by_name = list_folder_dashboards(token, agg_folder['uid'])
for dash in AGG_DASHBOARDS.values():
for dash in itertools.chain(AGG_DASHBOARDS.values(), SERVICE_AGG_DASHBOARDS.values()):
location = f'{org_config["name"]}/Aggregate {dash["dashboard_name"]}'
if not dash.get('interfaces') and not dash.get('services'):
logger.info(f'No interfaces or services for {location}, skipping')
continue
excluded_dashboards = excluded_folder_dashboards(org_config, folder_name)
if dash['dashboard_name'] in excluded_dashboards:
dash_name = {'title': f'Aggregate - {dash["dashboard_name"]}'}
delete_dashboard(token, dash_name, agg_folder['id'])
continue
logger.info(f'Provisioning {org_config["name"]}/Aggregate {dash["dashboard_name"]} dashboards')
logger.info(f'Provisioning {location} dashboards')
provisioned.append(
thread_executor.submit(provision_aggregate, token, agg_folder, dash, ds_name, folder_dashboards_by_name)
)
@@ -615,60 +646,6 @@ def _provision_aggregates(thread_executor: ThreadPoolExecutor, config, org_confi
yield from provisioned
def _provision_service_dashboards(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token):
"""
This function is used to provision service-specific dashboards,
overwriting existing ones.
:param thread_executor: a ThreadPoolExecutor for concurrent requests
:param config: the application config
:param org_config: the organisation config
:param ds_name: the name of the datasource to query in the dashboards
:param token: a token_request object
:return: generator of UIDs of dashboards that were created
"""
services = fetch_services(config['reporting_provider'])
regions = get_nren_regions(config['inventory_provider'])
excluded_folders = org_config.get('excluded_folders', {})
logger.info('Provisioning service-specific dashboards')
# loop over service dashboards and get service types we care about
dash_service_types = {SERVICE_DASHBOARDS[dash]['service_type']: dash for dash in SERVICE_DASHBOARDS}
# loop over services and append to dashboards
for service in services:
if service['service_type'] in dash_service_types:
dash = dash_service_types[service['service_type']]
svcs = SERVICE_DASHBOARDS[dash]['services']
svcs.append(service)
# provision dashboards and their folders
provisioned = []
for folder in SERVICE_DASHBOARDS.values():
folder_name = folder['folder_name']
# boolean True means entire folder excluded
# if list, it is specific dashboard names not to provision
# so is handled at provision time.
if is_excluded_folder(excluded_folders, folder_name):
delete_folder(token, title=folder_name)
continue
logger.info(
f'Provisioning {org_config["name"]}/{folder_name} dashboards')
res = thread_executor.submit(
provision_folder, thread_executor, token,
folder_name, folder, services, regions, ds_name,
excluded_folder_dashboards(org_config, folder_name))
provisioned.append(res)
for result in provisioned:
folder = result.result()
if folder is None:
continue
yield from folder
def _provision_static_dashboards(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token):
"""
This function is used to provision static dashboards from json files,
@@ -776,6 +753,49 @@ def _provision_orgs(config):
return all_orgs
def _add_service_data(org_config, services, regions):
"""
This function adds service data to the aggregate and service-based dashboards.
Services for customers listed in the excluded_nrens list are skipped.
"""
# clean up the services in the data structures from previous runs
for dash in SERVICE_AGG_DASHBOARDS.values():
dash['services'] = []
for dash in SERVICE_DASHBOARDS.values():
dash['services'] = []
excluded_nrens = [n.lower() for n in org_config['excluded_nrens']]
for service in services:
customers = service.get('customers', [])
if any(c.lower() in excluded_nrens for c in customers):
continue
service_regions = list(get_service_region(service, regions))
for service_agg_dash in SERVICE_AGG_DASHBOARDS.values():
# this block handles aggregate dashboards which are region-based
agg_dash_region = service_agg_dash.get('region')
if not agg_dash_region:
continue
if agg_dash_region in service_regions:
service_agg_dash['services'].append(service)
for service_agg_dash in SERVICE_AGG_DASHBOARDS.values():
# this block handles aggregate dashboards which are not region-based
if service_agg_dash.get('region'):
continue
# TODO: currently we only have region-based aggregate dashboards, TBD if we need to handle non-region-based
service_type = service['service_type']
if service_type in SERVICE_DASHBOARDS:
SERVICE_DASHBOARDS[service_type]['services'].append(service)
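# After this runs, each region-based entry (e.g. SERVICE_AGG_DASHBOARDS['EAP'])
# holds every non-excluded service with a customer mapped to that region, and
# each SERVICE_DASHBOARDS entry holds the services matching its service type.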
def _provision_org(config, org, org_config, interfaces, services, regions):
try:
request = AdminRequest(**config)
@@ -807,6 +827,10 @@ def _provision_org(config, org, org_config, interfaces, services, regions):
args = (thread_executor, config, org_config, ds_name, token_request)
# initialise the aggregate dashboards with service data, to be used in the provisioning process
# it doesn't create the dashboards, just prepares the data
_add_service_data(org_config, services, regions)
# call to list is needed to queue up the futures
managed_dashboards = list(itertools.chain(
_provision_interfaces(*args, interfaces, services, regions),
@@ -814,7 +838,6 @@ def _provision_org(config, org, org_config, interfaces, services, regions):
_provision_gws_direct(*args),
_provision_eumetsat_multicast(*args),
_provision_aggregates(*args),
_provision_service_dashboards(*args),
_provision_static_dashboards(*args),
_get_ignored_dashboards(*args)
))
@@ -221,6 +221,45 @@ def get_re_peer_interface_data(interfaces):
return result
def get_service_aggregate_targets(services):
for service in services:
_interfaces = service.get('endpoints')
name = service.get('name')
sid = service.get('sid')
scid = service.get('scid')
service_type = service.get('service_type', '')
measurement = 'scid_rates'
lag_service = 'GA-' in sid and service_type == 'ETHERNET'
if len(_interfaces) == 0 or not lag_service:
continue
if 'interface' in _interfaces[0]:
if_name = _interfaces[0].get('interface')
router = _interfaces[0].get('hostname')
else:
if_name = _interfaces[0].get('port')
router = _interfaces[0].get('equipment')
router = router.replace('.geant.net', '')
target_alias = f'{router} - {if_name} - {name} ({sid})'
if len(_interfaces) > 1:
logger.info(
f'{sid} {name} aggregate service has > 1 interface')
continue
yield service, {
'measurement': measurement,
'alias': target_alias,
'scid': scid,
# used when checking if an interface is already covered by a service in an aggregate panel for NREN access
'interface_key': f'{router}:::{if_name}'
}
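# Illustrative yielded pair (hypothetical values):
#
#     (service, {'measurement': 'scid_rates',
#                'alias': 'mx1.ams - ae10 - AARNET AP (GA-01234)',
#                'scid': 'abc123',
#                'interface_key': 'mx1.ams:::ae10'})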
def get_nren_interface_data(services, interfaces, excluded_dashboards, region_customers):
"""
Helper for grouping interface data to be used for generating
@@ -263,6 +302,12 @@ def get_nren_interface_data(services, interfaces, excluded_dashboards, region_cu
'PHYSICAL': []
})
# get_service_aggregate_targets yields (service, target) tuples; keep only the targets
aggregate_targets = (s[1] for s in get_service_aggregate_targets(services))
for target in aggregate_targets:
aggregate_interfaces[target['interface_key']] = True
dashboard['AGGREGATES'].append(target)
for service in services:
_interfaces = service.get('endpoints')
name = service.get('name')
@@ -290,17 +335,8 @@ def get_nren_interface_data(services, interfaces, excluded_dashboards, region_cu
if lag_service:
if len(_interfaces) > 1:
logger.info(
f'{sid} {name} aggregate service has > 1 interface')
continue
aggregate_interfaces[f'{router}:::{if_name}'] = True
dashboard['AGGREGATES'].append({
'measurement': measurement,
'alias': title.replace('- {} ', ''), # remove the format part for aggregate panels
'scid': scid
})
if 'MDVPN' in service['service_type']:
# MDVPN type services don't have data in BRIAN
continue
@@ -450,6 +486,54 @@ def get_service_data(service_type, services, interfaces, excluded_dashboards):
return result
def get_aggregate_service_data(services):
"""
Helper for grouping service data for generating aggregate dashboards.
Aggregate dashboards have panels with multiple targets (timeseries) grouped together.
Groups the services by customer and returns a dictionary of panel targets keyed by customer.
One special panel contains every target for the dashboard in a single panel,
as an aggregate of all data for that dashboard.
:param services: list of services
:return: dictionary of targets for the aggregate panels, grouped by customer
"""
targets = []
def get_reduce_func_for_field(field):
def reduce_func(remote_map, service):
value_to_group_by = service[field]
group = remote_map.get(value_to_group_by, [])
group.append(service) # contains all services for this group
all_agg = remote_map.get('EVERYSINGLETARGET', []) # contains all services regardless of group
all_agg.append(service)
remote_map[value_to_group_by] = group
remote_map['EVERYSINGLETARGET'] = all_agg
return remote_map
return reduce_func
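# the reducer buckets each target under its field value and also under the
# catch-all 'EVERYSINGLETARGET' key, which backs the all-in-one panel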
aggregate_targets = get_service_aggregate_targets(services)
for service, target in aggregate_targets:
customers = service.get('customers')
if not customers:
continue
customer = customers[0].upper() # it's a list, but only one customer per service for now
targets.append({
'customer': customer, # used to group by customer
**target # contains the target data for the panel
})
targets = sorted(targets, key=lambda x: x['customer'])
result = reduce(get_reduce_func_for_field('customer'), targets, {})
for key in result:
result[key] = sorted(result[key], key=lambda x: x['customer'])
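# Illustrative result shape (hypothetical values):
#
#     {'AARNET': [<AARNET targets>],
#      'GEANT': [<GEANT targets>],
#      'EVERYSINGLETARGET': [<all targets>]}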
return result
def get_interface_data(interfaces):
"""
Helper for grouping interface data to be used for generating