Skip to content
Snippets Groups Projects
Commit d0ee269e authored by Pelle Koster's avatar Pelle Koster
Browse files

refactor tests

parent 74aaaf9d
No related branches found
No related tags found
No related merge requests found
......@@ -43,7 +43,8 @@ def influx_client(influx_params):
username=influx_params['username'],
password=influx_params['password'],
ssl=influx_params['ssl'],
verify_ssl=influx_params['ssl'])
verify_ssl=influx_params['ssl'],
timeout=influx_params.get('timeout'))
def save_counters(
......
......
......@@ -133,6 +133,9 @@ def _error_points(router_fqdn, netconf_doc, timestamp, measurement_name):
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(_ctr2point, counters)
# [2024-03-21] We currently have no definition for error points on logical
# interfaces. This operation is essentially a no-op. Perhaps in the future we will
# get a definition for errors on logical interfaces
interfaces = juniper.logical_interface_counters(netconf_doc)
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(_ctr2point, counters)
......@@ -191,6 +194,7 @@ def main(
nokia_fqdns: List[str],
get_netconf_provider_: Callable[[Vendor, dict], NetconfProvider],
get_point_writer_: Callable[[dict], PointWriter],
raise_errors=False,
):
if not juniper_fqdns and not nokia_fqdns:
......@@ -229,7 +233,8 @@ def main(
)
except Exception as e:
logger.exception(f"Error while processing juniper {router}", exc_info=e)
if raise_errors:
raise
for router in nokia_fqdns:
try:
process_nokia_router(
......@@ -240,6 +245,8 @@ def main(
)
except Exception as e:
logger.exception(f"Error while processing nokia {router}", exc_info=e)
if raise_errors:
raise
return error_count
......
......
......@@ -4,20 +4,25 @@ import click
import jsonschema
import socket
def validate_config(_unused_ctx, _unused_param, file):
try:
return config.load(file)
except json.JSONDecodeError:
raise click.BadParameter('config file is not valid json')
raise click.BadParameter("config file is not valid json")
except jsonschema.ValidationError as e:
raise click.BadParameter(e)
def validate_hostname(_unused_ctx, _unused_param, hostname_or_names):
hostnames = hostname_or_names if isinstance(hostname_or_names, (list, tuple)) else [hostname_or_names]
hostnames = (
hostname_or_names
if isinstance(hostname_or_names, (list, tuple))
else [hostname_or_names]
)
for _h in hostnames:
try:
socket.gethostbyname(_h)
except socket.error:
raise click.BadParameter(f'{_h} is not resolveable')
raise click.BadParameter(f"{_h} is not resolveable")
return hostname_or_names
......@@ -29,8 +29,8 @@ ERROR_COUNTER_DICT_SCHEMA = {
def counters(router_fqdn, interface_counters):
"""
:param router_fqdn: hostname
:param interface_counters: either PHYSICAL_INTERFACE_COUNTER_SCHEMA
or LOGICAL_INTERFACE_COUNTER_SCHEMA
:param interface_counters: PHYSICAL_INTERFACE_COUNTER_SCHEMA
:return: iterable of BRIAN_POINT_SCHEMA objects
"""
......
......
......@@ -27,13 +27,15 @@ class InfluxPointWriter(PointWriter):
def __init__(
self,
influx_params: dict,
timeout=90,
client_factory: Callable[[dict], InfluxDBClient] = influx.influx_client,
) -> None:
self.influx_params = influx_params
self.client_factory = client_factory
self.timeout = timeout
def write_points(self, points: Iterable[dict]):
client = self.client_factory(self.influx_params)
client = self.client_factory({"timeout": self.timeout, **self.influx_params})
with client:
client.write_points(points)
......
......
......@@ -53,3 +53,8 @@ def all_juniper_routers():
@pytest.fixture(params=JUNIPER_ROUTERS)
def router_fqdn(request):
return request.param
@pytest.fixture()
def single_router_fqdn():
return JUNIPER_ROUTERS[0]
import datetime
from functools import partial
import itertools
import re
from unittest.mock import call
from typing import Iterable
from unittest.mock import MagicMock, call, patch
from brian_polling_manager.interface_stats.services.writers import (
InfluxPointWriter,
PointWriter,
)
import jsonschema
import pytest
from brian_polling_manager import influx
from brian_polling_manager.interface_stats import brian, cli, errors, vendors
from brian_polling_manager.interface_stats.services.netconf import (
JuniperNetconfProvider,
get_netconf_provider,
)
from brian_polling_manager.interface_stats.vendors import juniper
from lxml import etree
from ncclient.operations.rpc import RPCReply
......@@ -37,7 +43,9 @@ def _is_enabled(ifc_name, ifc_doc):
return admin_status == "up" and oper_status == "up"
def test_verify_all_interfaces_present(router_fqdn, get_netconf, polled_interfaces):
def test_verify_all_interfaces_present(
single_router_fqdn, get_netconf, polled_interfaces
):
"""
verify that all the interfaces we expect to poll
are available in the netconf data
......@@ -45,98 +53,190 @@ def test_verify_all_interfaces_present(router_fqdn, get_netconf, polled_interfac
a snapshot of inventory /poller/interfaces
(the snapshots were all taken around the same time)
"""
if router_fqdn not in polled_interfaces:
pytest.skip(f"{router_fqdn} has no expected polled interfaces")
if single_router_fqdn not in polled_interfaces:
pytest.skip(f"{single_router_fqdn} has no expected polled interfaces")
doc = get_netconf(router_fqdn)
doc = get_netconf(single_router_fqdn)
phy = juniper.physical_interface_counters(doc)
log = juniper.logical_interface_counters(doc)
interfaces = set(x["name"] for x in itertools.chain(phy, log))
missing_interfaces = polled_interfaces[router_fqdn] - interfaces
missing_interfaces = polled_interfaces[single_router_fqdn] - interfaces
for ifc_name in missing_interfaces:
# verify that any missing interfaces are admin/oper disabled
assert not _is_enabled(ifc_name, doc)
def test_validate_physical_counter_schema(router_fqdn, get_netconf):
doc = get_netconf(router_fqdn)
for ifc in juniper.physical_interface_counters(doc):
jsonschema.validate(ifc, vendors.PHYSICAL_INTERFACE_COUNTER_SCHEMA)
@pytest.fixture(
params=[
(
juniper.physical_interface_counters,
vendors.PHYSICAL_INTERFACE_COUNTER_SCHEMA,
),
(
juniper.logical_interface_counters,
vendors.LOGICAL_INTERFACE_COUNTER_SCHEMA,
),
]
)
def interface_counter_calculator(request):
return request.param
def test_validate_logical_counters_schema(router_fqdn, get_netconf):
doc = get_netconf(router_fqdn)
for ifc in juniper.logical_interface_counters(doc):
jsonschema.validate(ifc, vendors.LOGICAL_INTERFACE_COUNTER_SCHEMA)
def test_validate_interface_counters(
interface_counter_calculator, single_router_fqdn, get_netconf
):
fun, schema = interface_counter_calculator
doc = get_netconf(single_router_fqdn)
interfaces = list(fun(doc))
assert interfaces
for ifc in interfaces:
jsonschema.validate(ifc, schema)
def test_physical_brian_counters(router_fqdn, get_netconf):
def test_interface_counters(router_fqdn, interface_counter_calculator, get_netconf):
fun, _ = interface_counter_calculator
doc = get_netconf(router_fqdn)
interfaces = juniper.physical_interface_counters(doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
for ctrs in counters:
jsonschema.validate(ctrs, brian.BRIAN_COUNTER_DICT_SCHEMA)
interfaces = list(fun(doc))
assert interfaces
def test_physical_error_counters(router_fqdn, get_netconf):
doc = get_netconf(router_fqdn)
interfaces = juniper.physical_interface_counters(doc)
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
for ctrs in counters:
jsonschema.validate(ctrs, errors.ERROR_COUNTER_DICT_SCHEMA)
@pytest.fixture(
params=[
(brian.counters, brian.BRIAN_COUNTER_DICT_SCHEMA),
(errors.counters, errors.ERROR_COUNTER_DICT_SCHEMA),
]
)
def point_counter_calculator(request):
return request.param
def test_logical_brian_counters(router_fqdn, get_netconf):
doc = get_netconf(router_fqdn)
interfaces = juniper.logical_interface_counters(doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
def test_validate_point_counters(
single_router_fqdn,
interface_counter_calculator,
point_counter_calculator,
get_netconf,
):
interface_fun, _ = interface_counter_calculator
point_fun, schema = point_counter_calculator
if (
interface_fun is juniper.logical_interface_counters
and point_fun is errors.counters
):
# We should not have any error counters for logical interfaces
pytest.skip()
doc = get_netconf(single_router_fqdn)
interfaces = interface_fun(doc)
counters = list(
point_fun(router_fqdn=single_router_fqdn, interface_counters=interfaces)
)
assert counters
for ctrs in counters:
jsonschema.validate(ctrs, brian.BRIAN_COUNTER_DICT_SCHEMA)
jsonschema.validate(ctrs, schema)
def test_logical_error_counters(router_fqdn, get_netconf):
def test_point_counters(
router_fqdn,
interface_counter_calculator,
point_counter_calculator,
get_netconf,
):
interface_fun, _ = interface_counter_calculator
point_fun, _ = point_counter_calculator
doc = get_netconf(router_fqdn)
interfaces = juniper.logical_interface_counters(doc)
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
for ctrs in counters:
jsonschema.validate(ctrs, errors.ERROR_COUNTER_DICT_SCHEMA)
interfaces = interface_fun(doc)
counters = list(point_fun(router_fqdn=router_fqdn, interface_counters=interfaces))
if (
interface_fun is juniper.logical_interface_counters
and point_fun is errors.counters
):
# We should not have any error counters for logical interfaces
assert not counters
else:
assert counters
def test_brian_points(router_fqdn, get_netconf):
for _p in cli._brian_points(
router_fqdn=router_fqdn,
netconf_doc=get_netconf(router_fqdn),
@pytest.fixture(params=[cli._brian_points, cli._error_points])
def influx_point_calculator(request):
return request.param
def test_validate_influx_points(
single_router_fqdn, influx_point_calculator, get_netconf
):
points = list(
influx_point_calculator(
router_fqdn=single_router_fqdn,
netconf_doc=get_netconf(single_router_fqdn),
timestamp=datetime.datetime.now(),
measurement_name="blah",
):
jsonschema.validate(_p, influx.INFLUX_POINT)
assert _p["fields"] # any trivial points should already be filtered
)
)
assert len(points)
for point in points:
jsonschema.validate(point, influx.INFLUX_POINT)
assert point["fields"] # any trivial points should already be filtered
def test_error_points(router_fqdn, get_netconf):
for _p in cli._error_points(
def test_influx_points(router_fqdn, influx_point_calculator, get_netconf):
points = list(
influx_point_calculator(
router_fqdn=router_fqdn,
netconf_doc=get_netconf(router_fqdn),
timestamp=datetime.datetime.now(),
measurement_name="blah",
):
jsonschema.validate(_p, influx.INFLUX_POINT)
assert _p["fields"] # any trivial points should already be filtered
)
)
assert len(points)
for point in points:
assert point["fields"] # any trivial points should already be filtered
def test_main_for_all_juniper_routers(all_juniper_routers, data_dir):
config = {
"juniper": {"some": "params"},
"influx": {
"brian-counters": {"measurement": "brian"},
"error-counters": {"measurement": "error"},
},
}
class ValidatingPointWriter(PointWriter):
def __init__(self, *args, **kwargs) -> None:
pass
def write_points(self, points: Iterable[dict]):
points = list(points)
assert points
for point in points:
jsonschema.validate(point, influx.INFLUX_POINT)
assert point["fields"] # must contain at least one field
cli.main(
app_config_params=config,
juniper_fqdns=all_juniper_routers,
nokia_fqdns=[],
get_point_writer_=ValidatingPointWriter,
get_netconf_provider_=partial(get_netconf_provider, source_dir=data_dir),
raise_errors=True,
)
class TestJuniperNetconfProvider:
RAW_RESPONSE_FILE = "raw-response-sample.xml"
@pytest.fixture(autouse=True)
def mocked_rpc(self, mocker, data_dir):
def mocked_rpc(self, data_dir):
raw_response = data_dir.joinpath(self.RAW_RESPONSE_FILE).read_text()
return mocker.patch(
with patch(
"brian_polling_manager.interface_stats.services.netconf._rpc",
return_value=RPCReply(raw_response),
)
) as mock:
yield mock
def test_calls_rpc_with_params(self, mocked_rpc):
ssh_params = object()
ssh_params = {}
router_name = "some-router"
provider = JuniperNetconfProvider(ssh_params)
......@@ -148,10 +248,20 @@ class TestJuniperNetconfProvider:
assert call_args[1]["command"].tag == "get-interface-information"
def test_converts_rpc_response_to_xml(self):
ssh_params = object()
ssh_params = {}
router_name = "some-router"
provider = JuniperNetconfProvider(ssh_params)
result = provider.get(router_name)
assert result.tag == "rpc-reply"
def test_influx_point_writer():
client_factory = MagicMock()
influx_params = {"some": "param"}
writer = InfluxPointWriter(influx_params, timeout=10, client_factory=client_factory)
writer.write_points(["point1", "point2"])
assert client_factory.call_args == call({"timeout": 10, "some": "param"})
assert client_factory().__enter__.call_count == 1
assert client_factory().write_points.call_args == call(["point1", "point2"])
......@@ -7,6 +7,7 @@ import socket
import subprocess
import tempfile
import time
from unittest.mock import patch
from click.testing import CliRunner
from typing import Any, Dict
from brian_polling_manager import influx
......@@ -261,7 +262,9 @@ def app_config_params(free_host_port):
@pytest.mark.skipif(
not _use_docker_compose(), reason="docker compose not found or disabled"
)
@patch.object(cli, 'setup_logging')
def test_e2e(
unused_setup_logging,
app_config_params: Dict[str, Any],
app_config_filename: str,
testenv_containers: None,
......@@ -272,6 +275,7 @@ def test_e2e(
load all router interfaces into a tmp influx container, check that
all potential counter fields are populated
"""
cli_args = ["--config", app_config_filename, "--source-dir", str(data_dir)]
for router_fqdn in all_juniper_routers:
cli_args.extend(["--juniper", router_fqdn])
......
......
......@@ -3,7 +3,6 @@ envlist = py36
[flake8]
max-line-length = 120
exclude = venv,.tox
[testenv]
passenv = USE_COMPOSE
......@@ -20,6 +19,6 @@ commands =
coverage html
coverage report
coverage report --fail-under 80
flake8
flake8 brian_polling_manager/ test/ setup.py
sphinx-build -M html docs/source docs/build
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment