Skip to content
Snippets Groups Projects
conftest.py 13.18 KiB
import copy
import datetime
import itertools
import json
import os
import pathlib
import re
import string
import threading
import pytest
import brian_dashboard_manager
import responses
from brian_dashboard_manager import environment
from brian_dashboard_manager.grafana.utils.request import TokenRequest


@pytest.fixture
def data_config():
    return {
        "admin_username": "fakeadmin",
        "admin_password": "fakeadmin",
        "hostname": "myfakehostname.org",
        "inventory_provider": "http://inventory-provider01.geant.org:8080",
        "reporting_provider": "http://hgf-reporting-provider.geant.org:1337",
        "organizations": [
            {
                "name": "Testorg1",
                "excluded_nrens": ['GEANT'],
                "excluded_dashboards": []
            },
            {
                "name": "GÉANT Testorg2",
                "excluded_nrens": ['GEANT'],
                "excluded_dashboards": []
            },
            {
                "name": "NRENsTestorg3",
                "excluded_nrens": ['GEANT'],
                "excluded_dashboards": []
            },
            {
                "name": "General Public",
                "excluded_nrens": ["JISC", "PSNC"],
                "excluded_dashboards": []
            }
        ],
        "datasources": {
            "influxdb": {
                "name": "PollerInfluxDB",
                "type": "influxdb",
                "access": "proxy",
                "url": "http://prod-poller-ui01.geant.org:8086",
                "database": "poller",
                "basicAuth": False,
                "isDefault": True,
                "readOnly": False
            }
        },
        "ignored_folders": ['fakefolder']
    }


DATA_DIR = pathlib.Path(__file__).parent / "data"


@pytest.fixture
def get_test_data():
    def _get_test_data(filename):
        return json.loads((DATA_DIR / filename).read_text())

    return _get_test_data


@pytest.fixture
def data_config_filename(data_config, tmp_path):
    file = tmp_path / "data_config.json"
    file.write_text(json.dumps(data_config))
    return str(file)


@pytest.fixture
def client(mocker, data_config_filename):
    mocker.patch.object(environment, "setup_logging")
    os.environ["CONFIG_FILENAME"] = data_config_filename
    with brian_dashboard_manager.create_app().test_client() as c:
        yield c


