Skip to content
Snippets Groups Projects
Commit d52096ef authored by Erik Reid's avatar Erik Reid
Browse files

e2e test with influx container

parent ad8944c6
No related branches found
No related tags found
No related merge requests found
import click
import contextlib
from functools import partial
import json
import click
import jsonschema
from brian_polling_manager.interface_stats import config, brian, juniper
......@@ -119,7 +120,7 @@ def _main(app_config_params: dict):
netconf_doc=_ncdoc,
measurement_name=influx_params['measurement'])
with influx.influx_client(influx_params) as client:
with contextlib.closing(influx.influx_client(influx_params)) as client:
client.write_points(points)
......
......@@ -9,5 +9,6 @@ influxdb
pytest
responses
PyYAML
sphinx
sphinx-rtd-theme
import concurrent.futures
import contextlib
from functools import partial
import itertools
import json
import logging
import os
import random
import re
import socket
import subprocess
import tempfile
import time
from unittest.mock import patch
import jsonschema
from lxml import etree
import pytest
import responses
import yaml
from brian_polling_manager.interface_stats import \
PHYSICAL_INTERFACE_COUNTER_SCHEMA, LOGICAL_INTERFACE_COUNTER_SCHEMA
from brian_polling_manager.interface_stats import brian, juniper, cli
from brian_polling_manager import inventory, influx
# logging.basicConfig(level=logging.INFO)
# logging.getLogger('ncclient').setLevel(level=logging.WARNING)
DATA_DIRNAME = os.path.join(os.path.dirname(__file__), 'data', 'interface-info-snapshots')
DATA_FILENAME_EXTENSION = '-interface-info.xml'
......@@ -28,6 +33,21 @@ ROUTERS = [
]
CONTAINER_UP_TIMEOUT_S = 40
CONTAINER_HEALTH_TIMEOUT_S = 60
CONTAINER_DOWN_TIMEOUT_S = 40
@pytest.fixture
def free_host_port():
    """Return a TCP port number that was free on the host at call time.

    Binds an ephemeral port (port 0), reads the assigned number and
    closes the socket.  NOTE(review): another process could grab the
    port before the container binds it - acceptable for tests.
    """
    with contextlib.closing(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as _s:
        # SO_REUSEADDR must be set *before* bind to have any effect
        # (the original set it after bind, where it did nothing)
        _s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _s.bind(('', 0))
        return _s.getsockname()[1]
@pytest.fixture
def ifc_netconf_rpc():
def _mocked_netconf_rpc(*args, **kwargs):
......@@ -56,20 +76,16 @@ def test_validate_logical_counters_schema(router_fqdn, ifc_netconf_rpc):
jsonschema.validate(ifc, LOGICAL_INTERFACE_COUNTER_SCHEMA)
def mock_poller_interfaces():
    """Register the canned inventory /poller/interfaces http response.

    note: responses.activate doesn't seem to work if this is a fixture
    """
    data_path = os.path.join(DATA_DIRNAME, 'poller-interfaces.json')
    with open(data_path) as f:
        response_body = f.read()
    responses.add(
        method=responses.GET,
        url=re.compile(r'.*/poller/interfaces$'),
        body=response_body,
        status=200,
        content_type="application/json")
@pytest.fixture
def mocked_poller_interfaces():
    """Patch inventory.load_interfaces to return the snapshot test data."""
    data_path = os.path.join(DATA_DIRNAME, 'poller-interfaces.json')
    with open(data_path) as f:
        snapshot = json.load(f)
    with patch('brian_polling_manager.inventory.load_interfaces') as rpc:
        rpc.return_value = snapshot
        # keep the patch in place until the caller context finishes
        yield
def poller_interfaces():
mock_poller_interfaces()
@pytest.fixture
def polled_interfaces(mocked_poller_interfaces):
polled = {}
for ifc in inventory.load_interfaces(['https://bogus-hostname']):
if ifc['dashboards']:
......@@ -78,16 +94,14 @@ def poller_interfaces():
return polled
@pytest.mark.xfail  # srx's are missing from the test data
def test_sanity_check_snapshot_data(polled_interfaces):
    """
    verify that all routers with interfaces to be polled
    are in the test data set
    """
    # polled_interfaces is the fixture's dict value, not a callable -
    # calling it (as the pre-review code did) would raise TypeError,
    # silently masked by the xfail marker
    missing_routers = set(polled_interfaces.keys()) - set(ROUTERS)
    assert len(missing_routers) == 0
......@@ -100,9 +114,8 @@ def _is_enabled(ifc_name, ifc_doc):
return admin_status == 'up' and oper_status == 'up'
@pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_verify_all_interfaces_present(
        router_fqdn, ifc_netconf_rpc, polled_interfaces):
    """
    verify that all the interfaces we expect to poll
    are available in the netconf data

    the expected interfaces come from a snapshot of
    inventory /poller/interfaces
    (the snapshots were all taken around the same time)
    """
    if router_fqdn not in polled_interfaces:
        pytest.xfail(f'{router_fqdn} has no expected polled interfaces')

    doc = juniper.get_interface_info_ncrpc(router_fqdn)

    phy = juniper.physical_interface_counters(doc)
    log = juniper.logical_interface_counters(doc)
    interfaces = set(x['name'] for x in itertools.chain(phy, log))

    missing_interfaces = polled_interfaces[router_fqdn] - interfaces
    for ifc_name in missing_interfaces:
        # verify that any missing interfaces are admin/oper disabled
        assert not _is_enabled(ifc_name, doc)
......@@ -162,7 +174,12 @@ def test_logical_brian_points(router_fqdn, ifc_netconf_rpc):
@pytest.fixture
def app_config_params():
def app_config_params(free_host_port):
testenv_dbname = 'testdb' # can't contain dashes
testenv_username = 'bogus-user'
testenv_password = 'bogus-password'
with tempfile.NamedTemporaryFile() as f:
yield {
'ssh-config': f.name,
......@@ -173,38 +190,34 @@ def app_config_params():
],
'influx': {
'brian-counters': {
'hostname': 'bogus-hostname',
'port': 8086,
'database': 'bogus-database',
'measurement': 'bogus-measurement',
'username': 'bogus-username',
'password': 'bogus-password',
'ssl': True
'hostname': 'localhost',
'port': free_host_port,
'database': testenv_dbname,
'measurement': 'testenv_brian_counters',
'username': testenv_username,
'password': testenv_password,
'ssl': False
},
'error-counters': {
'hostname': 'bogus-hostname',
'port': 8086,
'database': 'bogus-database',
'measurement': 'bogus-measurement',
'username': 'bogus-username',
'password': 'bogus-password',
'ssl': True
'hostname': 'localhost',
'port': free_host_port,
'database': testenv_dbname,
'measurement': 'testenv_error_counters',
'username': testenv_username,
'password': testenv_password,
'ssl': False
},
}
}
@responses.activate
def test_validate_all_points(app_config_params, ifc_netconf_rpc):
def _mocked_managed_routers(inventory_base_urls):
    # stand-in for cli._managed_routers: derive the router list from
    # the snapshot inventory data, skipping srx's (not in the test data)
    routers = {ifc['router'] for ifc in inventory.load_interfaces(['https://abc'])}
    return [fqdn for fqdn in routers if not fqdn.startswith('srx')]
def test_validate_all_points(app_config_params, ifc_netconf_rpc, mocked_poller_interfaces):
with patch('brian_polling_manager.interface_stats.cli._managed_routers') as rpc:
rpc.side_effect = _mocked_managed_routers
all_routers = {_ifc['router'] for _ifc in inventory.load_interfaces(['https://abc'])}
# TODO: add srx's to test data
rpc.return_value = [_ifc for _ifc in all_routers if not _ifc.startswith('srx')]
mock_poller_interfaces()
nc_doc_map = cli.fqdn_netconf_doc_map(app_config_params)
influx_params = app_config_params['influx']['brian-counters']
for _fqdn, _ncdoc in nc_doc_map.items():
......@@ -213,3 +226,226 @@ def test_validate_all_points(app_config_params, ifc_netconf_rpc):
netconf_doc=_ncdoc,
measurement_name=influx_params['measurement']):
jsonschema.validate(_p, influx.INFLUX_POINT)
@pytest.fixture
def brian_influx_container_params(app_config_params):
    """Docker-compose service definition for a throwaway influxdb container."""
    brian_cfg = app_config_params['influx']['brian-counters']
    error_cfg = app_config_params['influx']['error-counters']
    # sanity (a single container backs both measurement configs)
    assert brian_cfg['port'] == error_cfg['port']
    assert brian_cfg['hostname'] == error_cfg['hostname']

    container_name = f'brian-influxdb-{random.randint(1000,10000)}'
    with tempfile.TemporaryDirectory() as influx_data_dirname:
        # the container runs as the current (non-root) uid, so the
        # data dir must be writable by that uid
        os.chmod(influx_data_dirname, 0o777)
        yield {
            'image': 'bitnami/influxdb:1.8.5-debian-10-r197',
            'user': f'{os.getuid()}',
            'container_name': container_name,
            # host port -> influxdb http port (8086) inside the container
            'ports': [f'{brian_cfg["port"]}:8086'],
            'environment': {
                'INFLUXDB_HTTP_AUTH_ENABLED': 'true',
                'INFLUXDB_ADMIN_USER': 'admin123',
                'INFLUXDB_ADMIN_USER_PASSWORD': 'secret',
                'INFLUXDB_USER': brian_cfg['username'],
                'INFLUXDB_USER_PASSWORD': brian_cfg['password'],
                'INFLUXDB_DB': brian_cfg['database'],  # no dashes!!
                'INFLUXDB_LOGGING_LEVEL': 'debug'
            },
            'volumes': [
                f'{influx_data_dirname}:/bitnami/influxdb'
            ],
            'healthcheck': {
                'interval': '2s',
                'timeout': '2s',
                'retries': 2,
                'test': [
                    'CMD',
                    'curl',
                    '-f',
                    'http://localhost:8086/ping'
                ]
            }
        }
@pytest.fixture
def testenv_compose_params(brian_influx_container_params):
    """Write a docker-compose file describing the test environment.

    Yields the compose filename and the influx container name.
    """
    compose_doc = {
        'version': '3.8',
        'services': {
            'influx': brian_influx_container_params,
        },
    }
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yml') as f:
        yaml.dump(compose_doc, f)
        f.flush()  # ensure docker compose sees the complete file
        yield {
            'compose_filename': f.name,
            'container': brian_influx_container_params['container_name']
        }
def _output_or_die(args):
    """Run a command, assert it exited 0, and return its combined output.

    :param args: argv list for the subprocess
    :return: interleaved stdout+stderr, decoded as utf-8
    """
    # easier to use run, but python 3.6 doesn't yet
    # have capture_output (so may as well use Popen)
    proc = subprocess.Popen(
        args,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        encoding='utf-8'
    )
    try:
        output_info, _ = proc.communicate(timeout=CONTAINER_UP_TIMEOUT_S)
    except subprocess.TimeoutExpired:
        # per the subprocess docs: kill and reap the child on timeout,
        # otherwise it is left running in the background
        proc.kill()
        proc.communicate()
        raise
    assert proc.returncode == 0, output_info
    return output_info
def _wait_for_health_proc(container_name):
    """Poll `docker inspect` until the container reports healthy.

    :param container_name: name of a container that has a healthcheck
    :return: True if healthy within CONTAINER_HEALTH_TIMEOUT_S, else False
    """
    def _healthy():
        inspect_args = ['docker', 'inspect', container_name]
        # just die badly in case of any problems
        # (e.g. json decoding error, no health in output)
        inspect_doc = json.loads(_output_or_die(inspect_args))
        # barf if this element isn't in the output
        # ... we require a health check in the container
        return inspect_doc[0]['State']['Health']['Status'] == 'healthy'

    deadline = time.time() + CONTAINER_HEALTH_TIMEOUT_S
    while time.time() <= deadline:
        if _healthy():
            return True
        time.sleep(1)
    return False
@contextlib.contextmanager
def run_compose(compose_filename, container_names):
    """Bring up a docker-compose environment for the duration of the context.

    :param compose_filename: path to a docker-compose yml file
    :param container_names: containers whose healthchecks must pass
        before the context body runs
    """
    # make a random project name, rather than some env dirname
    project_name = f'test-{random.randint(1000, 10000)}'

    def _compose_args(params):
        # prefer `docker compose`; fall back to the standalone
        # `docker-compose` binary if the subcommand isn't available
        compose_result = subprocess.call(
            ['docker', 'compose'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)
        if compose_result:
            return ['docker-compose'] + params
        return ['docker', 'compose'] + params

    args = _compose_args([
        '-f', compose_filename,
        '-p', project_name,
        'up', '--detach'
    ])
    _output_or_die(args)
    try:
        # wait for all containers to report healthy, in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            future_to_container_name = {
                executor.submit(_wait_for_health_proc, name): name
                for name in container_names
            }
            for future in concurrent.futures \
                    .as_completed(future_to_container_name):
                name = future_to_container_name[future]
                assert future.result(), \
                    f'health check failed for container {name}'
            yield  # wait here until the context finishes
    finally:
        # best-effort: dump container logs to help debug failures,
        # then tear the compose project down regardless
        for name in container_names:
            try:
                args = ['docker', 'logs', name]
                logging.info(_output_or_die(args))
            except:  # noqa: E722
                # crappy bare except, but this is debugging code
                # continue with cleanup - but log an error message about this
                logging.exception(f'error calling `docker logs {name}`')
        args = _compose_args([
            '-f', compose_filename,
            '-p', project_name,
            'down'
        ])
        _output_or_die(args)
@pytest.fixture
def testenv_containers(testenv_compose_params):
    """Run the docker-compose test environment for the duration of a test."""
    filename = testenv_compose_params['compose_filename']
    names = [testenv_compose_params['container']]
    with run_compose(compose_filename=filename, container_names=names):
        # wait here until the caller context finishes
        yield
def _use_docker_compose():
    """Decide whether the docker-compose e2e test should run.

    :return: True iff USE_COMPOSE is set in the environment and either
        `docker compose` or `docker-compose` runs successfully
    """
    if 'USE_COMPOSE' not in os.environ:
        logging.warning('"USE_COMPOSE" is not defined in the environment')
        return False

    def _runs_ok(args):
        # True iff the command both exists and exits 0;
        # without the except, a missing docker binary raises
        # FileNotFoundError at pytest collection time (via skipif)
        try:
            result = subprocess.call(
                args,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL)
        except FileNotFoundError:
            return False
        return result == 0

    # now just check if compose is available
    return _runs_ok(['docker', 'compose']) or _runs_ok(['docker-compose'])
@pytest.mark.skipif(
    not _use_docker_compose(),
    reason='docker compose not found or disabled')
def test_e2e(app_config_params, ifc_netconf_rpc, testenv_containers, mocked_poller_interfaces):
    # end-to-end: run the polling main loop against a real influxdb
    # container, then query it to verify counters were written

    def _mocked_managed_routers(inventory_base_urls):
        # srx's are excluded: missing from the snapshot test data
        all_routers = {_ifc['router'] for _ifc in inventory.load_interfaces(['https://abc'])}
        return [_ifc for _ifc in all_routers if not _ifc.startswith('srx')]

    with patch('brian_polling_manager.interface_stats.cli._managed_routers') as rpc:
        rpc.side_effect = _mocked_managed_routers
        cli._main(app_config_params)

    influx_config = app_config_params['influx']['brian-counters']
    with contextlib.closing(influx.influx_client(influx_config)) as client:
        # every expected counter field must be present with count > 0
        expected_brian_fields = [
            'egressOctets',
            'egressPackets',
            'ingressOctets',
            'ingressPackets',
            'egressOctetsv6',
            'egressPacketsv6',
            'ingressOctetsv6',
            'ingressPacketsv6',
            'egressErrors',
            'ingressDiscards',
            'ingressErrors',
        ]
        # count(*) returns one row with a `count_<field>` per field
        query = f'select count(*) from {influx_config["measurement"]}'
        counts = next(client.query(query).get_points(measurement=influx_config["measurement"]))
        assert all(f'count_{n}' in counts for n in expected_brian_fields)
        assert all(counts[f'count_{n}'] > 0 for n in expected_brian_fields)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment