From 3960b6608b8da2a8e2732dfa48af30d8c150b36f Mon Sep 17 00:00:00 2001
From: Bjarke Madsen <bjarke.madsen@geant.org>
Date: Tue, 2 Mar 2021 16:13:42 +0100
Subject: [PATCH] use multiprocessing for generating dashboards

---
 brian_dashboard_manager/grafana/provision.py  | 318 ++++++++++--------
 brian_dashboard_manager/templating/helpers.py |  40 ++-
 .../templating/nren_access.py                 |  84 ++---
 3 files changed, 251 insertions(+), 191 deletions(-)

diff --git a/brian_dashboard_manager/grafana/provision.py b/brian_dashboard_manager/grafana/provision.py
index bafbbd1..2a69308 100644
--- a/brian_dashboard_manager/grafana/provision.py
+++ b/brian_dashboard_manager/grafana/provision.py
@@ -1,4 +1,6 @@
 import logging
+import time
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS
 from brian_dashboard_manager.grafana.utils.request import \
     AdminRequest, \
@@ -10,8 +12,7 @@ from brian_dashboard_manager.grafana.dashboard import \
     get_dashboard_definitions, create_dashboard, find_dashboard
 from brian_dashboard_manager.grafana.datasource import \
     check_provisioned, create_datasource
-from brian_dashboard_manager.grafana.folder import \
-    get_folders, create_folder
+from brian_dashboard_manager.grafana.folder import find_folder
 from brian_dashboard_manager.inventory_provider.interfaces import \
     get_interfaces
 from brian_dashboard_manager.templating.nren_access import generate_nrens
@@ -28,6 +29,38 @@ from brian_dashboard_manager.templating.render import render_dashboard
 logger = logging.getLogger(__name__)
 
 
+def generate_all_nrens(token_request, nrens, folder_id, datasource_name):
+    with ThreadPoolExecutor(max_workers=12) as executor:
+        for dashboard in generate_nrens(nrens, datasource_name):
+            executor.submit(create_dashboard, token_request,
+                            dashboard, folder_id)
+
+
+def provision_folder(token_request, folder_name,
+                     dash, excluded_interfaces, datasource_name):
+
+    folder = find_folder(token_request, folder_name)
+
+    predicate = dash['predicate']
+    tag = dash['tag']
+
+    # dashboard will include error panel
+    errors = dash.get('errors')
+
+    # custom parsing function for description to dashboard name
+    parse_func = dash.get('parse_func')
+
+    relevant_interfaces = filter(predicate, excluded_interfaces)
+    data = get_interface_data(relevant_interfaces, parse_func)
+    dash_data = get_dashboard_data(data, datasource_name, tag, errors)
+
+    with ThreadPoolExecutor(max_workers=12) as executor:
+        for dashboard in dash_data:
+            rendered = render_dashboard(dashboard)
+            executor.submit(create_dashboard, token_request,
+                            rendered, folder['id'])
+
+
 def provision(config):
 
     request = AdminRequest(**config)
@@ -43,147 +76,156 @@ def provision(config):
         all_orgs.append(org_data)
 
     interfaces = get_interfaces(config['inventory_provider'])
-    for org in all_orgs:
-        org_id = org['id']
-        delete_expired_api_tokens(request, org_id)
-        token = create_api_token(request, org_id)
-        token_request = TokenRequest(token=token['key'], **config)
+    tokens = []
 
-        folders = get_folders(token_request)
+    start = time.time()
 
-        def find_folder(title):
+    with ProcessPoolExecutor(max_workers=4) as org_executor, \
+            ThreadPoolExecutor(max_workers=12) as thread_executor:
+        for org in all_orgs:
+            org_id = org['id']
+            delete_expired_api_tokens(request, org_id)
+            token = create_api_token(request, org_id)
+            token_request = TokenRequest(token=token['key'], **config)
+            tokens.append((org_id, token['id']))
+
+            logger.info(
+                f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
 
             try:
-                folder = next(
-                    f for f in folders if f['title'].lower() == title.lower())
+                org_config = next(
+                    o for o in orgs_to_provision if o['name'] == org['name'])
             except StopIteration:
-                folder = None
-
-            if not folder:
-                logger.info(f'Created folder: {title}')
-                folder = create_folder(token_request, title)
-                folders.append(folder)
-            return folder
-
-        logger.info(
-            f'--- Provisioning org {org["name"]} (ID #{org_id}) ---')
-
-        try:
-            org_config = next(
-                o for o in orgs_to_provision if o['name'] == org['name'])
-        except StopIteration:
-            org_config = None
-
-        if not org_config:
-            logger.error(
-                f'Org {org["name"]} does not have valid configuration.')
-            org['info'] = 'Org exists in grafana but is not configured'
-            continue
-
-        # Only provision influxdb datasource for now
-        datasource = config.get('datasources').get('influxdb')
-
-        # Provision missing data sources
-        if not check_provisioned(token_request, datasource):
-            ds = create_datasource(token_request,
-                                   datasource,
-                                   config.get('datasources'))
-            if ds:
-                logger.info(
-                    f'Provisioned datasource: {datasource["name"]}')
-
-        excluded_nrens = org_config.get('excluded_nrens', [])
-        excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens))
-
-        def excluded(interface):
-            desc = interface.get('description', '').lower()
-            return not any(nren.lower() in desc for nren in excluded_nrens)
-
-        excluded_interfaces = list(filter(excluded, interfaces))
-
-        dashboards = {
-            'CLS': {'predicate': is_cls, 'tag': 'CLS'},
-            'RE PEER': {'predicate': is_re_peer, 'tag': 'RE_PEER'},
-            'RE CUST': {'predicate': is_re_customer, 'tag': 'RE_CUST'},
-            'GEANTOPEN': {'predicate': is_geantopen, 'tag': 'GEANTOPEN'},
-            'GCS': {'predicate': is_gcs, 'tag': 'AUTOMATED_L2_CIRCUITS'},
-            'L2 CIRCUIT': {'predicate': is_l2circuit, 'tag': 'L2_CIRCUITS'},
-            'LHCONE PEER': {'predicate': is_lhcone_peer, 'tag': 'LHCONE_PEER'},
-            'LHCONE CUST': {
-                'predicate': is_lhcone_customer,
-                'tag': 'LHCONE_CUST'
-            },
-            'MDVPN Customers': {'predicate': is_mdvpn, 'tag': 'MDVPN'},
-            'Infrastructure Backbone': {
-                'predicate': is_lag_backbone,
-                'tag': 'BACKBONE',
-                'errors': True,
-                'parse_func': parse_backbone_name
-            },
-            'IAS PRIVATE': {'predicate': is_ias_private, 'tag': 'IAS_PRIVATE'},
-            'IAS PUBLIC': {'predicate': is_ias_public, 'tag': 'IAS_PUBLIC'},
-            'IAS CUSTOMER': {
-                'predicate': is_ias_customer,
-                'tag': 'IAS_CUSTOMER'
-            },
-            'IAS UPSTREAM': {
-                'predicate': is_ias_upstream,
-                'tag': 'IAS_UPSTREAM'
-            },
-            'GWS PHY Upstream': {
-                'predicate': is_phy_upstream,
-                'tag': 'GWS_UPSTREAM',
-                'errors': True,
-                'parse_func': parse_phy_upstream_name
+                org_config = None
+
+            if not org_config:
+                logger.error(
+                    f'Org {org["name"]} does not have valid configuration.')
+                org['info'] = 'Org exists in grafana but is not configured'
+                continue
+
+            # Only provision influxdb datasource for now
+            datasource = config.get('datasources').get('influxdb')
+
+            # Provision missing data sources
+            if not check_provisioned(token_request, datasource):
+                ds = create_datasource(token_request,
+                                       datasource,
+                                       config.get('datasources'))
+                if ds:
+                    logger.info(
+                        f'Provisioned datasource: {datasource["name"]}')
+
+            excluded_nrens = org_config.get('excluded_nrens', [])
+            excluded_nrens = list(map(lambda f: f.lower(), excluded_nrens))
+
+            def excluded(interface):
+                desc = interface.get('description', '').lower()
+                return not any(nren.lower() in desc for nren in excluded_nrens)
+
+            excluded_interfaces = list(filter(excluded, interfaces))
+
+            dashboards = {
+                'CLS': {
+                    'predicate': is_cls,
+                    'tag': 'CLS'
+                },
+                'RE PEER': {
+                    'predicate': is_re_peer,
+                    'tag': 'RE_PEER'
+                },
+                'RE CUST': {
+                    'predicate': is_re_customer,
+                    'tag': 'RE_CUST'
+                },
+                'GEANTOPEN': {
+                    'predicate': is_geantopen,
+                    'tag': 'GEANTOPEN'
+                },
+                'GCS': {
+                    'predicate': is_gcs,
+                    'tag': 'AUTOMATED_L2_CIRCUITS'
+                },
+                'L2 CIRCUIT': {
+                    'predicate': is_l2circuit,
+                    'tag': 'L2_CIRCUITS'
+                },
+                'LHCONE PEER': {
+                    'predicate': is_lhcone_peer,
+                    'tag': 'LHCONE_PEER'
+                },
+                'LHCONE CUST': {
+                    'predicate': is_lhcone_customer,
+                    'tag': 'LHCONE_CUST'
+                },
+                'MDVPN Customers': {
+                    'predicate': is_mdvpn,
+                    'tag': 'MDVPN'
+                },
+                'Infrastructure Backbone': {
+                    'predicate': is_lag_backbone,
+                    'tag': 'BACKBONE',
+                    'errors': True,
+                    'parse_func': parse_backbone_name
+                },
+                'IAS PRIVATE': {
+                    'predicate': is_ias_private,
+                    'tag': 'IAS_PRIVATE'
+                },
+                'IAS PUBLIC': {
+                    'predicate': is_ias_public,
+                    'tag': 'IAS_PUBLIC'
+                },
+                'IAS CUSTOMER': {
+                    'predicate': is_ias_customer,
+                    'tag': 'IAS_CUSTOMER'
+                },
+                'IAS UPSTREAM': {
+                    'predicate': is_ias_upstream,
+                    'tag': 'IAS_UPSTREAM'
+                },
+                'GWS PHY Upstream': {
+                    'predicate': is_phy_upstream,
+                    'tag': 'GWS_UPSTREAM',
+                    'errors': True,
+                    'parse_func': parse_phy_upstream_name
+                }
             }
-
-        }
-        # Provision dashboards, overwriting existing ones.
-
-        datasource_name = datasource.get('name', 'PollerInfluxDB')
-        for folder_name, dash in dashboards.items():
-            folder = find_folder(folder_name)
-            predicate = dash['predicate']
-            tag = dash['tag']
-
-            # dashboard will include error panel
-            errors = dash.get('errors')
-
-            # custom parsing function for description to dashboard name
-            parse_func = dash.get('parse_func')
-
-            logger.info(f'Provisioning {folder_name} dashboards')
-
-            relevant_interfaces = filter(predicate, excluded_interfaces)
-            data = get_interface_data(relevant_interfaces, parse_func)
-            dash_data = get_dashboard_data(data, datasource_name, tag, errors)
-            for dashboard in dash_data:
-                rendered = render_dashboard(dashboard)
-                create_dashboard(token_request, rendered, folder['id'])
-
-        # NREN Access dashboards
-        # uses a different template than the above.
-        logger.info('Provisioning NREN Access dashboards')
-        folder = find_folder('NREN Access')
-        nrens = filter(is_nren, excluded_interfaces)
-        for dashboard in generate_nrens(nrens, datasource_name):
-            create_dashboard(token_request, dashboard, folder['id'])
-
-        # Non-generated dashboards
-        excluded_dashboards = org_config.get('excluded_dashboards', [])
-        logger.info('Provisioning static dashboards')
-        for dashboard in get_dashboard_definitions():
-            if dashboard['title'] not in excluded_dashboards:
-                if dashboard['title'].lower() == 'home':
-                    dashboard['uid'] = 'home'
-                create_dashboard(token_request, dashboard)
-
-        # Home dashboard is always called "Home"
-        # Make sure it's set for the organization
-        home_dashboard = find_dashboard(token_request, 'Home')
-        if home_dashboard:
-            set_home_dashboard(token_request, home_dashboard['id'])
-
-        delete_api_token(request, org_id, token['id'])
+            # Provision dashboards, overwriting existing ones.
+            datasource_name = datasource.get('name', 'PollerInfluxDB')
+            for folder_name, dash in dashboards.items():
+                logger.info(
+                    f'Provisioning {org["name"]}/{folder_name} dashboards')
+                org_executor.submit(provision_folder, token_request,
+                                    folder_name, dash,
+                                    excluded_interfaces, datasource_name)
+
+            # NREN Access dashboards
+            # uses a different template than the above.
+            logger.info('Provisioning NREN Access dashboards')
+            folder = find_folder(token_request, 'NREN Access')
+            nrens = filter(is_nren, excluded_interfaces)
+            org_executor.submit(generate_all_nrens, token_request,
+                                nrens, folder['id'], datasource_name)
+
+            # Non-generated dashboards
+            excluded_dashboards = org_config.get('excluded_dashboards', [])
+            logger.info('Provisioning static dashboards')
+            for dashboard in get_dashboard_definitions():
+                if dashboard['title'] not in excluded_dashboards:
+                    if dashboard['title'].lower() == 'home':
+                        dashboard['uid'] = 'home'
+                    thread_executor.submit(
+                        create_dashboard, token_request, dashboard)
+
+            # Home dashboard is always called "Home"
+            # Make sure it's set for the organization
+            home_dashboard = find_dashboard(token_request, 'Home')
+            if home_dashboard:
+                set_home_dashboard(token_request, home_dashboard['id'])
+
+    logger.info(f'Time to complete: {time.time() - start}')
+    for org_id, token in tokens:
+        delete_api_token(request, org_id, token)
 
     return all_orgs
diff --git a/brian_dashboard_manager/templating/helpers.py b/brian_dashboard_manager/templating/helpers.py
index f808f8b..5d878ab 100644
--- a/brian_dashboard_manager/templating/helpers.py
+++ b/brian_dashboard_manager/templating/helpers.py
@@ -1,4 +1,6 @@
 import re
+import logging
+from multiprocessing.pool import Pool
 from itertools import product
 from string import ascii_uppercase
 from brian_dashboard_manager.templating.render import create_panel
@@ -6,6 +8,8 @@ from brian_dashboard_manager.templating.render import create_panel
 PANEL_HEIGHT = 12
 PANEL_WIDTH = 24
 
+logger = logging.getLogger(__file__)
+
 
 def get_description(interface):
     return interface.get('description', '').strip()
@@ -217,26 +221,36 @@ def get_panel_fields(panel, panel_type, datasource):
     })
 
 