@pytest.fixture
def mock_grafana(data_config):
    def uid_generator():
        return (
            "".join(result)
            for result in itertools.permutations(string.ascii_lowercase, 8)
        )

    lock = threading.RLock()

    def synchronized(fun):
        def _sync(*args, **kwargs):
            with lock:
                return fun(*args, **kwargs)

        return _sync

    class MockGrafana:
        def __init__(self) -> None:
            self.folders = {}
            self.dashboards = {}
            self.dashboards_by_folder_uid = {}
            self.datasources = {}
            self.organizations = {}
            self.api_tokens = {}
            self.uids = uid_generator()
            self.ids = itertools.count(start=1)
            self.request = TokenRequest(data_config["hostname"], token="abc")

        @synchronized
        def list_api_tokens(self):
            return list(copy.copy(val) for val in self.api_tokens.values())

        @synchronized
        def create_api_token(self, payload):
            lifetime = payload.get("secondsToLive")
            return self._create_object(
                {
                    **payload,
                    "key": "key",
                    "expiration": (
                        (
                            datetime.datetime.utcnow() + datetime.timedelta(lifetime)
                        ).strftime("%Y-%m-%dT%H:%M:%SZ")
                        if lifetime is not None
                        else None
                    ),
                },
                self.api_tokens,
            )

        @synchronized
        def delete_api_token(self, uid):
            return self._delete_object(uid, self.api_tokens, name="api token")

        @synchronized
        def list_organizations(self):
            return list(copy.copy(val) for val in self.organizations.values())

        @synchronized
        def create_organization(self, organization):
            organization = {
                **organization,
                "id": next(self.ids),
                "uid": str(organization.get("uid") or next(self.uids)),
            }
            self.organizations[organization["uid"]] = organization
            return {
                "orgId": organization["id"],
                "message": "Organization created",
            }

        @synchronized
        def delete_organization(self, uid):
            return self._delete_object(uid, self.organizations, name="organization")

        @synchronized
        def list_datasources(self):
            return list(copy.copy(val) for val in self.datasources.values())

        @synchronized
        def create_datasource(self, datasource):
            return self._create_object(datasource, self.datasources)

        @synchronized
        def delete_datasource(self, uid):
            return self._delete_object(uid, self.datasources, name="datasource")

        @synchronized
        def list_folders(self):
            return list(self.folders.values())

        @synchronized
        def create_folder(self, folder):
            return self._create_object(folder, self.folders)

        @synchronized
        def delete_folder(self, uid):
            return self._delete_object(uid, self.folders, name="folder")

        @synchronized
        def list_dashboards(self, title=None, folder_id=None):
            return [
                copy.copy(db)
                for db in self.dashboards.values()
                if (title is None or db["title"] == title)
                and (folder_id is None or db.get("folderId") == folder_id)
            ]

        @synchronized
        def create_dashboard(self, dashboard, folder_id=None):
            result = self._create_object(dashboard, self.dashboards)
            folder_uid = next(
                iter(f["uid"] for f in self.folders.values() if f["id"] == folder_id),
                None,
            )
            for idx, db in enumerate(self.dashboards_by_folder_uid.get(folder_uid, [])):
                if db["uid"] == result["uid"]:
                    self.dashboards_by_folder_uid[folder_uid].pop(idx)
                    self.dashboards_by_folder_uid[folder_uid].insert(idx, result)
                    break
            else:
                self.dashboards_by_folder_uid.setdefault(folder_uid, []).append(result)
            return result

        @synchronized
        def delete_dashboard(self, uid):
            result = self._delete_object(uid, self.dashboards, name="dashboard")
            for dashboards in self.dashboards_by_folder_uid.values():
                for idx, db in enumerate(dashboards):
                    if db["uid"] == result["uid"]:
                        dashboards.pop(idx)
                        break
            return result

        def _create_object(self, obj, all_objects):
            id = obj.get("id")
            uid = obj.get("uid") or next(self.uids)
            obj = {
                'url': '/fake/grafana/' + str(uid),
                **obj,
                "id": id if id is not None else next(self.ids),
                "uid": str(uid),
            }
            all_objects[obj["uid"]] = obj
            return obj

        def _delete_object(self, uid, all_objects, name="object"):
            del all_objects[uid]
            return {"message": f"deleted {name}"}

    grafana = MockGrafana()

    url_prefix = f".*{data_config['hostname']}/"

    def json_request_cb(fun):
        def _wrapped(request):
            payload = json.loads(request.body) if request.body else None
            status, result = fun(payload, request)
            return (status, {}, json.dumps(result))

        return _wrapped

    # --- Api Tokens ---
    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + r"api/auth/keys?.+$"),
        callback=json_request_cb(lambda p, b: (200, grafana.list_api_tokens())),
    )

    @json_request_cb
    def create_api_token_callback(payload, request):
        return (200, grafana.create_api_token(payload))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/auth/keys"),
        callback=create_api_token_callback,
    )

    @json_request_cb
    def delete_api_token_callback(payload, request):
        id = int(request.path_url.split("/")[-1])
        for uid, ds in grafana.api_tokens.items():
            if ds["id"] == id:
                return (200, grafana.delete_api_token(uid))
        return (404, "")

    responses.add_callback(
        method=responses.DELETE,
        url=re.compile(url_prefix + r"api/auth/keys/.+$"),
        callback=delete_api_token_callback,
    )
    # --- Organizations ---
    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + "api/orgs"),
        callback=json_request_cb(lambda p, b: (200, grafana.list_organizations())),
    )

    @json_request_cb
    def create_organization_callback(body, _):
        return (200, grafana.create_organization(body))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/orgs"),
        callback=create_organization_callback,
    )

    # --- Datasources ---
    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + "api/datasources"),
        callback=json_request_cb(lambda p, b: (200, grafana.list_datasources())),
    )

    @json_request_cb
    def create_datasource_callback(payload, _):
        return (200, grafana.create_datasource(payload))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/datasources"),
        callback=create_datasource_callback,
    )

    @json_request_cb
    def delete_datasource_callback(payload, request):
        name = request.path_url.split("/")[-1]
        for uid, ds in grafana.datasources.items():
            if ds["name"] == name:
                return (200, {}, grafana.delete_datasource(uid))
        return (404, {}, "")

    responses.add_callback(
        method=responses.DELETE,
        url=re.compile(url_prefix + r"api/datasources/name/.+$"),
        callback=delete_datasource_callback,
    )
    # --- Folders ---
    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + "api/folders"),
        callback=json_request_cb(lambda p, b: (200, grafana.list_folders())),
    )

    @json_request_cb
    def create_folder_callback(payload, _):
        return (200, grafana.create_folder(payload))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/folders"),
        callback=create_folder_callback,
    )

    @json_request_cb
    def delete_folder_callback(payload, request):
        uid = request.path_url.split("/")[-1]
        try:
            return (200, grafana.delete_folder(uid))
        except KeyError:
            return (404, {})

    responses.add_callback(
        method=responses.DELETE,
        url=re.compile(url_prefix + r"api/folders/.+$"),
        callback=delete_folder_callback,
    )

    # --- Dashboards ---
    @json_request_cb
    def get_dashboard_callback(payload, request):
        uid = request.path_url.split("/")[-1]
        try:
            return (200, {"dashboard": grafana.dashboards[uid]})
        except KeyError:
            return (404, {})

    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + r"api/dashboards/uid/.+$"),
        callback=get_dashboard_callback,
    )

    @json_request_cb
    def search_dashboard_callback(_, request):
        query = request.params
        return (
            200,
            grafana.list_dashboards(query.get("title"), query.get("folderIds", query.get('folderUIDs'))),
        )

    responses.add_callback(
        method=responses.GET,
        url=re.compile(url_prefix + "api/search"),
        callback=search_dashboard_callback,
    )

    @json_request_cb
    def create_dashboard_callback(payload, request):
        dashboard = payload["dashboard"]
        folder_id = payload.get("folderId")

        return (200, grafana.create_dashboard(dashboard, folder_id=folder_id))

    responses.add_callback(
        method=responses.POST,
        url=re.compile(url_prefix + "api/dashboards/db"),
        callback=create_dashboard_callback,
    )

    @json_request_cb
    def delete_dashboard_callback(payload, request):
        uid = request.path_url.split("/")[-1]
        try:
            return (200, grafana.delete_dashboard(uid))
        except KeyError:
            return (404, {})

    responses.add_callback(
        method=responses.DELETE,
        url=re.compile(url_prefix + r"api/dashboards/uid/.+$"),
        callback=delete_dashboard_callback,
    )

    # --- Other ---
    responses.add(
        method=responses.PUT,
        url=re.compile(url_prefix + "api/org/preferences"),
        json={"message": "Preferences updated"},
    )
    responses.add(
        method=responses.POST,
        url=re.compile(url_prefix + r"api/user/using/.+$"),
        json={"message": "ok"},
    )
    return grafana