Skip to content
Snippets Groups Projects
Commit 08bcc139 authored by Bjarke Madsen's avatar Bjarke Madsen
Browse files

Implement datasource provisioning

parent 915e1009
No related branches found
No related tags found
No related merge requests found
import logging
import re
import os
import json
from typing import Dict, List
from requests.exceptions import HTTPError
from brian_dashboard_manager.grafana.utils.request import Request, TokenRequest
logger = logging.getLogger(__name__)
def _datasource_provisioned(datasource_to_check: Dict, provisioned_datasources: List[Dict]):
if len(datasource_to_check.keys()) == 0:
return True
for datasource in provisioned_datasources:
exists = all(datasource.get(key) == datasource_to_check.get(key)
for key in datasource_to_check)
if exists:
return True
return False
def get_missing_datasource_definitions(request: Request, dir=None):
datasource_dir = dir or os.path.join(
os.path.dirname(__file__), '../../datasources/')
provisioned_datasources = get_datasources(request)
for (dirpath, _, filenames) in os.walk(datasource_dir): # pragma: no cover
for file in filenames:
if file.endswith('.json'):
filename = os.path.join(dirpath, file)
datasource = json.load(open(filename, 'r'))
if not _datasource_provisioned(datasource, provisioned_datasources):
yield datasource
def get_datasources(request: Request):
return request.get('api/datasources')
def create_datasource(request: TokenRequest, datasource: Dict, environment):
try:
datasource["url"] = re.sub(
'test|uat|prod', environment, datasource["url"])
r = request.post('api/datasources', json=datasource)
except HTTPError as e:
logger.error('Error when provisioning datasource: ' + e.response.text)
return None
return r
def delete_datasource(request: TokenRequest, name: str):
return request.delete(f'api/datasources/name/{name}')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment