-
Bjarke Madsen authoredBjarke Madsen authored
conftest.py 13.18 KiB
import copy
import datetime
import itertools
import json
import os
import pathlib
import re
import string
import threading
import pytest
import brian_dashboard_manager
import responses
from brian_dashboard_manager import environment
from brian_dashboard_manager.grafana.utils.request import TokenRequest
@pytest.fixture
def data_config():
return {
"admin_username": "fakeadmin",
"admin_password": "fakeadmin",
"hostname": "myfakehostname.org",
"inventory_provider": "http://inventory-provider01.geant.org:8080",
"reporting_provider": "http://hgf-reporting-provider.geant.org:1337",
"organizations": [
{
"name": "Testorg1",
"excluded_nrens": ['GEANT'],
"excluded_dashboards": []
},
{
"name": "GÉANT Testorg2",
"excluded_nrens": ['GEANT'],
"excluded_dashboards": []
},
{
"name": "NRENsTestorg3",
"excluded_nrens": ['GEANT'],
"excluded_dashboards": []
},
{
"name": "General Public",
"excluded_nrens": ["JISC", "PSNC"],
"excluded_dashboards": []
}
],
"datasources": {
"influxdb": {
"name": "PollerInfluxDB",
"type": "influxdb",
"access": "proxy",
"url": "http://prod-poller-ui01.geant.org:8086",
"database": "poller",
"basicAuth": False,
"isDefault": True,
"readOnly": False
}
},
"ignored_folders": ['fakefolder']
}
DATA_DIR = pathlib.Path(__file__).parent / "data"
@pytest.fixture
def get_test_data():
def _get_test_data(filename):
return json.loads((DATA_DIR / filename).read_text())
return _get_test_data
@pytest.fixture
def data_config_filename(data_config, tmp_path):
file = tmp_path / "data_config.json"
file.write_text(json.dumps(data_config))
return str(file)
@pytest.fixture
def client(mocker, data_config_filename):
mocker.patch.object(environment, "setup_logging")
os.environ["CONFIG_FILENAME"] = data_config_filename
with brian_dashboard_manager.create_app().test_client() as c:
yield c
@pytest.fixture
def mock_grafana(data_config):
def uid_generator():
return (
"".join(result)
for result in itertools.permutations(string.ascii_lowercase, 8)
)
lock = threading.RLock()
def synchronized(fun):
def _sync(*args, **kwargs):
with lock:
return fun(*args, **kwargs)
return _sync
class MockGrafana:
def __init__(self) -> None:
self.folders = {}
self.dashboards = {}
self.dashboards_by_folder_uid = {}
self.datasources = {}
self.organizations = {}
self.api_tokens = {}
self.uids = uid_generator()
self.ids = itertools.count(start=1)
self.request = TokenRequest(data_config["hostname"], token="abc")
@synchronized
def list_api_tokens(self):
return list(copy.copy(val) for val in self.api_tokens.values())
@synchronized
def create_api_token(self, payload):
lifetime = payload.get("secondsToLive")
return self._create_object(
{
**payload,
"key": "key",
"expiration": (
(
datetime.datetime.utcnow() + datetime.timedelta(lifetime)
).strftime("%Y-%m-%dT%H:%M:%SZ")
if lifetime is not None
else None
),
},
self.api_tokens,
)
@synchronized
def delete_api_token(self, uid):
return self._delete_object(uid, self.api_tokens, name="api token")
@synchronized
def list_organizations(self):
return list(copy.copy(val) for val in self.organizations.values())
@synchronized
def create_organization(self, organization):
organization = {
**organization,
"id": next(self.ids),
"uid": str(organization.get("uid") or next(self.uids)),
}
self.organizations[organization["uid"]] = organization
return {
"orgId": organization["id"],
"message": "Organization created",
}
@synchronized
def delete_organization(self, uid):
return self._delete_object(uid, self.organizations, name="organization")
@synchronized
def list_datasources(self):
return list(copy.copy(val) for val in self.datasources.values())
@synchronized
def create_datasource(self, datasource):
return self._create_object(datasource, self.datasources)
@synchronized
def delete_datasource(self, uid):
return self._delete_object(uid, self.datasources, name="datasource")
@synchronized
def list_folders(self):
return list(self.folders.values())
@synchronized
def create_folder(self, folder):
return self._create_object(folder, self.folders)
@synchronized
def delete_folder(self, uid):
return self._delete_object(uid, self.folders, name="folder")
@synchronized
def list_dashboards(self, title=None, folder_id=None):
return [
copy.copy(db)
for db in self.dashboards.values()
if (title is None or db["title"] == title)
and (folder_id is None or db.get("folderId") == folder_id)
]
@synchronized
def create_dashboard(self, dashboard, folder_id=None):
result = self._create_object(dashboard, self.dashboards)
folder_uid = next(
iter(f["uid"] for f in self.folders.values() if f["id"] == folder_id),
None,
)
for idx, db in enumerate(self.dashboards_by_folder_uid.get(folder_uid, [])):
if db["uid"] == result["uid"]:
self.dashboards_by_folder_uid[folder_uid].pop(idx)
self.dashboards_by_folder_uid[folder_uid].insert(idx, result)
break
else:
self.dashboards_by_folder_uid.setdefault(folder_uid, []).append(result)
return result
@synchronized
def delete_dashboard(self, uid):
result = self._delete_object(uid, self.dashboards, name="dashboard")
for dashboards in self.dashboards_by_folder_uid.values():
for idx, db in enumerate(dashboards):
if db["uid"] == result["uid"]:
dashboards.pop(idx)
break
return result
def _create_object(self, obj, all_objects):
id = obj.get("id")
uid = obj.get("uid") or next(self.uids)
obj = {
'url': '/fake/grafana/' + str(uid),
**obj,
"id": id if id is not None else next(self.ids),
"uid": str(uid),
}
all_objects[obj["uid"]] = obj
return obj
def _delete_object(self, uid, all_objects, name="object"):
del all_objects[uid]
return {"message": f"deleted {name}"}
grafana = MockGrafana()
url_prefix = f".*{data_config['hostname']}/"
def json_request_cb(fun):
def _wrapped(request):
payload = json.loads(request.body) if request.body else None
status, result = fun(payload, request)
return (status, {}, json.dumps(result))
return _wrapped
# --- Api Tokens ---
responses.add_callback(
method=responses.GET,
url=re.compile(url_prefix + r"api/auth/keys?.+$"),
callback=json_request_cb(lambda p, b: (200, grafana.list_api_tokens())),
)
@json_request_cb
def create_api_token_callback(payload, request):
return (200, grafana.create_api_token(payload))
responses.add_callback(
method=responses.POST,
url=re.compile(url_prefix + "api/auth/keys"),
callback=create_api_token_callback,
)
@json_request_cb
def delete_api_token_callback(payload, request):
id = int(request.path_url.split("/")[-1])
for uid, ds in grafana.api_tokens.items():
if ds["id"] == id:
return (200, grafana.delete_api_token(uid))
return (404, "")
responses.add_callback(
method=responses.DELETE,
url=re.compile(url_prefix + r"api/auth/keys/.+$"),
callback=delete_api_token_callback,
)
# --- Organizations ---
responses.add_callback(
method=responses.GET,
url=re.compile(url_prefix + "api/orgs"),
callback=json_request_cb(lambda p, b: (200, grafana.list_organizations())),
)
@json_request_cb
def create_organization_callback(body, _):
return (200, grafana.create_organization(body))
responses.add_callback(
method=responses.POST,
url=re.compile(url_prefix + "api/orgs"),
callback=create_organization_callback,
)
# --- Datasources ---
responses.add_callback(
method=responses.GET,
url=re.compile(url_prefix + "api/datasources"),
callback=json_request_cb(lambda p, b: (200, grafana.list_datasources())),
)
@json_request_cb
def create_datasource_callback(payload, _):
return (200, grafana.create_datasource(payload))
responses.add_callback(
method=responses.POST,
url=re.compile(url_prefix + "api/datasources"),
callback=create_datasource_callback,
)
@json_request_cb
def delete_datasource_callback(payload, request):
name = request.path_url.split("/")[-1]
for uid, ds in grafana.datasources.items():
if ds["name"] == name:
return (200, {}, grafana.delete_datasource(uid))
return (404, {}, "")
responses.add_callback(
method=responses.DELETE,
url=re.compile(url_prefix + r"api/datasources/name/.+$"),
callback=delete_datasource_callback,
)
# --- Folders ---
responses.add_callback(
method=responses.GET,
url=re.compile(url_prefix + "api/folders"),
callback=json_request_cb(lambda p, b: (200, grafana.list_folders())),
)
@json_request_cb
def create_folder_callback(payload, _):
return (200, grafana.create_folder(payload))
responses.add_callback(
method=responses.POST,
url=re.compile(url_prefix + "api/folders"),
callback=create_folder_callback,
)
@json_request_cb
def delete_folder_callback(payload, request):
uid = request.path_url.split("/")[-1]
try:
return (200, grafana.delete_folder(uid))
except KeyError:
return (404, {})
responses.add_callback(
method=responses.DELETE,
url=re.compile(url_prefix + r"api/folders/.+$"),
callback=delete_folder_callback,
)
# --- Dashboards ---
@json_request_cb
def get_dashboard_callback(payload, request):
uid = request.path_url.split("/")[-1]
try:
return (200, {"dashboard": grafana.dashboards[uid]})
except KeyError:
return (404, {})
responses.add_callback(
method=responses.GET,
url=re.compile(url_prefix + r"api/dashboards/uid/.+$"),
callback=get_dashboard_callback,
)
@json_request_cb
def search_dashboard_callback(_, request):
query = request.params
return (
200,
grafana.list_dashboards(query.get("title"), query.get("folderIds", query.get('folderUIDs'))),
)
responses.add_callback(
method=responses.GET,
url=re.compile(url_prefix + "api/search"),
callback=search_dashboard_callback,
)
@json_request_cb
def create_dashboard_callback(payload, request):
dashboard = payload["dashboard"]
folder_id = payload.get("folderId")
return (200, grafana.create_dashboard(dashboard, folder_id=folder_id))
responses.add_callback(
method=responses.POST,
url=re.compile(url_prefix + "api/dashboards/db"),
callback=create_dashboard_callback,
)
@json_request_cb
def delete_dashboard_callback(payload, request):
uid = request.path_url.split("/")[-1]
try:
return (200, grafana.delete_dashboard(uid))
except KeyError:
return (404, {})
responses.add_callback(
method=responses.DELETE,
url=re.compile(url_prefix + r"api/dashboards/uid/.+$"),
callback=delete_dashboard_callback,
)
# --- Other ---
responses.add(
method=responses.PUT,
url=re.compile(url_prefix + "api/org/preferences"),
json={"message": "Preferences updated"},
)
responses.add(
method=responses.POST,
url=re.compile(url_prefix + r"api/user/using/.+$"),
json={"message": "ok"},
)
return grafana