"""Shared pytest fixtures: test configuration and an in-memory Grafana mock."""
import copy
import datetime
import itertools
import json
import os
import pathlib
import re
import string
import threading
import pytest
import brian_dashboard_manager
import responses
from brian_dashboard_manager import environment
from brian_dashboard_manager.grafana.utils.request import TokenRequest


@pytest.fixture
def data_config():
    """Return a fake application configuration dict used by the tests."""
    return {
        "admin_username": "fakeadmin",
        "admin_password": "fakeadmin",
        "hostname": "myfakehostname.org",
        "inventory_provider": "http://inventory-provider01.geant.org:8080",
        "reporting_provider": "http://hgf-reporting-provider.geant.org:1337",
        "organizations": [
            {
                "name": "Testorg1",
                "excluded_nrens": ['GEANT'],
                "excluded_dashboards": []
            },
            {
                "name": "GÉANT Testorg2",
                "excluded_nrens": ['GEANT'],
                "excluded_dashboards": []
            },
            {
                "name": "NRENsTestorg3",
                "excluded_nrens": ['GEANT'],
                "excluded_dashboards": []
            },
            {
                "name": "General Public",
                "excluded_nrens": ["JISC", "PSNC"],
                "excluded_dashboards": []
            }
        ],
        "datasources": {
            "influxdb": {
                "name": "PollerInfluxDB",
                "type": "influxdb",
                "access": "proxy",
                "url": "http://prod-poller-ui01.geant.org:8086",
                "database": "poller",
                "basicAuth": False,
                "isDefault": True,
                "readOnly": False
            }
        },
        "ignored_folders": ['fakefolder']
    }


# Directory (next to this file) holding JSON test data files.
DATA_DIR = pathlib.Path(__file__).parent / "data"


@pytest.fixture
def get_test_data():
    """Return a loader that parses a JSON file from ``DATA_DIR`` by name."""
    def _get_test_data(filename):
        return json.loads((DATA_DIR / filename).read_text())

    return _get_test_data


@pytest.fixture
def data_config_filename(data_config, tmp_path):
    """Write ``data_config`` as JSON into ``tmp_path`` and return its path."""
    file = tmp_path / "data_config.json"
    file.write_text(json.dumps(data_config))
    return str(file)


@pytest.fixture
def client(mocker, data_config_filename):
    """Yield a test client for the app, configured via ``CONFIG_FILENAME``."""
    mocker.patch.object(environment, "setup_logging")
    # NOTE(review): CONFIG_FILENAME stays set in os.environ after the test
    # finishes; confirm no other test relies on that before restoring it.
    os.environ["CONFIG_FILENAME"] = data_config_filename
    with brian_dashboard_manager.create_app().test_client() as c:
        yield c


@pytest.fixture
def mock_grafana(data_config):
    """Register ``responses`` callbacks backed by an in-memory Grafana mock.

    The callbacks match URLs containing ``data_config['hostname']`` and
    delegate to a ``MockGrafana`` instance, which is returned so tests can
    inspect or pre-populate its state.
    """
    def uid_generator():
        # Lazily yields distinct 8-letter lowercase strings.
        return (
            "".join(result)
            for result in itertools.permutations(string.ascii_lowercase, 8)
        )

    # Re-entrant so a synchronized method may call another one.
    lock = threading.RLock()

    def synchronized(fun):
        """Decorator: run ``fun`` while holding the shared fixture lock."""
        def _sync(*args, **kwargs):
            with lock:
                return fun(*args, **kwargs)

        return _sync

    class MockGrafana:
        """In-memory store of Grafana objects, each kind keyed by uid."""

        def __init__(self) -> None:
            self.folders = {}
            self.dashboards = {}
            # folder uid (None when no folder matched) -> list of dashboards
            self.dashboards_by_folder_uid = {}
            self.datasources = {}
            self.organizations = {}
            self.api_tokens = {}
            self.uids = uid_generator()
            self.ids = itertools.count(start=1)
            self.request = TokenRequest(data_config["hostname"], token="abc")

        @synchronized
        def list_api_tokens(self):
            """Return shallow copies of all stored API tokens."""
            return [copy.copy(val) for val in self.api_tokens.values()]

        @synchronized
        def create_api_token(self, payload):
            """Store a token built from ``payload`` and return it.

            ``expiration`` is now + ``secondsToLive`` seconds (UTC), or
            ``None`` when the payload has no ``secondsToLive``.
            """
            lifetime = payload.get("secondsToLive")
            if lifetime is not None:
                # timedelta's first positional argument is *days*; the
                # lifetime is in seconds, so pass it by keyword.
                expires_at = datetime.datetime.now(
                    datetime.timezone.utc
                ) + datetime.timedelta(seconds=lifetime)
                expiration = expires_at.strftime("%Y-%m-%dT%H:%M:%SZ")
            else:
                expiration = None
            return self._create_object(
                {
                    **payload,
                    "key": "key",
                    "expiration": expiration,
                },
                self.api_tokens,
            )

        @synchronized
        def delete_api_token(self, uid):
            """Delete the token stored under ``uid``; KeyError if absent."""
            return self._delete_object(uid, self.api_tokens, name="api token")

        @synchronized
        def list_organizations(self):
            """Return shallow copies of all stored organizations."""
            return [copy.copy(val) for val in self.organizations.values()]

        @synchronized
        def create_organization(self, organization):
            """Store ``organization`` with a fresh id; return a status dict."""
            organization = {
                **organization,
                "id": next(self.ids),
                "uid": str(organization.get("uid") or next(self.uids)),
            }
            self.organizations[organization["uid"]] = organization
            return {
                "orgId": organization["id"],
                "message": "Organization created",
            }

        @synchronized
        def delete_organization(self, uid):
            """Delete the organization under ``uid``; KeyError if absent."""
            return self._delete_object(
                uid, self.organizations, name="organization"
            )

        @synchronized
        def list_datasources(self):
            """Return shallow copies of all stored datasources."""
            return [copy.copy(val) for val in self.datasources.values()]

        @synchronized
        def create_datasource(self, datasource):
            """Store ``datasource`` and return the stored object."""
            return self._create_object(datasource, self.datasources)

        @synchronized
        def delete_datasource(self, uid):
            """Delete the datasource under ``uid``; KeyError if absent."""
            return self._delete_object(
                uid, self.datasources, name="datasource"
            )

        @synchronized
        def list_folders(self):
            """Return shallow copies of all stored folders."""
            # Copies, like the other list_* methods, so callers cannot
            # mutate the mock's internal state by accident.
            return [copy.copy(val) for val in self.folders.values()]

        @synchronized
        def create_folder(self, folder):
            """Store ``folder`` and return the stored object."""
            return self._create_object(folder, self.folders)

        @synchronized
        def delete_folder(self, uid):
            """Delete the folder under ``uid``; KeyError if absent."""
            return self._delete_object(uid, self.folders, name="folder")

        @synchronized
        def list_dashboards(self, title=None, folder_id=None):
            """Return copies of dashboards matching the optional filters.

            A filter left as ``None`` matches every dashboard.
            """
            return [
                copy.copy(db)
                for db in self.dashboards.values()
                if (title is None or db["title"] == title)
                and (folder_id is None or db.get("folderId") == folder_id)
            ]

        @synchronized
        def create_dashboard(self, dashboard, folder_id=None):
            """Store ``dashboard`` and index it under its folder's uid.

            An existing entry with the same uid in that folder's list is
            replaced in place; otherwise the dashboard is appended.
            """
            result = self._create_object(dashboard, self.dashboards)
            # uid of the first folder whose id matches, else None.
            folder_uid = next(
                (
                    f["uid"]
                    for f in self.folders.values()
                    if f["id"] == folder_id
                ),
                None,
            )
            siblings = self.dashboards_by_folder_uid.setdefault(
                folder_uid, []
            )
            for idx, db in enumerate(siblings):
                if db["uid"] == result["uid"]:
                    siblings[idx] = result
                    break
            else:
                siblings.append(result)
            return result

        @synchronized
        def delete_dashboard(self, uid):
            """Delete the dashboard ``uid`` and drop it from folder lists.

            Raises KeyError if no dashboard is stored under ``uid``.
            """
            result = self._delete_object(
                uid, self.dashboards, name="dashboard"
            )
            for dashboards in self.dashboards_by_folder_uid.values():
                for idx, db in enumerate(dashboards):
                    # Compare with the requested uid: ``result`` is only a
                    # message dict and carries no "uid" key.
                    if db["uid"] == uid:
                        dashboards.pop(idx)
                        break
            return result

        def _create_object(self, obj, all_objects):
            """Store a copy of ``obj`` in ``all_objects`` keyed by its uid.

            Missing ``id``/``uid`` values are generated; a default ``url``
            is added unless ``obj`` already provides one.
            """
            obj_id = obj.get("id")
            uid = obj.get("uid") or next(self.uids)
            obj = {
                'url': '/fake/grafana/' + str(uid),
                **obj,
                "id": obj_id if obj_id is not None else next(self.ids),
                "uid": str(uid),
            }
            all_objects[obj["uid"]] = obj
            return obj

        def _delete_object(self, uid, all_objects, name="object"):
            """Remove ``uid`` from ``all_objects``; KeyError if absent."""
            del all_objects[uid]
            return {"message": f"deleted {name}"}

    grafana = MockGrafana()

    url_prefix = f".*{data_config['hostname']}/"

    def json_request_cb(fun):
        """Adapt ``fun(payload, request) -> (status, result)`` for responses.

        The request body is JSON-decoded (``None`` when empty) and the
        result is JSON-encoded into the response body.
        """
        def _wrapped(request):
            payload = json.loads(request.body) if request.body else None
            status, result = fun(payload, request)
            return (status, {}, json.dumps(result))

        return _wrapped

    # --- Api Tokens ---

    # NOTE(review): the "?" below is unescaped, so it makes the "s" optional
    # rather than matching a literal "?". Left as-is because escaping it may
    # stop matching requests sent without a query string.
    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + r"api/auth/keys?.+$"),
        callback=json_request_cb(lambda p, b: (200, grafana.list_api_tokens())),
    )

    @json_request_cb
    def create_api_token_callback(payload, request):
        return (200, grafana.create_api_token(payload))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/auth/keys"),
        callback=create_api_token_callback,
    )

    @json_request_cb
    def delete_api_token_callback(payload, request):
        # Tokens are addressed by numeric id (last path segment).
        token_id = int(request.path_url.split("/")[-1])
        for uid, ds in grafana.api_tokens.items():
            if ds["id"] == token_id:
                return (200, grafana.delete_api_token(uid))
        return (404, "")

    responses.add_callback(
        method=responses.DELETE,
        url=re.compile(url_prefix + r"api/auth/keys/.+$"),
        callback=delete_api_token_callback,
    )

    # --- Organizations ---

    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + "api/orgs"),
        callback=json_request_cb(
            lambda p, b: (200, grafana.list_organizations())
        ),
    )

    @json_request_cb
    def create_organization_callback(body, _):
        return (200, grafana.create_organization(body))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/orgs"),
        callback=create_organization_callback,
    )

    # --- Datasources ---

    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + "api/datasources"),
        callback=json_request_cb(
            lambda p, b: (200, grafana.list_datasources())
        ),
    )

    @json_request_cb
    def create_datasource_callback(payload, _):
        return (200, grafana.create_datasource(payload))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/datasources"),
        callback=create_datasource_callback,
    )

    @json_request_cb
    def delete_datasource_callback(payload, request):
        # Datasources are addressed by name (last path segment).
        name = request.path_url.split("/")[-1]
        for uid, ds in grafana.datasources.items():
            if ds["name"] == name:
                # json_request_cb expects (status, result); it adds the
                # headers and JSON-encodes the result itself.
                return (200, grafana.delete_datasource(uid))
        return (404, "")

    responses.add_callback(
        method=responses.DELETE,
        url=re.compile(url_prefix + r"api/datasources/name/.+$"),
        callback=delete_datasource_callback,
    )

    # --- Folders ---

    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + "api/folders"),
        callback=json_request_cb(lambda p, b: (200, grafana.list_folders())),
    )

    @json_request_cb
    def create_folder_callback(payload, _):
        return (200, grafana.create_folder(payload))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/folders"),
        callback=create_folder_callback,
    )

    @json_request_cb
    def delete_folder_callback(payload, request):
        uid = request.path_url.split("/")[-1]
        try:
            return (200, grafana.delete_folder(uid))
        except KeyError:
            return (404, {})

    responses.add_callback(
        method=responses.DELETE,
        url=re.compile(url_prefix + r"api/folders/.+$"),
        callback=delete_folder_callback,
    )

    # --- Dashboards ---

    @json_request_cb
    def get_dashboard_callback(payload, request):
        uid = request.path_url.split("/")[-1]
        try:
            return (200, {"dashboard": grafana.dashboards[uid]})
        except KeyError:
            return (404, {})

    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + r"api/dashboards/uid/.+$"),
        callback=get_dashboard_callback,
    )

    @json_request_cb
    def search_dashboard_callback(_, request):
        query = request.params
        return (
            200,
            grafana.list_dashboards(
                query.get("title"),
                query.get("folderIds", query.get('folderUIDs')),
            ),
        )

    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + "api/search"),
        callback=search_dashboard_callback,
    )

    @json_request_cb
    def create_dashboard_callback(payload, request):
        dashboard = payload["dashboard"]
        folder_id = payload.get("folderId")
        return (200, grafana.create_dashboard(dashboard, folder_id=folder_id))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/dashboards/db"),
        callback=create_dashboard_callback,
    )

    @json_request_cb
    def delete_dashboard_callback(payload, request):
        uid = request.path_url.split("/")[-1]
        try:
            return (200, grafana.delete_dashboard(uid))
        except KeyError:
            return (404, {})

    responses.add_callback(
        method=responses.DELETE,
        url=re.compile(url_prefix + r"api/dashboards/uid/.+$"),
        callback=delete_dashboard_callback,
    )

    # --- Other ---

    responses.add(
        method=responses.PUT,
        url=re.compile(url_prefix + "api/org/preferences"),
        json={"message": "Preferences updated"},
    )

    responses.add(
        method=responses.POST,
        url=re.compile(url_prefix + r"api/user/using/.+$"),
        json={"message": "ok"},
    )

    return grafana