Skip to content
Snippets Groups Projects
organization.py 6.33 KiB
"""
Grafana Organization management helpers.

"""

import logging
import random
import string
from requests.exceptions import HTTPError
from datetime import datetime
from typing import Dict, List, Union

from brian_dashboard_manager.grafana.dashboard import create_dashboard
from brian_dashboard_manager.grafana.utils.request import AdminRequest, TokenRequest
from brian_dashboard_manager.templating.homedashboard import render_homedashboard

logger = logging.getLogger(__name__)


def switch_active_organization(request: AdminRequest, org_id: int):
    """
    Switches the active organization for the current session.

    :param request: AdminRequest object
    :param org_id: organization ID
    :return: response JSON
    """

    assert org_id

    logger.debug(f'Switched {str(request)} active organization to #{org_id}')
    return request.post(f'api/user/using/{org_id}', {}).json()


def get_organizations(request: AdminRequest) -> List[Dict]:
    """
    Returns all organizations.

    :param request: AdminRequest object
    :return: list of organization definitions
    """

    return request.get('api/orgs').json()


def create_organization(request: AdminRequest, name: str) -> Union[Dict, None]:
    """
    Creates a new organization with the given name.

    :param request: AdminRequest object
    :param name: organization name
    :return: organization definition or None if unsuccessful
    """

    assert name

    result = request.post('api/orgs', json={
        'name': name
    }).json()

    if result.get('message', '').lower() == 'organization created':
        id = result.get('orgId')
        logger.info(f'Created organization `{name}` with ID #{id}')
        return {'id': id, 'name': name}
    else:
        return None


def create_api_token(request: AdminRequest, org_id: int, key_data=None):
    """
    Creates a new API token for the given organization.

    :param request: AdminRequest object
    :param org_id: organization ID
    :param key_data: additional key data
    :return: API token definition
    """

    characters = string.ascii_uppercase + string.digits
    name = ''.join(random.choices(characters, k=16))
    data = {
        'name': name,
        'role': 'Admin',
        'secondsToLive': 3600  # 60 minutes
    }
    if key_data:
        data.update(key_data)

    switch_active_organization(request, org_id)
    result = request.post('api/auth/keys', json=data).json()
    token_id = result.get('id')

    logger.debug(f'Created API token #{token_id} for organization #{org_id}')

    return result


def delete_api_token(request: AdminRequest, token_id: int, org_id=None):
    """
    Deletes an API token.

    :param request: AdminRequest object
    :param token_id: API token ID
    :param org_id: organization ID
    :return: delete response
    """

    assert token_id is not None
    if org_id:
        switch_active_organization(request, org_id)
    result = request.delete(f'api/auth/keys/{token_id}')
    logger.debug(f'Deleted API token #{token_id}')
    return result


def delete_expired_api_tokens(request: AdminRequest) -> bool:
    """
    Deletes all expired API tokens.

    :param request: AdminRequest object
    :return: True if successful
    """

    tokens = request.get(
        'api/auth/keys', params={'includeExpired': True}).json()

    now = datetime.utcnow()

    def is_expired(token):
        date = datetime.strptime(token['expiration'], '%Y-%m-%dT%H:%M:%SZ')
        return date < now

    expired_tokens = [t for t in tokens if 'expiration' in t and is_expired(t)]

    for token in expired_tokens:
        delete_api_token(request, token['id'])
    return True


def get_or_create_service_account(request: AdminRequest, org_id):
    """
    Gets a service account for the given organization, or creates one if it does not exist.

    :param request: AdminRequest object
    :param org_id: organization ID
    :param name: service account name
    :return: service account definition
    """
    switch_active_organization(request, org_id)

    # get provision service account, if it exists
    try:
        service_accounts = request.get('api/serviceaccounts?perpage=10&page=1&query=provision').json()

        if service_accounts and service_accounts.get('totalCount') > 0:
            service_account = service_accounts.get('serviceAccounts')[0]
            return service_account
    except HTTPError as e:
        if e.response.status_code != 404:
            raise e

    # create a service account for provisioning
    try:
        result = request.post(
            'api/serviceaccounts', json={
                'name': 'provision',
                'role': 'Admin',
                'isDisabled': False,
            }).json()
    except HTTPError as e:
        print(e)

    logger.info(f'Created provision service account for organization #{org_id}')
    return result


def create_service_account_token(request: AdminRequest, service_account_id: int):
    """
    Creates a new API token for the given service account.

    :param request: AdminRequest object
    :param service_account_id: service account ID
    :return: Token definition
    """
    data = {
        'name': f'provision-token-{datetime.now().isoformat()}',
    }

    result = request.post(f'api/serviceaccounts/{service_account_id}/tokens', json=data).json()
    token_id = result.get('id')

    logger.debug(f'Created API token #{token_id} for service account #{service_account_id}')

    return result


def delete_service_account(request: AdminRequest, service_account_id: int):
    """
    Deletes a service account with the given ID.

    :param request: AdminRequest object
    :param service_account_id: service account ID
    :return: delete response
    """

    assert service_account_id is not None
    result = request.delete(f'api/serviceaccounts/{service_account_id}')
    logger.debug(f'Deleted service account #{service_account_id}')
    return result


def set_home_dashboard(request: TokenRequest, is_staff):
    """
    Sets the home dashboard for the organization
    the API token is registered to.

    :param request: TokenRequest object
    :param is_staff: True if the organization is the staff organization
    :return: True if successful
    """
    payload = render_homedashboard(staff=is_staff)
    dashboard = create_dashboard(request, payload)
    request.put('api/org/preferences', json={
        'homeDashboardId': dashboard.get('id')
    }).json()
    return dashboard