+def get_panel_definitions(panel, datasource, errors=False):
+    result = []
+
+    result.append(get_panel_fields(
+        panel, 'traffic', datasource))
+    result.append(get_panel_fields(
+        panel, 'IPv6', datasource))
+    if errors:
+        result.append(get_panel_fields(
+            panel, 'errors', datasource))
+    return result
+
+
+def flatten(t): return [item for sublist in t for item in sublist]
+
+
 def get_dashboard_data(data, datasource, tag, errors=False):
     id_gen = num_generator()
     gridPos = gridPos_generator(id_gen)
-
-    def get_panel_definitions(panels, datasource):
-        result = []
-        for panel in panels:
-            result.append(get_panel_fields(
-                {**panel, **next(gridPos)}, 'traffic', datasource))
-            result.append(get_panel_fields(
-                {**panel, **next(gridPos)}, 'IPv6', datasource))
-            if errors:
-                result.append(get_panel_fields(
-                    {**panel, **next(gridPos)}, 'errors', datasource))
-        return result
+    pool = Pool(processes=2)
 
     for peer, panels in data.items():
+        otherpanels = [({**panel, **next(gridPos)}, datasource, errors)
+                       for panel in panels]
+
+        all_panels = flatten(pool.starmap(get_panel_definitions, otherpanels))
+
         yield {
             'title': peer,
             'datasource': datasource,
-            'panels': get_panel_definitions(panels, datasource),
+            'panels': all_panels,
             'tag': tag
         }
diff --git a/brian_dashboard_manager/templating/nren_access.py b/brian_dashboard_manager/templating/nren_access.py
index 4430515..03eaa1f 100644
--- a/brian_dashboard_manager/templating/nren_access.py
+++ b/brian_dashboard_manager/templating/nren_access.py
@@ -1,11 +1,13 @@
 import json
 import os
 import jinja2
+from concurrent.futures import ProcessPoolExecutor
 from brian_dashboard_manager.templating.render import create_dropdown_panel, \
     create_panel_target
 from brian_dashboard_manager.templating.helpers import \
     is_aggregate_interface, is_logical_interface, is_physical_interface, \
-    num_generator, gridPos_generator, letter_generator, get_panel_fields
+    num_generator, gridPos_generator, letter_generator, \
+    get_panel_definitions, flatten
 
 
 def get_nrens(interfaces):
@@ -95,46 +97,48 @@ def get_aggregate_targets(aggregates):
     return ingress, egress
 
 
-def get_panel_definitions(panels, datasource, errors=False):
-    result = []
-    for panel in panels:
-        result.append(get_panel_fields(
-            {**panel, **next(gridPos)}, 'traffic', datasource))
-        result.append(get_panel_fields(
-            {**panel, **next(gridPos)}, 'IPv6', datasource))
-        if errors:
-            result.append(get_panel_fields(
-                {**panel, **next(gridPos)}, 'errors', datasource))
-    return result
-
-
 def get_dashboard_data(interfaces, datasource):
-
-    nren_data = get_nrens(interfaces)
-    for nren, data in nren_data.items():
-
-        agg_ingress, agg_egress = get_aggregate_targets(data['AGGREGATES'])
-        services_dropdown = create_dropdown_panel('Services', **next(gridPos))
-        service_panels = get_panel_definitions(data['SERVICES'], datasource)
-        iface_dropdown = create_dropdown_panel('Interfaces', **next(gridPos))
-        phys_panels = get_panel_definitions(data['PHYSICAL'], datasource, True)
-
-        yield {
-            'nren_name': nren,
-            'datasource': datasource,
-            'ingress_aggregate_targets': agg_ingress,
-            'egress_aggregate_targets': agg_egress,
-            'dropdown_groups': [
-                {
-                    'dropdown': services_dropdown,
-                    'panels': service_panels,
-                },
-                {
-                    'dropdown': iface_dropdown,
-                    'panels': phys_panels,
-                }
-            ]
-        }
+    with ProcessPoolExecutor(max_workers=4) as executor:
+
+        def get_panels(data):
+            for panel in data:
+                yield executor.submit(
+                    get_panel_definitions,
+                    {
+                        **panel,
+                        **next(gridPos)
+                    },
+                    datasource
+                )
+
+        nren_data = executor.submit(get_nrens, interfaces).result()
+        for nren, data in nren_data.items():
+
+            services_dropdown = create_dropdown_panel(
+                'Services', **next(gridPos))
+            services = get_panels(data['SERVICES'])
+            iface_dropdown = create_dropdown_panel(
+                'Interfaces', **next(gridPos))
+            physical = get_panels(data['PHYSICAL'])
+            agg_ingress, agg_egress = executor.submit(
+                get_aggregate_targets, data['AGGREGATES']).result()
+
+            yield {
+                'nren_name': nren,
+                'datasource': datasource,
+                'ingress_aggregate_targets': agg_ingress,
+                'egress_aggregate_targets': agg_egress,
+                'dropdown_groups': [
+                    {
+                        'dropdown': services_dropdown,
+                        'panels': flatten([p.result() for p in services]),
+                    },
+                    {
+                        'dropdown': iface_dropdown,
+                        'panels': flatten([p.result() for p in physical]),
+                    }
+                ]
+            }
 
 
 def generate_nrens(interfaces, datasource):
-- 
GitLab