Select Git revision
conftest.py
conftest.py 5.42 KiB
import json
import os
import random
import re
import tempfile
import jsonschema
import pytest
import responses
def _load_test_data(filename):
full_path = os.path.join(
os.path.dirname(__file__),
'data', filename)
with open(full_path) as f:
return f.read()
@pytest.fixture
def config():
with tempfile.TemporaryDirectory() as state_dir_name:
yield {
'inventory': [
'http://bogus-inventory01.xxx.yyy:12345',
'http://bogus-inventory02.xxx.yyy:12345',
'http://bogus-inventory03.xxx.yyy:12345'
],
'sensu': {
'api-base': [
'https://bogus-sensu01.xxx.yyy:12345',
'https://bogus-sensu02.xxx.yyy:12345',
'https://bogus-sensu03.xxx.yyy:12345'
],
'token': 'abc-token-blah-blah',
'interface-check': {
'script': '/var/lib/sensu/bin/counter2influx.sh',
'measurement': 'counters',
'interval': 300,
'subscriptions': ['interfacecounters'],
'output_metric_handlers': ['influx-db-handler'],
'namespace': 'default',
'round_robin': True,
'command': ('{script} {measurement} '
'{community} {hostname} '
'{interface} {ifIndex}'),
}
},
'statedir': state_dir_name
}
@pytest.fixture
def mocked_sensu():
saved_sensu_checks = {
c['metadata']['name']: c
for c in json.loads(_load_test_data('checks.json'))}
_check_schema = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'type': 'object',
'properties': {
'command': {'type': 'string'},
'interval': {'type': 'integer'},
'subscriptions': {'type': 'array', 'items': {'type': 'string'}},
'proxy_entity_name': {'type': 'string'},
'round_robin': {'type': 'boolean'},
'output_metric_format': {'enum': ['influxdb_line']},
'output_metric_handlers': {
'type': 'array',
'items': {'type': 'string'}
},
'metadata': {
'type': 'object',
'properties': {
'name': {'type': 'string'},
'namespace': {'type': 'string'}
},
'required': ['name', 'namespace'],
'additionalProperties': True
},
},
'required': [
'command', 'interval', 'subscriptions',
'output_metric_format', 'output_metric_handlers',
'round_robin', 'metadata'],
'additionalProperties': True
}
# mocked api for returning all checks
responses.add(
method=responses.GET,
url=re.compile(r'.*sensu.+/api/core/v2/namespaces/[^\/]+/checks$'),
json=list(saved_sensu_checks.values())
)
def new_check_callback(request):
check = json.loads(request.body)
jsonschema.validate(check, _check_schema)
path_elems = request.path_url.split('/')
assert len(path_elems) == 7 # sanity
assert path_elems[5] == check['metadata']['namespace']
assert check['metadata']['name'] not in saved_sensu_checks
saved_sensu_checks[check['metadata']['name']] = check
return (201, {}, '')
# mocked api for creating a check
responses.add_callback(
method=responses.POST,
url=re.compile(r'.*sensu.+/api/core/v2/namespaces/[^\/]+/checks$'),
callback=new_check_callback)
def update_check_callback(request):
check = json.loads(request.body)
jsonschema.validate(check, _check_schema)
path_elems = request.path_url.split('/')
assert len(path_elems) == 8 # sanity
assert path_elems[5] == check['metadata']['namespace']
assert path_elems[-1] == check['metadata']['name']
assert check['metadata']['name'] in saved_sensu_checks, \
'we only intend to call this method for updating existing checks'
saved_sensu_checks[check['metadata']['name']] = check
return 201, {}, ''
# mocked api for updating a check
responses.add_callback(
method=responses.PUT,
url=re.compile(
r'.*sensu.+/api/core/v2/namespaces/[^\/]+/checks/[^\/]+$'),
callback=update_check_callback)
def delete_check_callback(request):
path_elems = request.path_url.split('/')
assert len(path_elems) == 8 # sanity
del saved_sensu_checks[path_elems[-1]]
return 204, {}, ''
# mocked api for deleting a check
responses.add_callback(
method=responses.DELETE,
url=re.compile(
r'.*sensu.+/api/core/v2/namespaces/[^\/]+/checks/[^\/]+$'),
callback=delete_check_callback)
yield saved_sensu_checks
@pytest.fixture
def mocked_inventory():
# mocked api for returning all checks
responses.add(
method=responses.GET,
url=re.compile(r'.*inventory.+/poller/interfaces.*'),
body=_load_test_data('interfaces.json'))
bogus_version = {'latch': {'timestamp': 10000 * random.random()}}
# mocked api for returning all checks
responses.add(
method=responses.GET,
url=re.compile(r'.*inventory.+/version.*'),
body=json.dumps(bogus_version))