Skip to content
Snippets Groups Projects
Commit 1a1849a5 authored by Bjarke Madsen's avatar Bjarke Madsen
Browse files

flake8 fixup

parent 2f114fd8
No related branches found
No related tags found
No related merge requests found
......@@ -14,7 +14,15 @@ CONFIG_SCHEMA = {
"organizations": {"type": "array", "items": {"type": "string"}},
"BRIAN_ENVIRONMENT": {"type": "string"}
},
"required": ["admin_username", "admin_password", "hostname", "listen_port", "grafana_port", "organizations", "BRIAN_ENVIRONMENT"],
"required": [
"admin_username",
"admin_password",
"hostname",
"listen_port",
"grafana_port",
"organizations",
"BRIAN_ENVIRONMENT"
],
"additionalProperties": False
}
......
......@@ -21,19 +21,20 @@ def get_dashboard_definitions(dir=None): # pragma: no cover
yield dashboard
# Deletes a single dashboard for the organization the API token is registered to.
# Deletes a single dashboard for the organization
# the API token is registered to.
def _delete_dashboard(request: TokenRequest, uid: int):
try:
r = request.delete(f'api/dashboards/uid/{uid}')
if r and 'deleted' in r.get('message', ''):
return r
except HTTPError as e:
logger.error(
f'Error when deleting dashboard with UID #{uid}: {e.response.text}')
except HTTPError:
logger.exception(f'Error when deleting dashboard with UID #{uid}')
return None
# Deletes all dashboards for the organization the API token is registered to.
# Deletes all dashboards for the organization
# the API token is registered to.
def delete_dashboards(request: TokenRequest):
r = request.get('api/search')
if r and len(r) > 0:
......@@ -42,7 +43,8 @@ def delete_dashboards(request: TokenRequest):
return True
# Searches Grafana for a dashboard matching the title of the provided dashboard.
# Searches Grafana for a dashboard
# matching the title of the provided dashboard.
def _search_dashboard(request: TokenRequest, dashboard: Dict):
try:
r = request.get('api/search', params={
......@@ -90,12 +92,12 @@ def create_dashboard(request: TokenRequest, dashboard: Dict):
'dashboard': dashboard,
'overwrite': False
}
title = dashboard["title"]
try:
logger.info(
f'{"Updating" if existing_dashboard else "Provisioning"} dashboard: {dashboard["title"]}')
action = "Updating" if existing_dashboard else "Provisioning"
logger.info(f'{action} dashboard: {title}')
r = request.post('api/dashboards/db', json=payload)
return r
except HTTPError as e:
logger.error(
f'Error when provisioning dashboard {dashboard["title"]}: {e.response.text}')
except HTTPError:
logger.exception(f'Error when provisioning dashboard {title}')
return None
......@@ -2,7 +2,7 @@ import logging
import re
import os
import json
from typing import Dict, List
from typing import Dict
from requests.exceptions import HTTPError
from brian_dashboard_manager.grafana.utils.request import Request, TokenRequest
......@@ -11,7 +11,7 @@ from brian_dashboard_manager.grafana.utils.request import Request, TokenRequest
logger = logging.getLogger(__name__)
def _datasource_provisioned(datasource_to_check: Dict, provisioned_datasources: List[Dict]):
def _datasource_provisioned(datasource_to_check, provisioned_datasources):
if len(datasource_to_check.keys()) == 0:
return True
for datasource in provisioned_datasources:
......@@ -25,15 +25,19 @@ def _datasource_provisioned(datasource_to_check: Dict, provisioned_datasources:
def get_missing_datasource_definitions(request: Request, dir=None):
datasource_dir = dir or os.path.join(
os.path.dirname(__file__), '../../datasources/')
existing_datasources = get_datasources(request)
def check_ds_not_provisioned(filename):
datasource = json.load(open(filename, 'r'))
if not _datasource_provisioned(datasource, existing_datasources):
return datasource
provisioned_datasources = get_datasources(request)
for (dirpath, _, filenames) in os.walk(datasource_dir): # pragma: no cover
for file in filenames:
if file.endswith('.json'):
filename = os.path.join(dirpath, file)
datasource = json.load(open(filename, 'r'))
if not _datasource_provisioned(datasource, provisioned_datasources):
yield datasource
if not file.endswith('.json'):
continue
filename = os.path.join(dirpath, file)
yield check_ds_not_provisioned(filename)
def get_datasources(request: Request):
......@@ -45,8 +49,8 @@ def create_datasource(request: TokenRequest, datasource: Dict, environment):
datasource["url"] = re.sub(
'test|uat|prod', environment, datasource["url"])
r = request.post('api/datasources', json=datasource)
except HTTPError as e:
logger.error('Error when provisioning datasource: ' + e.response.text)
except HTTPError:
logger.exception('Error when provisioning datasource')
return None
return r
......
......@@ -43,8 +43,10 @@ def delete_organization(request: AdminRequest, id: int) -> bool:
def create_api_token(request: AdminRequest, org_id: int, key_data=None):
characters = string.ascii_uppercase + string.digits
name = ''.join(random.choices(characters, k=16))
data = {
'name': ''.join(random.choices(string.ascii_uppercase + string.digits, k=16)),
'name': name,
'role': 'Admin',
'secondsToLive': 3600 # 60 minutes
}
......@@ -75,8 +77,12 @@ def delete_expired_api_tokens(request: AdminRequest, org_id: int) -> bool:
tokens = request.get('api/auth/keys', params={'includeExpired': True})
now = datetime.now()
expired_tokens = [t for t in tokens if 'expiration' in t
and datetime.strptime(t['expiration'], '%Y-%m-%dT%H:%M:%SZ') < now]
def is_expired(token):
date = datetime.strptime(token['expiration'], '%Y-%m-%dT%H:%M:%SZ')
return date < now
expired_tokens = [t for t in tokens if 'expiration' in t and is_expired(t)]
for token in expired_tokens:
delete_api_token(request, org_id, token['id'])
......
import logging
from brian_dashboard_manager.grafana.utils.request import AdminRequest, TokenRequest
from brian_dashboard_manager.grafana.organization import get_organizations, create_organization, create_api_token, delete_api_token, delete_expired_api_tokens
from brian_dashboard_manager.grafana.dashboard import get_dashboard_definitions, create_dashboard
from brian_dashboard_manager.grafana.datasource import get_missing_datasource_definitions, create_datasource
from brian_dashboard_manager.grafana.utils.request import \
AdminRequest, \
TokenRequest
from brian_dashboard_manager.grafana.organization import \
get_organizations, \
create_organization, \
create_api_token, \
delete_api_token, \
delete_expired_api_tokens
from brian_dashboard_manager.grafana.dashboard import \
get_dashboard_definitions, \
create_dashboard
from brian_dashboard_manager.grafana.datasource import \
get_missing_datasource_definitions,\
create_datasource
logger = logging.getLogger(__name__)
......@@ -31,8 +43,9 @@ def provision(config):
# Provision missing data sources
for datasource in get_missing_datasource_definitions(token_request):
ds = create_datasource(
token_request, datasource, config.get('BRIAN_ENVIRONMENT', 'test'))
ds = create_datasource(token_request,
datasource,
config.get('BRIAN_ENVIRONMENT', 'test'))
if ds:
logger.info(f'Provisioned datasource: {datasource["name"]}')
......
......@@ -53,10 +53,11 @@ class Request(object):
class AdminRequest(Request):
def __init__(self, hostname, grafana_port, admin_username, admin_password, **kwargs):
def __init__(self, hostname, grafana_port, admin_username, admin_password,
**kwargs):
self.username = admin_username
super().__init__(
f'http://{admin_username}:{admin_password}@{hostname}:{grafana_port}/')
url = f'{admin_username}:{admin_password}@{hostname}:{grafana_port}/'
super().__init__('http://' + url)
def __str__(self):
return f'admin user: {self.username}'
......
......@@ -13,7 +13,12 @@ def data_config():
"hostname": "localhost",
"listen_port": 65001,
"grafana_port": 65005,
"organizations": ['Testorg1', 'GÉANT Testorg2', 'NRENsTestorg3', 'General Public'],
"organizations": [
'Testorg1',
'GÉANT Testorg2',
'NRENsTestorg3',
'General Public'
],
"BRIAN_ENVIRONMENT": "test"
}
......
......@@ -144,11 +144,12 @@ def test_create_dashboard(data_config):
dashboard = {'id': ID, 'uid': UID, 'title': TITLE, 'version': VERSION}
request = TokenRequest(**data_config, token='test')
def get_callback(request):
return 200, {}, json.dumps({'dashboard': dashboard})
responses.add_callback(method=responses.GET,
url=request.BASE_URL + f'api/dashboards/uid/{UID}',
callback=lambda f: (200,
{},
json.dumps({'dashboard': dashboard})))
callback=get_callback)
responses.add_callback(
method=responses.GET,
......
......@@ -22,41 +22,42 @@ def test_get_datasources(data_config):
@responses.activate
def test_get_missing_datasource_definitions(data_config):
# this only retrieves data from the filesystem and checks against
# what's configured in grafana.. just make sure it fetches datasources
# what's configured in grafana.. just make sure
# we cover the part for fetching datasources
request = AdminRequest(**data_config)
responses.add(method=responses.GET, url=request.BASE_URL +
'api/datasources', json=[])
'api/datasources')
dir = '/tmp/dirthatreallyshouldnotexistsousealonganduniquestring'
# it returns a generator, so iterate :)
for data in provision.get_missing_datasource_definitions(
request, '/tmp/dirthatreallyshouldnotexistsousealonganduniquestring'):
for data in provision.get_missing_datasource_definitions(request, dir):
pass
def test_datasource_provisioned():
provisioned = datasource._datasource_provisioned({}, [])
assert provisioned
provisioned = datasource._datasource_provisioned({'id': 1}, [])
assert provisioned is False
provisioned = datasource._datasource_provisioned({'id': 1, "name": 'testcasetwo'},
[{'id': -1, 'name': 'testcaseone'},
{'id': 1, 'name': 'testcaseone'}])
assert provisioned is False
provisioned = datasource._datasource_provisioned({'id': 1},
[{'id': -1, 'name': 'testcaseone'},
{'id': 1, 'name': 'testcasetwo'}])
assert provisioned
provisioned = datasource._datasource_provisioned({'id': 2, "name": 'testcasetwo'},
[{'id': -1, 'name': 'testcaseone'},
{'id': 1, 'name': 'testcaseone'},
{'id': 2, 'name': 'testcasetwo'}])
assert provisioned
val = datasource._datasource_provisioned({}, [])
assert val
val = datasource._datasource_provisioned({'id': 1}, [])
assert val is False
val = datasource._datasource_provisioned({'id': 1, "name": 'testcase2'},
[{'id': -1, 'name': 'testcase1'},
{'id': 1, 'name': 'testcase1'}])
assert val is False
val = datasource._datasource_provisioned({'id': 1},
[{'id': -1, 'name': 'testcase1'},
{'id': 1, 'name': 'testcase2'}])
assert val
val = datasource._datasource_provisioned({'id': 2, "name": 'testcase2'},
[{'id': -1, 'name': 'testcase1'},
{'id': 1, 'name': 'testcase1'},
{'id': 2, 'name': 'testcase2'}])
assert val
@responses.activate
......
......@@ -2,15 +2,18 @@ import pytest
import responses
import requests
import json
from brian_dashboard_manager.grafana.utils.request import AdminRequest, TokenRequest
from brian_dashboard_manager.grafana.utils.request import \
AdminRequest, \
TokenRequest
@responses.activate
def test_admin_request(data_config):
ENDPOINT = 'test/url/endpoint'
request = AdminRequest(**data_config)
assert request.BASE_URL == 'http://{admin_username}:{admin_password}@{hostname}:{grafana_port}/'.format(
**data_config)
url = '{admin_username}:{admin_password}@{hostname}:{grafana_port}/'. \
format(**data_config)
assert request.BASE_URL == 'http://' + url
assert request.username == data_config['admin_username']
def get_callback(request):
......
......@@ -50,9 +50,10 @@ def test_provision(data_config, mocker, client):
_mocked_create_api_token.return_value = {
'key': 'testtoken', 'id': 0} # api token
_mocked_get_missing_datasource_definitions = mocker.patch(
'brian_dashboard_manager.grafana.provision.get_missing_datasource_definitions')
_mocked_get_missing_datasource_definitions.return_value = TEST_DATASOURCE # test datasource
_mocked_get_missing_ds_defs = mocker.patch(
'brian_dashboard_manager.grafana.provision.' +
'get_missing_datasource_definitions')
_mocked_get_missing_ds_defs.return_value = TEST_DATASOURCE
_mocked_create_datasource = mocker.patch(
'brian_dashboard_manager.grafana.provision.create_datasource')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment