diff --git a/brian_dashboard_manager/config.py b/brian_dashboard_manager/config.py index 7e215f8bfc2c472aec99e0d6aaad787639974099..eab88718e4cb01f4ad3dd9f7bde09bd55c6a428b 100644 --- a/brian_dashboard_manager/config.py +++ b/brian_dashboard_manager/config.py @@ -29,11 +29,8 @@ If the value is a list, dashboard titles within the list should be excluded. import json import jsonschema -import tempfile -STATE_PATH = tempfile.gettempdir() + '/briandashboardmanager-state.json' - DEFAULT_ORGANIZATIONS = [ { "name": "GÉANT Staff", diff --git a/brian_dashboard_manager/grafana/dashboard.py b/brian_dashboard_manager/grafana/dashboard.py index 9ef8db0d648f9cbcdcd4b2d64b6a8a096548a549..56b6945af71b1bf0b8bf28cc18506491b9ab1f96 100644 --- a/brian_dashboard_manager/grafana/dashboard.py +++ b/brian_dashboard_manager/grafana/dashboard.py @@ -6,6 +6,7 @@ import os import json import time +from typing import Dict, Any, Optional from requests.exceptions import HTTPError from brian_dashboard_manager.grafana.utils.request import TokenRequest @@ -191,7 +192,8 @@ def _get_dashboard(request: TokenRequest, uid): return r.json()['dashboard'] -def create_dashboard(request: TokenRequest, dashboard: dict, folder_id=None): +def create_dashboard(request: TokenRequest, dashboard: dict, folder_id=None, + existing_folder_dashboards: Optional[Dict[str, Any]] = None): """ Creates the given dashboard for the organization tied to the token. If the dashboard already exists, it will be updated. @@ -205,22 +207,32 @@ def create_dashboard(request: TokenRequest, dashboard: dict, folder_id=None): title = dashboard['title'] existing_dashboard = None has_uid = dashboard.get('uid') is not None - if has_uid: - existing_dashboard = _get_dashboard(request, uid=dashboard['uid']) - else: - existing_dashboard = _search_dashboard(request, dashboard, folder_id) + + existing_dashboard = None + + if existing_folder_dashboards: + existing_dashboard = existing_folder_dashboards.get(dashboard['title'].lower()) + if has_uid: + assert existing_dashboard is None or existing_dashboard['uid'] == dashboard['uid'], \ + f"UID mismatch for dashboard {title}: {existing_dashboard['uid']} != {dashboard['uid']}" + + if not existing_dashboard: + if has_uid: + existing_dashboard = _get_dashboard(request, uid=dashboard['uid']) + else: + existing_dashboard = _search_dashboard(request, dashboard, folder_id) if existing_dashboard: dashboard['uid'] = existing_dashboard['uid'] dashboard['id'] = existing_dashboard['id'] - dashboard['version'] = existing_dashboard['version'] + dashboard['version'] = 1 else: # We are creating a new dashboard, delete ID if it exists. dashboard.pop('id', None) payload = { 'dashboard': dashboard, - 'overwrite': False + 'overwrite': True } if folder_id: payload['folderId'] = folder_id diff --git a/brian_dashboard_manager/grafana/folder.py b/brian_dashboard_manager/grafana/folder.py index 5343768262c423fe06b36ed4160bc3f5f58447c6..e77cfc38eb530dbd914b28ceda7f5a8fe6b81f57 100644 --- a/brian_dashboard_manager/grafana/folder.py +++ b/brian_dashboard_manager/grafana/folder.py @@ -6,6 +6,24 @@ from brian_dashboard_manager.grafana.utils.request import TokenRequest logger = logging.getLogger(__name__) +def list_folder_dashboards(request: TokenRequest, folder_uid): + """ + Lists all dashboards in a folder. + + :param request: TokenRequest object + :param folder_uid: folder UID + :return: list of dashboard definitions + """ + try: + r = request.get(f'api/search?folderUIDs={folder_uid}') + dashboards = r.json() + except HTTPError: + logger.exception(f'Error when listing dashboards in folder: {folder_uid}') + return [] + + return {dash['title'].lower(): dash for dash in dashboards} + + def delete_folder(request: TokenRequest, title=None, uid=None): """ Deletes a single folder for the organization diff --git a/brian_dashboard_manager/grafana/provision.py b/brian_dashboard_manager/grafana/provision.py index 308bd2436ba11497c2397095000e87c3c022f598..5f56a4a7b5809adcb5b228edd8d7378c25891cfd 100644 --- a/brian_dashboard_manager/grafana/provision.py +++ b/brian_dashboard_manager/grafana/provision.py @@ -3,14 +3,11 @@ This module is responsible for the entire provisioning lifecycle. """ import itertools -import os import logging import time -import json -import datetime from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor -from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS, STATE_PATH +from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS from brian_dashboard_manager.grafana.utils.request import AdminRequest, \ TokenRequest @@ -25,7 +22,7 @@ from brian_dashboard_manager.grafana.dashboard import list_dashboards, \ from brian_dashboard_manager.grafana.datasource import \ datasource_exists, create_datasource from brian_dashboard_manager.grafana.folder import find_folder, \ - delete_folder, delete_unknown_folders + delete_folder, delete_unknown_folders, list_folder_dashboards from brian_dashboard_manager.inventory_provider.interfaces import \ get_gws_direct, get_gws_indirect, get_interfaces, \ get_eumetsat_multicast_subscriptions, get_nren_regions @@ -44,9 +41,9 @@ from brian_dashboard_manager.templating.render import ( render_simple_dashboard, ) +MAX_THREADS = 16 logger = logging.getLogger(__name__) -MAX_WORKERS = 4 DASHBOARDS = { 'NRENLEGACY': { 'tag': ['customerslegacy'], @@ -203,7 +200,7 @@ AGG_DASHBOARDS = { } -def provision_folder(token_request, folder_name, dash, services, regions, +def provision_folder(thread_executor: ThreadPoolExecutor, token_request, folder_name, dash, services, regions, ds_name, excluded_dashboards): """ Function to provision dashboards within a folder. @@ -227,6 +224,11 @@ def provision_folder(token_request, folder_name, dash, services, regions, excluded_dashboards = set([s.lower() for s in excluded_dashboards]) folder = find_folder(token_request, folder_name) + if not folder: + raise ValueError(f'Folder {folder_name} not found') + + folder_dashboards_by_name = list_folder_dashboards(token_request, folder['uid']) + tag = dash['tag'] interfaces = list( filter( @@ -271,31 +273,24 @@ def provision_folder(token_request, folder_name, dash, services, regions, dash_data = get_service_dashboard_data(data, ds_name, tag) else: data = get_interface_data(interfaces) - dash_data = get_dashboard_data( - data=data, - datasource=ds_name, - tag=tag, - errors=errors) - - provisioned = [] + dash_data = get_dashboard_data(data=data, datasource=ds_name, tag=tag, errors=errors) - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - for dashboard in dash_data: - if has_aggregate_panels: - rendered = render_with_aggregate_dashboard(**dashboard) - else: - rendered = render_simple_dashboard(**dashboard) + for dashboard in dash_data: + if has_aggregate_panels: + rendered = render_with_aggregate_dashboard(**dashboard) + else: + rendered = render_simple_dashboard(**dashboard) - if rendered.get("title").lower() in excluded_dashboards: - executor.submit(delete_dashboard, token_request, rendered, folder["id"]) - continue - provisioned.append(executor.submit(create_dashboard, token_request, - rendered, folder['id'])) - return [r.result() for r in provisioned] + dash_title = rendered.get("title").lower() + if dash_title in excluded_dashboards: + if dash_title in folder_dashboards_by_name: + delete_dashboard(token_request, rendered, folder['id']) + continue + yield thread_executor.submit(create_dashboard, token_request, rendered, folder['id'], folder_dashboards_by_name) def provision_aggregate(token_request, folder, - dash, ds_name): + dash, ds_name, folder_dashboards_by_name): """ Function to provision an aggregate dashboard within a folder. @@ -318,7 +313,7 @@ def provision_aggregate(token_request, folder, f'Aggregate - {name}', data, ds_name, tag) rendered = render_simple_dashboard(**dashboard) - return create_dashboard(token_request, rendered, folder['id']) + return create_dashboard(token_request, rendered, folder['id'], folder_dashboards_by_name) def is_excluded_folder(excluded_folders, folder_name): @@ -396,11 +391,12 @@ def _interfaces_to_keep(interface, excluded_nrens): return should_keep -def _provision_interfaces(config, org_config, ds_name, token): +def _provision_interfaces(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token): """ This function is used to provision most dashboards, overwriting existing ones. + :param thread_executor: a ThreadPoolExecutor for concurrent requests :param config: the application config :param org_config: the organisation config :param ds_name: the name of the datasource to query in the dashboards @@ -450,39 +446,28 @@ def _provision_interfaces(config, org_config, ds_name, token): ifaces.append(iface) # provision dashboards and their folders - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - provisioned = [] - for folder in DASHBOARDS.values(): - folder_name = folder['folder_name'] - - # boolean True means entire folder excluded - # if list, it is specific dashboard names not to provision - # so is handled at provision time. - if is_excluded_folder(excluded_folders, folder_name): - executor.submit( - delete_folder, token, title=folder_name) - continue + for folder in DASHBOARDS.values(): + folder_name = folder['folder_name'] + + # boolean True means entire folder excluded + # if list, it is specific dashboard names not to provision + # so is handled at provision time. + if is_excluded_folder(excluded_folders, folder_name): + delete_folder(token, title=folder_name) + continue - logger.info( - f'Provisioning {org_config["name"]}/{folder_name} dashboards') - res = executor.submit( - provision_folder, token, - folder_name, folder, services, regions, ds_name, - excluded_folder_dashboards(org_config, folder_name)) - provisioned.append(res) - - for result in provisioned: - folder = result.result() - if folder is None: - continue - yield from folder + logger.info( + f'Provisioning {org_config["name"]}/{folder_name} dashboards') + yield from provision_folder(thread_executor, token, folder_name, folder, services, regions, ds_name, + excluded_folder_dashboards(org_config, folder_name)) -def _provision_gws_indirect(config, org_config, ds_name, token): +def _provision_gws_indirect(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token): """ This function is used to provision GWS Indirect dashboards, overwriting existing ones. + :param thread_executor: a ThreadPoolExecutor for concurrent requests :param config: the application config :param org_config: the organisation config :param ds_name: the name of the datasource to query in the dashboards @@ -498,25 +483,30 @@ def _provision_gws_indirect(config, org_config, ds_name, token): delete_folder(token, title=folder_name) else: folder = find_folder(token, folder_name) - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - gws_indirect_data = get_gws_indirect( - config['inventory_provider']) - provisioned = [] - dashes = generate_indirect(gws_indirect_data, ds_name) - for dashboard in dashes: - rendered = render_simple_dashboard(**dashboard) - provisioned.append(executor.submit(create_dashboard, - token, - rendered, folder['id'])) + if not folder: + raise ValueError(f'Folder {folder_name} not found') + + folder_dashboards_by_name = list_folder_dashboards(token, folder['uid']) - yield from provisioned + gws_indirect_data = get_gws_indirect( + config['inventory_provider']) + provisioned = [] + dashes = generate_indirect(gws_indirect_data, ds_name) + for dashboard in dashes: + rendered = render_simple_dashboard(**dashboard) + provisioned.append(thread_executor.submit(create_dashboard, + token, + rendered, folder['id'], folder_dashboards_by_name)) + + yield from provisioned -def _provision_gws_direct(config, org_config, ds_name, token): +def _provision_gws_direct(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token): """ This function is used to provision GWS Direct dashboards, overwriting existing ones. + :param thread_executor: a ThreadPoolExecutor for concurrent requests :param config: the application config :param org_config: the organisation config :param ds_name: the name of the datasource to query in the dashboards @@ -533,25 +523,28 @@ def _provision_gws_direct(config, org_config, ds_name, token): else: folder = find_folder(token, folder_name) if not folder: - logger.error(f'Folder {folder_name} not found') - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - gws_data = get_gws_direct(config['inventory_provider']) - provisioned = [] + raise ValueError(f'Folder {folder_name} not found') - for dashboard in generate_gws(gws_data, ds_name): - rendered = render_simple_dashboard(**dashboard) - provisioned.append(executor.submit(create_dashboard, - token, - rendered, folder['id'])) + folder_dashboards_by_name = list_folder_dashboards(token, folder['uid']) - yield from provisioned + gws_data = get_gws_direct(config['inventory_provider']) + provisioned = [] + + for dashboard in generate_gws(gws_data, ds_name): + rendered = render_simple_dashboard(**dashboard) + provisioned.append( + thread_executor.submit(create_dashboard, token, rendered, folder['id'], folder_dashboards_by_name) + ) + + yield from provisioned -def _provision_eumetsat_multicast(config, org_config, ds_name, token): +def _provision_eumetsat_multicast(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token): """ This function is used to provision EUMETSAT Multicast dashboards, overwriting existing ones. + :param thread_executor: a ThreadPoolExecutor for concurrent requests :param config: the application config :param org_config: the organisation config :param ds_name: the name of the datasource to query in the dashboards @@ -567,29 +560,30 @@ def _provision_eumetsat_multicast(config, org_config, ds_name, token): delete_folder(token, title=folder_name) else: folder = find_folder(token, folder_name) - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - subscriptions = get_eumetsat_multicast_subscriptions( - config['inventory_provider']) - provisioned = [] + if not folder: + raise ValueError(f'Folder {folder_name} not found') + + folder_dashboards_by_name = list_folder_dashboards(token, folder['uid']) - for dashboard in generate_eumetsat_multicast( - subscriptions, ds_name): - rendered = render_simple_dashboard(**dashboard) - provisioned.append( - executor.submit( - create_dashboard, - token, - rendered, - folder['id'])) + subscriptions = get_eumetsat_multicast_subscriptions( + config['inventory_provider']) + provisioned = [] + + for dashboard in generate_eumetsat_multicast(subscriptions, ds_name): + rendered = render_simple_dashboard(**dashboard) + provisioned.append( + thread_executor.submit(create_dashboard, token, rendered, folder['id'], folder_dashboards_by_name) + ) - yield from provisioned + yield from provisioned -def _provision_aggregates(config, org_config, ds_name, token): +def _provision_aggregates(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token): """ This function is used to provision Aggregate dashboards, overwriting existing ones. + :param thread_executor: a ThreadPoolExecutor for concurrent requests :param config: the application config :param org_config: the organisation config :param ds_name: the name of the datasource to query in the dashboards @@ -603,34 +597,34 @@ def _provision_aggregates(config, org_config, ds_name, token): # don't provision aggregate folder delete_folder(token, title=folder_name) else: - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - provisioned = [] - agg_folder = find_folder(token, folder_name) - for dash in AGG_DASHBOARDS.values(): - excluded_dashboards = excluded_folder_dashboards( - org_config, folder_name) - if dash['dashboard_name'] in excluded_dashboards: - dash_name = { - 'title': f'Aggregate - {dash["dashboard_name"]}'} - executor.submit(delete_dashboard, - token, dash_name, - agg_folder['id']) - continue - logger.info(f'Provisioning {org_config["name"]}' + - f'/Aggregate {dash["dashboard_name"]} dashboards') - res = executor.submit( - provision_aggregate, token, - agg_folder, dash, ds_name) - provisioned.append(res) + provisioned = [] + agg_folder = find_folder(token, folder_name) + if not agg_folder: + raise ValueError(f'Folder {folder_name} not found') + + folder_dashboards_by_name = list_folder_dashboards(token, agg_folder['uid']) - yield from provisioned + for dash in AGG_DASHBOARDS.values(): + excluded_dashboards = excluded_folder_dashboards(org_config, folder_name) + if dash['dashboard_name'] in excluded_dashboards: + dash_name = {'title': f'Aggregate - {dash["dashboard_name"]}'} + delete_dashboard(token, dash_name, agg_folder['id']) + continue + + logger.info(f'Provisioning {org_config["name"]}/Aggregate {dash["dashboard_name"]} dashboards') + provisioned.append( + thread_executor.submit(provision_aggregate, token, agg_folder, dash, ds_name, folder_dashboards_by_name) + ) + yield from provisioned -def _provision_service_dashboards(config, org_config, ds_name, token): + +def _provision_service_dashboards(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token): """ This function is used to provision service-specific dashboards, overwriting existing ones. + :param thread_executor: a ThreadPoolExecutor for concurrent requests :param config: the application config :param org_config: the organisation config :param ds_name: the name of the datasource to query in the dashboards @@ -653,42 +647,41 @@ def _provision_service_dashboards(config, org_config, ds_name, token): svcs.append(service) # provision dashboards and their folders - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - provisioned = [] - for folder in SERVICE_DASHBOARDS.values(): - folder_name = folder['folder_name'] - - # boolean True means entire folder excluded - # if list, it is specific dashboard names not to provision - # so is handled at provision time. - if is_excluded_folder(excluded_folders, folder_name): - executor.submit( - delete_folder, token, title=folder_name) - continue + provisioned = [] + for folder in SERVICE_DASHBOARDS.values(): + folder_name = folder['folder_name'] + + # boolean True means entire folder excluded + # if list, it is specific dashboard names not to provision + # so is handled at provision time. + if is_excluded_folder(excluded_folders, folder_name): + delete_folder(token, title=folder_name) + continue - logger.info( - f'Provisioning {org_config["name"]}/{folder_name} dashboards') - res = executor.submit( - provision_folder, token, - folder_name, folder, services, regions, ds_name, - excluded_folder_dashboards(org_config, folder_name)) - provisioned.append(res) - - for result in provisioned: - folder = result.result() - if folder is None: - continue - yield from folder + logger.info( + f'Provisioning {org_config["name"]}/{folder_name} dashboards') + res = thread_executor.submit( + provision_folder, thread_executor, token, + folder_name, folder, services, regions, ds_name, + excluded_folder_dashboards(org_config, folder_name)) + provisioned.append(res) + + for result in provisioned: + folder = result.result() + if folder is None: + continue + yield from folder -def _provision_static_dashboards(config, org_config, ds_name, token): +def _provision_static_dashboards(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token): """ This function is used to provision static dashboards from json files, overwriting existing ones. - :param config: unused + :param thread_executor: a ThreadPoolExecutor for concurrent requests + :param config: the application config :param org_config: the organisation config - :param ds_name: unused + :param ds_name: the name of the datasource to query in the dashboards :param token: a token_request object :return: generator of UIDs of dashboards that were created """ @@ -696,29 +689,32 @@ def _provision_static_dashboards(config, org_config, ds_name, token): # Statically defined dashboards from json files excluded_dashboards = org_config.get('excluded_dashboards', []) logger.info('Provisioning static dashboards') + provisioned = [] for dashboard in get_dashboard_definitions(): if dashboard['title'] not in excluded_dashboards: logger.info(f'Provisioning static {dashboard["title"]} dashboard') - res = create_dashboard(token, dashboard) - if res: - yield res + provisioned.append(thread_executor.submit(create_dashboard, token, dashboard)) else: logger.info(f'Ensuring {dashboard["title"]} static dashboard is deleted') delete_dashboard(token, dashboard) + yield from provisioned + # Home dashboard is always called "Home" # Make sure it's set for the organization logger.info('Configuring Home dashboard') - yield set_home_dashboard(token, is_staff=org_config['name'] == 'GÉANT Staff') + yield thread_executor.submit(set_home_dashboard, token, is_staff=org_config['name'] == 'GÉANT Staff') -def _get_ignored_dashboards(config, org_config, token): +def _get_ignored_dashboards(thread_executor: ThreadPoolExecutor, config, org_config, ds_name, token): """ This function is used to get a list of dashboards that should not be touched by the provisioning process. + :param thread_executor: a ThreadPoolExecutor for concurrent requests :param config: the application config :param org_config: the organisation config + :param ds_name: the name of the datasource to query in the dashboards :param token: a token_request object :return: generator of UIDs of dashboards that should not be touched @@ -784,43 +780,93 @@ def _provision_orgs(config): return all_orgs -def provision_maybe(config): - """ - This function writes a timestamp and whether the provisioning process - is running to a state file, and then runs the provisioning process. +def _provision_org(config, org, org_config): + try: + request = AdminRequest(**config) + org_id = org['id'] + accounts = [] - The boolean is used to determine if the provisioning process - should run from other worker processes using the shared state file. + logger.info(f'--- Provisioning org {org["name"]} (ID #{org_id}) ---') - The timestamp is written as a safety measure to ensure that the - provisioning process is not stuck in case a worker process crashes - mid-provisioning. + try: + # create a service account for provisioning (>grafana 11.0) + account = get_or_create_service_account(request, org_id) + token = create_service_account_token(request, account['id']) + accounts.append((org_id, account)) + except Exception: + # we're on a older version of grafana + token = create_api_token(request, org_id) + accounts.append((org_id, token)) - This behaviour is disabled in development mode. + token_request = TokenRequest(token=token['key'], **config) + logger.debug(accounts) - :param config: the application config + all_original_dashboards = list_dashboards(token_request) + all_original_dashboard_uids = {d['uid']: d.get('folderUrl', '') + d['url'] for d in all_original_dashboards} - :return: - """ - with open(STATE_PATH, 'r+') as f: - def write_timestamp(timestamp, provisioning): - f.seek(0) - f.write(json.dumps( - {'timestamp': timestamp, 'provisioning': provisioning})) - f.truncate() + datasource = _provision_datasource(config, token_request) + ds_name = datasource.get('name', 'PollerInfluxDB') + + with ThreadPoolExecutor(max_workers=MAX_THREADS) as thread_executor: + + args = (thread_executor, config, org_config, ds_name, token_request) + managed_dashboards = itertools.chain( + _provision_interfaces(*args), + _provision_gws_indirect(*args), + _provision_gws_direct(*args), + _provision_eumetsat_multicast(*args), + _provision_aggregates(*args), + _provision_service_dashboards(*args), + _provision_static_dashboards(*args), + _get_ignored_dashboards(*args) + ) + managed_dashboard_uids = {} + for dashboard in managed_dashboards: + if isinstance(dashboard, Future): + dashboard = dashboard.result() + if dashboard is None: + continue + assert dashboard['uid'] not in managed_dashboard_uids, \ + f'Dashboard with UID {dashboard["uid"]} already exists: {dashboard}' + managed_dashboard_uids[dashboard['uid']] = dashboard['url'] + + difference = set(all_original_dashboard_uids.keys()) - set(managed_dashboard_uids.keys()) + for uid in difference: + info = all_original_dashboard_uids[uid] + # delete unmanaged dashboards + logger.info(f'Deleting stale dashboard {info} with UID {uid}') + delete_dashboard(token_request, {'uid': uid}) + + folders_to_keep = { + # General is a base folder present in Grafana + 'General', + # other folders, created outside of the DASHBOARDS list + 'GWS Indirect', + 'GWS Direct', + 'Aggregates', + 'EUMETSAT Multicast', + 'EAP Dashboard' + } + folders_to_keep.update({dash['folder_name'] + for dash in DASHBOARDS.values()}) + folders_to_keep.update({dash['folder_name'] + for dash in SERVICE_DASHBOARDS.values()}) + + ignored_folders = config.get('ignored_folders', []) + folders_to_keep.update(ignored_folders) + + delete_unknown_folders(token_request, folders_to_keep) try: - # don't conditionally provision in dev - provisioning = os.environ.get('FLASK_ENV') != 'development' - now = datetime.datetime.now() - write_timestamp(now.timestamp(), provisioning) - provision(config) - finally: - now = datetime.datetime.now() - write_timestamp(now.timestamp(), False) + delete_service_account(request, account['id']) + except Exception: + # we're on a older version of grafana + delete_api_token(request, token['id'], org_id=org_id) + except Exception: + logger.exception(f'Error when provisioning org {org["name"]}') -def provision(config, raise_exceptions=False): +def provision(config): """ The entrypoint for the provisioning process. @@ -835,7 +881,6 @@ def provision(config, raise_exceptions=False): """ start = time.time() - accounts = [] all_orgs = _provision_orgs(config) request = AdminRequest(**config) try: @@ -854,101 +899,10 @@ def provision(config, raise_exceptions=False): f'Org {org["name"]} does not have valid configuration.') return None - for org in all_orgs: - try: - org_id = org['id'] - - logger.info( - f'--- Provisioning org {org["name"]} (ID #{org_id}) ---') - org_config = _find_org_config(org) - if not org_config: - # message logged from _find_org_config - continue - - try: - # create a service account for provisioning (>grafana 11.0) - account = get_or_create_service_account(request, org_id) - token = create_service_account_token(request, account['id']) - accounts.append((org_id, account)) - except Exception: - # we're on a older version of grafana - token = create_api_token(request, org_id) - accounts.append((org_id, token)) - - token_request = TokenRequest(token=token['key'], **config) - logger.debug(accounts) - - all_original_dashboards = list_dashboards(token_request) - all_original_dashboard_uids = {d['uid']: d.get('folderUrl', '') + d['url'] for d in all_original_dashboards} - - datasource = _provision_datasource(config, token_request) - ds_name = datasource.get('name', 'PollerInfluxDB') - - managed_dashboards = itertools.chain( - _provision_interfaces( - config, org_config, ds_name, token_request), - _provision_gws_indirect( - config, org_config, ds_name, token_request), - _provision_gws_direct( - config, org_config, ds_name, token_request), - _provision_eumetsat_multicast( - config, org_config, ds_name, token_request), - _provision_aggregates( - config, org_config, ds_name, token_request), - _provision_service_dashboards( - config, org_config, ds_name, token_request), - _provision_static_dashboards( - config, org_config, ds_name, token_request), - _get_ignored_dashboards( - config, org_config, token_request) - ) + orgs = list(filter(lambda t: t[1] is not None, [(org, _find_org_config(org)) for org in all_orgs])) - managed_dashboard_uids = {} - for dashboard in managed_dashboards: - if isinstance(dashboard, Future): - dashboard = dashboard.result() - if dashboard is None: - continue - assert dashboard['uid'] not in managed_dashboard_uids, \ - f'Dashboard with UID {dashboard["uid"]} already exists: {dashboard}' - managed_dashboard_uids[dashboard['uid']] = dashboard['url'] - - difference = set(all_original_dashboard_uids.keys()) - set(managed_dashboard_uids.keys()) - for uid in difference: - info = all_original_dashboard_uids[uid] - # delete unmanaged dashboards - logger.info(f'Deleting stale dashboard {info} with UID {uid}') - delete_dashboard(token_request, {'uid': uid}) - - folders_to_keep = { - # General is a base folder present in Grafana - 'General', - # other folders, created outside of the DASHBOARDS list - 'GWS Indirect', - 'GWS Direct', - 'Aggregates', - 'EUMETSAT Multicast', - 'EAP Dashboard' - } - folders_to_keep.update({dash['folder_name'] - for dash in DASHBOARDS.values()}) - folders_to_keep.update({dash['folder_name'] - for dash in SERVICE_DASHBOARDS.values()}) - - ignored_folders = config.get('ignored_folders', []) - folders_to_keep.update(ignored_folders) - - delete_unknown_folders(token_request, folders_to_keep) - try: - delete_service_account(request, account['id']) - except Exception: - # we're on a older version of grafana - delete_api_token(request, token['id'], org_id=org_id) - except Exception: - logger.exception(f'Error when provisioning org {org["name"]}') - if raise_exceptions: - raise - break + for org, org_config in orgs: + _provision_org(config, org, org_config) logger.info(f'Time to complete: {time.time() - start}') diff --git a/brian_dashboard_manager/grafana/utils/request.py b/brian_dashboard_manager/grafana/utils/request.py index dfe7051e9f13185fa13bec70b1998cb457b192b4..8baa5e95199a5843df0e26c996c16c9b522d4604 100644 --- a/brian_dashboard_manager/grafana/utils/request.py +++ b/brian_dashboard_manager/grafana/utils/request.py @@ -22,7 +22,7 @@ class Request(requests.Session): self.BASE_URL = url def do_request(self, method, endpoint, **kwargs) -> requests.Response: - r = self.request(method, self.BASE_URL + endpoint, **kwargs) + r = self.request(method, self.BASE_URL + endpoint, timeout=15, **kwargs) r.raise_for_status() return r diff --git a/brian_dashboard_manager/routes/update.py b/brian_dashboard_manager/routes/update.py index 8a042dfbfba511e0bcb42bf2bd3aa55c3f7823b0..5c81c011f400dde670fa7551a54571f71dfe7999 100644 --- a/brian_dashboard_manager/routes/update.py +++ b/brian_dashboard_manager/routes/update.py @@ -1,14 +1,16 @@ -import json import datetime + from flask import jsonify, Response from concurrent.futures import ThreadPoolExecutor -from json.decoder import JSONDecodeError from flask import Blueprint, current_app from brian_dashboard_manager.routes import common -from brian_dashboard_manager.grafana.provision import provision_maybe +from brian_dashboard_manager.grafana.provision import provision from brian_dashboard_manager import CONFIG_KEY -from brian_dashboard_manager.config import STATE_PATH +provision_state = { + 'time': datetime.datetime.now(datetime.timezone.utc), + 'provisioning': False +} routes = Blueprint("update", __name__) @@ -27,40 +29,38 @@ def after_request(resp): return common.after_request(resp) -def should_provision(): +def provision_maybe(): """ - Check if we should provision by checking the state file. - Multiple workers can call this function at the same time, - so we need to make sure we don't provision twice while - the first provisioning is still running. + Check if we should provision in case of multiple requests hitting the endpoint. + We need to make sure we don't provision if another thread is still running. :return: tuple of (bool, datetime) representing if we can provision and the timestamp of the last provisioning, respectively. """ - try: - with open(STATE_PATH, 'r+') as f: - try: - state = json.load(f) - except JSONDecodeError: - state = {} + global provision_state + + now = datetime.datetime.now(datetime.timezone.utc) + timestamp = provision_state['time'] + provisioning = provision_state['provisioning'] + + if provisioning and (now - timestamp).total_seconds() < 600: # lockout for 10 minutes at most + return False, timestamp + + def write_timestamp(timestamp, provisioning): + provision_state['time'] = timestamp + provision_state['provisioning'] = provisioning - provisioning = state.get('provisioning', False) - timestamp = datetime.datetime.fromtimestamp( - state.get('timestamp', 1)) + def _finish(): + now = datetime.datetime.now(datetime.timezone.utc) + write_timestamp(now, False) - now = datetime.datetime.now() - if provisioning and (now - timestamp).total_seconds() > 3600: - # if we stay in provisioning state - # for over an hour, we probably restarted - # and the state file is out of sync. - provisioning = False + write_timestamp(now, True) - can_provision = not provisioning - return can_provision, timestamp - except FileNotFoundError: - with open(STATE_PATH, 'w') as f: - return True, None + executor = ThreadPoolExecutor(max_workers=1) + f = executor.submit(provision, current_app.config[CONFIG_KEY]) + f.add_done_callback(lambda _: _finish()) + return True, now @routes.route('/', methods=['GET']) @@ -78,11 +78,10 @@ def update(): :return: json """ - should, timestamp = should_provision() + should, timestamp = provision_maybe() if should: - executor = ThreadPoolExecutor(max_workers=1) - executor.submit(provision_maybe, current_app.config[CONFIG_KEY]) return jsonify({'data': {'message': 'Provisioning dashboards!'}}) else: - message = f'Provision already in progress since {timestamp}' + seconds_ago = (datetime.datetime.now(datetime.timezone.utc) - timestamp).total_seconds() + message = f'Provision already in progress since {timestamp} ({seconds_ago:.2f} seconds ago).' return Response(message, status=503) diff --git a/brian_dashboard_manager/templating/helpers.py b/brian_dashboard_manager/templating/helpers.py index efcf8ff0e48107eadd8dd0397df75b9f6d13650a..e2b32264f70ae1cfa56052772adf52da99e9296a 100644 --- a/brian_dashboard_manager/templating/helpers.py +++ b/brian_dashboard_manager/templating/helpers.py @@ -2,9 +2,11 @@ Helper functions used to group interfaces together and generate the necessary data to generate the dashboards from templates. """ -from collections import defaultdict -from concurrent.futures import ProcessPoolExecutor import logging + +from typing import Union +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from itertools import product from functools import partial, reduce from string import ascii_uppercase @@ -12,7 +14,9 @@ from string import ascii_uppercase from brian_dashboard_manager.templating.render import create_panel, \ create_panel_target, create_dropdown_panel -NUM_PROCESSES = 4 +AnyExecutor = Union[ProcessPoolExecutor, ThreadPoolExecutor] + +DEFAULT_TIMEOUT = 60 PANEL_HEIGHT = 12 PANEL_WIDTH = 24 @@ -686,29 +690,122 @@ def default_interface_panel_generator(gridPos, use_all_traffic=True, use_ipv6=Tr return get_panel_definitions -def _get_dashboard_data(data, datasource, tag, single_data_func): +def create_aggregate_panel(title, gridpos, targets, datasource): """ - Helper for generating dashboard definitions. - Uses multiprocessing to speed up generation. + Helper for generating aggregate panels. Creates two panels, one for + ingress and one for egress. - :param data: the dashboard names and the panel data for each dashboard + :param title: title for the panel + :param gridpos: generator for grid position + :param targets: list of targets for the panels, used to build separate + targets for both ingress and egress. + :param datasource: datasource to use for the panels + + :return: tuple of aggregate panels, one for ingress and one for egress + """ + + ingress_targets, egress_targets = get_aggregate_targets(targets) + + ingress_pos = next(gridpos) + egress_pos = next(gridpos) + + is_total = 'totals' in title.lower() + + def reduce_alias(prev, curr): + alias = curr['alias'] + if 'egress' in alias.lower(): + prev[alias] = '#0000FF' + else: + prev[alias] = '#00FF00' + return prev + + ingress_colors = reduce(reduce_alias, ingress_targets, {}) + egress_colors = reduce(reduce_alias, egress_targets, {}) + + ingress = create_panel( + **ingress_pos, + stack=True, + linewidth=0 if is_total else 1, + datasource=datasource, + title=title + " - ingress", + targets=ingress_targets, + y_axis_type="bits", + alias_colors=ingress_colors if is_total else {}, + ) + + egress = create_panel( + **egress_pos, + stack=True, + linewidth=0 if is_total else 1, + datasource=datasource, + title=title + " - egress", + targets=egress_targets, + y_axis_type="bits", + alias_colors=egress_colors if is_total else {}, + ) + + return ingress, egress + + +def get_aggregate_dashboard_data(title, remotes, datasource, tag): + """ + Helper for generating aggregate dashboard definitions. + Aggregate dashboards consist only of aggregate panels that are + panels with data for multiple interfaces. + + At the top of the dashboard are two aggregate panels showing + total ingress and egress data for all interfaces. + + Below that are two aggregate panels for each target, one for + ingress and one for egress. + + :param title: title for the dashboard + :param remotes: dictionary of targets for the panels, the key is the + remote (usually a customer) and the value is a list of targets + for that remote. A single target represents how to fetch + data for one interface. :param datasource: datasource to use for the panels :param tag: tag to use for the dashboard, used for dashboard dropdowns on the home dashboard. - :param single_data_func: function that gets data for one definition - :return: generator for dashboard definitions for each dashboard + :return: dashboard definition for the aggregate dashboard """ - with ProcessPoolExecutor(max_workers=NUM_PROCESSES) as executor: - for dash in executor.map( - partial( - single_data_func, - datasource=datasource, - tag=tag), - data.items() - ): - yield dash + id_gen = num_generator() + gridPos = gridPos_generator(id_gen, agg=True) + + panels = [] + all_targets = remotes.get('EVERYSINGLETARGET', []) + + ingress, egress = create_aggregate_panel( + title, gridPos, all_targets, datasource) + panels.extend([ingress, egress]) + + totals_title = title + ' - Totals' + t_in, t_eg = create_aggregate_panel( + totals_title, gridPos, all_targets, datasource) + panels.extend([t_in, t_eg]) + + if 'EVERYSINGLETARGET' in remotes: + del remotes['EVERYSINGLETARGET'] + + for remote in remotes: + _in, _out = create_aggregate_panel( + title + f' - {remote}', gridPos, remotes[remote], datasource) + panels.extend([_in, _out]) + + result = { + 'title': title, + 'datasource': datasource, + 'panels': panels, + + } + if isinstance(tag, list): + result['tags'] = tag + else: + result['tag'] = tag + + return result def get_nren_dashboard_data_single(data, datasource, tag): @@ -790,10 +887,6 @@ def get_nren_dashboard_data_single(data, datasource, tag): return result -def get_nren_dashboard_data(data, datasource, tag): - yield from _get_dashboard_data(data, datasource, tag, get_nren_dashboard_data_single) - - def get_re_peer_dashboard_data_single(data, datasource, tag): """ Helper for generating dashboard definitions for a single R&E Peer. @@ -863,10 +956,6 @@ def get_re_peer_dashboard_data_single(data, datasource, tag): return result -def get_re_peer_dashboard_data(data, datasource, tag): - yield from _get_dashboard_data(data, datasource, tag, get_re_peer_dashboard_data_single) - - def get_service_dashboard_data_single(data, datasource, tag): """ Helper for generating dashboard definitions for a single service. @@ -917,10 +1006,6 @@ def get_service_dashboard_data_single(data, datasource, tag): return result -def get_service_dashboard_data(data, datasource, tag): - yield from _get_dashboard_data(data, datasource, tag, get_service_dashboard_data_single) - - def get_dashboard_data_single( data, datasource, tag, panel_generator=default_interface_panel_generator, @@ -963,8 +1048,7 @@ def get_dashboard_data( panel_generator=default_interface_panel_generator, errors=False): """ - Helper for generating dashboard definitions for all non-NREN dashboards. - Uses multiprocessing to speed up generation. + Helper for generating dashboard definitions for interface-based non-NREN dashboards. :param data: the dashboard names and the panel data for each dashboard :param datasource: datasource to use for the panels @@ -976,135 +1060,36 @@ def get_dashboard_data( :return: generator for dashboard definitions for each dashboard """ - with ProcessPoolExecutor(max_workers=NUM_PROCESSES) as executor: - try: - for dash in executor.map( - partial( - get_dashboard_data_single, - datasource=datasource, - tag=tag, - panel_generator=panel_generator, - errors=errors), - data.items() - ): - yield dash - finally: - executor.shutdown(wait=False) - - -def create_aggregate_panel(title, gridpos, targets, datasource): - """ - Helper for generating aggregate panels. Creates two panels, one for - ingress and one for egress. - - :param title: title for the panel - :param gridpos: generator for grid position - :param targets: list of targets for the panels, used to build separate - targets for both ingress and egress. - :param datasource: datasource to use for the panels - - :return: tuple of aggregate panels, one for ingress and one for egress - """ - - ingress_targets, egress_targets = get_aggregate_targets(targets) + func = partial( + get_dashboard_data_single, + datasource=datasource, tag=tag, panel_generator=panel_generator, errors=errors) - ingress_pos = next(gridpos) - egress_pos = next(gridpos) + yield from map(func, data.items()) - is_total = 'totals' in title.lower() - def reduce_alias(prev, curr): - alias = curr['alias'] - if 'egress' in alias.lower(): - prev[alias] = '#0000FF' - else: - prev[alias] = '#00FF00' - return prev - - ingress_colors = reduce(reduce_alias, ingress_targets, {}) - egress_colors = reduce(reduce_alias, egress_targets, {}) - - ingress = create_panel( - **ingress_pos, - stack=True, - linewidth=0 if is_total else 1, - datasource=datasource, - title=title + " - ingress", - targets=ingress_targets, - y_axis_type="bits", - alias_colors=ingress_colors if is_total else {}, - ) +def get_nren_dashboard_data(data, datasource, tag): - egress = create_panel( - **egress_pos, - stack=True, - linewidth=0 if is_total else 1, + func = partial( + get_nren_dashboard_data_single, datasource=datasource, - title=title + " - egress", - targets=egress_targets, - y_axis_type="bits", - alias_colors=egress_colors if is_total else {}, - ) + tag=tag) - return ingress, egress + yield from map(func, data.items()) -def get_aggregate_dashboard_data(title, remotes, datasource, tag): - """ - Helper for generating aggregate dashboard definitions. - Aggregate dashboards consist only of aggregate panels that are - panels with data for multiple interfaces. - - At the top of the dashboard are two aggregate panels showing - total ingress and egress data for all interfaces. - - Below that are two aggregate panels for each target, one for - ingress and one for egress. - - :param title: title for the dashboard - :param remotes: dictionary of targets for the panels, the key is the - remote (usually a customer) and the value is a list of targets - for that remote. A single target represents how to fetch - data for one interface. - :param datasource: datasource to use for the panels - :param tag: tag to use for the dashboard, used for dashboard dropdowns on - the home dashboard. - - :return: dashboard definition for the aggregate dashboard - """ - - id_gen = num_generator() - gridPos = gridPos_generator(id_gen, agg=True) - - panels = [] - all_targets = remotes.get('EVERYSINGLETARGET', []) - - ingress, egress = create_aggregate_panel( - title, gridPos, all_targets, datasource) - panels.extend([ingress, egress]) +def get_re_peer_dashboard_data(data, datasource, tag): + func = partial( + get_re_peer_dashboard_data_single, + datasource=datasource, + tag=tag) - totals_title = title + ' - Totals' - t_in, t_eg = create_aggregate_panel( - totals_title, gridPos, all_targets, datasource) - panels.extend([t_in, t_eg]) + yield from map(func, data.items()) - if 'EVERYSINGLETARGET' in remotes: - del remotes['EVERYSINGLETARGET'] - for remote in remotes: - _in, _out = create_aggregate_panel( - title + f' - {remote}', gridPos, remotes[remote], datasource) - panels.extend([_in, _out]) - - result = { - 'title': title, - 'datasource': datasource, - 'panels': panels, - - } - if isinstance(tag, list): - result['tags'] = tag - else: - result['tag'] = tag +def get_service_dashboard_data(data, datasource, tag): + func = partial( + get_service_dashboard_data_single, + datasource=datasource, + tag=tag) - return result + yield from map(func, data.items()) diff --git a/changelog.md b/changelog.md index 83be12dc4d605de42daa9b14dd1e3f52053e33be..9e18d0e84b7c90051590877f1dd167c2793e74d0 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,11 @@ All notable changes to this project will be documented in this file. +## [0.72] - 2024-11-05 +- Refactor codebase to remove use of multiple python processes, and only use threads for requests to Grafana. +- Simplify provision_maybe functionality to be local to the process, since we only have one gunicorn process now as well. +- Fetch all existing dashboards per folder in advance, to avoid fetching the dashboards individually (by UID or search by title) when creating the dashboards. + ## [0.71] - 2024-11-04 - Remove use of feature not available in python 3.6 - Shutdown executors explicitly when we are done with them diff --git a/setup.py b/setup.py index a93a92c6fcac223204fee6e96ed69dec95d442a7..fbb3dcb8c9099ff3733ae2d9010f04c2642f7a47 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='brian-dashboard-manager', - version="0.71", + version="0.72", author='GEANT', author_email='swd@geant.org', description='', diff --git a/test/conftest.py b/test/conftest.py index 963d78c118f63a0415f612b75a42d8bc95f333d3..205fb89cd2384cd1b66f3f45fac5013da3fb7124 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -7,11 +7,11 @@ import pathlib import re import string import threading -from brian_dashboard_manager import environment -from brian_dashboard_manager.grafana.utils.request import TokenRequest import pytest import brian_dashboard_manager import responses +from brian_dashboard_manager import environment +from brian_dashboard_manager.grafana.utils.request import TokenRequest @pytest.fixture diff --git a/test/test_aggregrate.py b/test/test_aggregrate.py index b70cd58042ba42ba75e089cd47cba094108c30b1..74f7d6486f40f093a7355971273201c1817fd163 100644 --- a/test/test_aggregrate.py +++ b/test/test_aggregrate.py @@ -323,17 +323,24 @@ def generate_folder(data): @responses.activate def test_provision_aggregate(data_config, mocker, client): - def create_dashboard(_, dash, folder=None): + def create_dashboard(_, dash, *args, folder=None, **kwargs): return dash + def list_dashboards(_, folder): + return [] + mocker.patch( 'brian_dashboard_manager.grafana.provision.create_dashboard', create_dashboard) + mocker.patch( + 'brian_dashboard_manager.grafana.provision.list_folder_dashboards', + list_dashboards) + request = TokenRequest(**data_config, token='test') fake_folder = generate_folder({'uid': 'aggtest', 'title': 'aggtest'}) result = provision_aggregate(request, fake_folder, TEST_DASHBOARD, - 'test_datasource') + 'test_datasource', {}) panels = result['panels'] expected_title = f'Aggregate - {TEST_DASHBOARD["dashboard_name"]}' assert result['title'] == expected_title diff --git a/test/test_update.py b/test/test_update.py index 5cbd8e38f5fa10d5e9554919cf5ec0fc3460da4f..dddb906efc933f31c00bff38832ffeb7a0545374 100644 --- a/test/test_update.py +++ b/test/test_update.py @@ -1,6 +1,6 @@ import pytest import responses - +from concurrent.futures import ThreadPoolExecutor from brian_dashboard_manager.grafana.provision import provision_folder, provision from brian_dashboard_manager.inventory_provider.interfaces import get_nren_regions from brian_dashboard_manager.services.api import fetch_services @@ -722,19 +722,19 @@ def populate_inventory(get_test_data, data_config): @pytest.mark.parametrize( "folder_name, excluded_nrens, expected_nrens", [ - ("NREN Access", [], ['CESNET', 'GEANT', 'KIAE', 'LITNET', 'SWITCH']), - ("NREN Access", ["GEANT", "KIAE"], ['CESNET', 'LITNET', 'SWITCH']), + ("NREN Access", [], {'CESNET', 'GEANT', 'KIAE', 'LITNET', 'SWITCH'}), + ("NREN Access", ["GEANT", "KIAE"], {'CESNET', 'LITNET', 'SWITCH'}), ( - "NREN Access", - [], - ["LITNET", "CESNET", "GEANT", "KIAE", "SWITCH"], + "NREN Access", + [], + {"LITNET", "CESNET", "GEANT", "KIAE", "SWITCH"}, ), ( - "NREN Access", - ["GEANT"], - ["LITNET", "CESNET", "KIAE", "SWITCH"], + "NREN Access", + ["GEANT"], + {"LITNET", "CESNET", "KIAE", "SWITCH"}, ), - ("testfolder", ["GEANT"], ["KIAE", "SWITCH"]), + ("testfolder", ["GEANT"], {"KIAE", "SWITCH"}), ], ) def test_provision_nren_folder( @@ -774,7 +774,8 @@ def test_provision_nren_folder( services = fetch_services(data_config['reporting_provider']) regions = get_nren_regions(data_config['inventory_provider']) - result = provision_folder( + result = [f.result() for f in provision_folder( + ThreadPoolExecutor(), mock_grafana.request, folder_name, dashboards["NREN"], @@ -782,10 +783,10 @@ def test_provision_nren_folder( regions, "testdatasource", excluded_nrens, - ) - assert len(result) == len(expected_nrens) - for i, nren in enumerate(result): - assert result[i]["title"] in expected_nrens + )] + nrens = set([r["title"] for r in result]) + assert nrens == expected_nrens + for i, nren in enumerate(nrens): if "NREN" in folder_name: # Every NREN dashboard must have at least 4 panels # (3 default panels and 1 per ifc) @@ -829,7 +830,7 @@ def test_provision( 'brian_dashboard_manager.grafana.provision.get_gws_indirect') _mocked_gws_indirect.return_value = [] - provision(data_config, raise_exceptions=True) + provision(data_config) @responses.activate @@ -868,7 +869,7 @@ def test_provision_re_peer_dashboard( data_config["organizations"] = [ {"name": "Testorg1", "excluded_nrens": ["GEANT"], "excluded_dashboards": []}, ] - provision(data_config, raise_exceptions=True) + provision(data_config) folder_uid = "RE_Peer" assert len(mock_grafana.dashboards_by_folder_uid[folder_uid]) == 1 panels = mock_grafana.dashboards_by_folder_uid[folder_uid][0]["panels"] @@ -930,7 +931,8 @@ def test_provision_nren_category( services = fetch_services(data_config['reporting_provider']) regions = get_nren_regions(data_config['inventory_provider']) - result = provision_folder( + result = list(provision_folder( + ThreadPoolExecutor(), mock_grafana.request, folder_name, dashboards[dashboard_id], @@ -938,5 +940,5 @@ def test_provision_nren_category( regions, "testdatasource", [], - ) + )) assert len(result) == expected_dashboard_count