-
Bjarke Madsen authored
Fix datasource comparison after 11.2 Add more useful logging for deletion of dashboards
Bjarke Madsen authored: Fix datasource comparison after 11.2. Add more useful logging for deletion of dashboards.
organization.py 6.33 KiB
"""
Grafana Organization management helpers.
"""
import logging
import random
import string
from requests.exceptions import HTTPError
from datetime import datetime
from typing import Dict, List, Union
from brian_dashboard_manager.grafana.dashboard import create_dashboard
from brian_dashboard_manager.grafana.utils.request import AdminRequest, TokenRequest
from brian_dashboard_manager.templating.homedashboard import render_homedashboard
logger = logging.getLogger(__name__)
def switch_active_organization(request: AdminRequest, org_id: int):
    """
    Switches the active organization for the current session.

    The switch is done by POSTing an empty body to
    ``api/user/using/<org_id>``.

    :param request: AdminRequest object
    :param org_id: organization ID; must be truthy (checked with ``assert``)
    :return: response JSON
    """
    assert org_id

    result = request.post(f'api/user/using/{org_id}', {}).json()
    # Log only after the request has returned, so the message is never
    # emitted for a switch that raised instead of completing.
    logger.debug(f'Switched {str(request)} active organization to #{org_id}')
    return result
def get_organizations(request: AdminRequest) -> List[Dict]:
    """
    Fetches every organization from the ``api/orgs`` endpoint.

    :param request: AdminRequest object
    :return: list of organization definitions (the decoded JSON response)
    """
    response = request.get('api/orgs')
    organizations = response.json()
    return organizations
def create_organization(request: AdminRequest, name: str) -> Union[Dict, None]:
    """
    Creates a new organization with the given name.

    The organization is considered created only when the response's
    ``message`` field equals ``organization created`` (case-insensitive).

    :param request: AdminRequest object
    :param name: organization name; must be truthy (checked with ``assert``)
    :return: ``{'id': ..., 'name': ...}`` for the new organization,
        or None if unsuccessful
    """
    assert name

    result = request.post('api/orgs', json={'name': name}).json()

    # `message` may be missing or null; both are treated as "not created"
    # instead of failing on `.lower()`.
    message = result.get('message') or ''
    if message.lower() != 'organization created':
        return None

    # Named `org_id` rather than `id` to avoid shadowing the builtin.
    org_id = result.get('orgId')
    logger.info(f'Created organization `{name}` with ID #{org_id}')
    return {'id': org_id, 'name': name}
def create_api_token(request: AdminRequest, org_id: int, key_data=None):
    """
    Creates a new API token for the given organization.

    The token gets a random 16-character name (uppercase letters and
    digits), the ``Admin`` role and a one hour lifetime by default; any
    entries in ``key_data`` override these defaults.

    :param request: AdminRequest object
    :param org_id: organization ID the token is created in
    :param key_data: optional dict merged over the default key payload
    :return: API token definition (the decoded JSON response)
    """
    alphabet = string.ascii_uppercase + string.digits
    payload = {
        'name': ''.join(random.choices(alphabet, k=16)),
        'role': 'Admin',
        'secondsToLive': 3600  # 60 minutes
    }
    if key_data:
        payload.update(key_data)

    # The key is created in whichever organization is currently active.
    switch_active_organization(request, org_id)

    response = request.post('api/auth/keys', json=payload)
    result = response.json()
    token_id = result.get('id')
    logger.debug(f'Created API token #{token_id} for organization #{org_id}')
    return result
def delete_api_token(request: AdminRequest, token_id: int, org_id=None):
    """
    Deletes the API token with the given ID.

    When ``org_id`` is truthy the active organization is switched to it
    before the delete request is sent.

    :param request: AdminRequest object
    :param token_id: API token ID; must not be None (checked with ``assert``)
    :param org_id: optional organization ID to switch to first
    :return: delete response
    """
    assert token_id is not None

    if org_id:
        switch_active_organization(request, org_id)

    response = request.delete(f'api/auth/keys/{token_id}')
    logger.debug(f'Deleted API token #{token_id}')
    return response
def delete_expired_api_tokens(request: AdminRequest) -> bool:
    """
    Deletes all expired API tokens.

    Tokens are listed from ``api/auth/keys`` (expired ones included) and
    every token whose ``expiration`` timestamp lies in the past is removed
    with ``delete_api_token``. Tokens with a missing or empty expiration
    are left alone.

    :param request: AdminRequest object
    :return: True once all expired tokens have been deleted
    :raises ValueError: if an expiration does not match
        ``%Y-%m-%dT%H:%M:%SZ``
    """
    # Local import: the module itself only imports `datetime` from `datetime`.
    from datetime import timezone

    tokens = request.get(
        'api/auth/keys', params={'includeExpired': True}).json()

    # Timezone-aware replacement for the deprecated `datetime.utcnow()`.
    now = datetime.now(timezone.utc)

    def is_expired(token):
        expiration = token.get('expiration')
        if not expiration:
            # No (or null) expiration: nothing to compare, keep the token.
            return False
        date = datetime.strptime(expiration, '%Y-%m-%dT%H:%M:%SZ')
        # The literal `Z` suffix is taken to mean UTC, matching the
        # previous comparison against the UTC clock.
        return date.replace(tzinfo=timezone.utc) < now

    # Build the full list first so a malformed timestamp aborts before
    # any token has been deleted.
    expired_tokens = [t for t in tokens if is_expired(t)]

    for token in expired_tokens:
        delete_api_token(request, token['id'])
    return True
def get_or_create_service_account(request: AdminRequest, org_id):
    """
    Gets the ``provision`` service account for the given organization,
    or creates it if it does not exist.

    The active organization is switched to ``org_id`` first. Existing
    accounts are looked up with a ``query=provision`` search and the first
    result is returned; a 404 from the lookup is treated as "not found".

    :param request: AdminRequest object
    :param org_id: organization ID
    :return: service account definition
    :raises HTTPError: if the lookup fails with a status other than 404,
        or if creating the service account fails
    """
    switch_active_organization(request, org_id)

    # get provision service account, if it exists
    try:
        service_accounts = request.get('api/serviceaccounts?perpage=10&page=1&query=provision').json()
    except HTTPError as e:
        # Only a 404 means "no such account"; anything else is a real error.
        if e.response.status_code != 404:
            raise
    else:
        # A missing/null `totalCount` counts as zero matches.
        if service_accounts and (service_accounts.get('totalCount') or 0) > 0:
            matches = service_accounts.get('serviceAccounts') or []
            if matches:
                return matches[0]

    # create a service account for provisioning
    try:
        result = request.post(
            'api/serviceaccounts', json={
                'name': 'provision',
                'role': 'Admin',
                'isDisabled': False,
            }).json()
    except HTTPError:
        # Previously the error was printed and execution continued into an
        # unbound `result`; log it and propagate the real failure instead.
        logger.exception(f'Failed to create provision service account for organization #{org_id}')
        raise

    logger.info(f'Created provision service account for organization #{org_id}')
    return result
def create_service_account_token(request: AdminRequest, service_account_id: int):
    """
    Creates a new API token for the given service account.

    The token is named ``provision-token-<timestamp>``, where the timestamp
    is the current local time in ISO format.

    :param request: AdminRequest object
    :param service_account_id: service account ID
    :return: Token definition (the decoded JSON response)
    """
    payload = {
        'name': f'provision-token-{datetime.now().isoformat()}',
    }
    response = request.post(
        f'api/serviceaccounts/{service_account_id}/tokens',
        json=payload,
    )
    result = response.json()

    token_id = result.get('id')
    logger.debug(f'Created API token #{token_id} for service account #{service_account_id}')
    return result
def delete_service_account(request: AdminRequest, service_account_id: int):
    """
    Deletes the service account with the given ID.

    :param request: AdminRequest object
    :param service_account_id: service account ID; must not be None
        (checked with ``assert``)
    :return: delete response
    """
    assert service_account_id is not None

    response = request.delete(f'api/serviceaccounts/{service_account_id}')
    logger.debug(f'Deleted service account #{service_account_id}')
    return response
def set_home_dashboard(request: TokenRequest, is_staff):
    """
    Sets the home dashboard for the organization
    the API token is registered to.

    The home dashboard is rendered, created through ``create_dashboard``
    and its ID is then stored as ``homeDashboardId`` in the organization
    preferences.

    :param request: TokenRequest object
    :param is_staff: True if the organization is the staff organization
    :return: the dashboard as returned by ``create_dashboard``
    """
    rendered = render_homedashboard(staff=is_staff)
    dashboard = create_dashboard(request, rendered)

    preferences = {
        'homeDashboardId': dashboard.get('id')
    }
    # The decoded body is unused; `.json()` is kept so the response is
    # still parsed exactly as before.
    request.put('api/org/preferences', json=preferences).json()
    return dashboard