-
Bjarke Madsen authoredBjarke Madsen authored
test_interface_stats.py 10.99 KiB
from datetime import datetime
from unittest.mock import Mock, call, patch
from brian_polling_manager import influx
from brian_polling_manager.interface_stats.juniper import JuniperRouterProcessor
import pytest
from brian_polling_manager.interface_stats import cli
from brian_polling_manager.interface_stats import common
from brian_polling_manager.interface_stats.common import PointGroup, RouterProcessor
from lxml import etree
import ncclient.manager
def test_sanity_check_juniper_snapshot_data(juniper_inventory, all_juniper_routers):
"""
verify that all routers with interfaces to be polled
are in the test data set
:return:
"""
missing_routers = set(juniper_inventory.keys()) - set(all_juniper_routers)
assert len(missing_routers) == 0
def test_sanity_check_nokia_snapshot_data(nokia_inventory, all_nokia_routers):
"""
verify that all routers with interfaces to be polled
are in the test data set
:return:
"""
missing_routers = set(nokia_inventory.keys()) - set(all_nokia_routers)
assert len(missing_routers) == 0
class TestParseInterfaceXML:
def test_parse_xml(self):
xml = """<root><ab>42</ab></root>"""
struct = {"something": {"path": "./ab", "transform": int}}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": 42}
def test_parse_xml_deep(self):
xml = """<root><a><b>42</b></a></root>"""
struct = {"something": {"path": "./a/b", "transform": int}}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": 42}
def test_parse_xml_direct_to_value(self):
xml = """<root><ab>42</ab></root>"""
struct = {"path": "./ab", "transform": int}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == 42
def test_parse_xml_multiple_path(self):
xml = """<root><a>This is something</a></root>"""
struct = {"something": {"path": ["./b", "./a"], "transform": str}}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": "This is something"}
def test_parse_xml_nested(self):
xml = """<root><a>This is something</a></root>"""
struct = {"something": {"nested": {"path": "./a", "transform": str}}}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": {"nested": "This is something"}}
def test_skips_unavailable_field(self):
xml = """<root><a>This is something</a></root>"""
struct = {
"something": {"path": "./a", "transform": str},
"something_else": {"nested": {"path": "./b", "transform": str}},
}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": "This is something"}
def test_raises_on_missing_required_field(self, caplog):
xml = """<root></root>"""
struct = {
"something": {"path": "./a", "transform": str, "required": True},
}
with pytest.raises(common.ParsingError) as e:
common.parse_interface_xml(etree.fromstring(xml), struct)
assert "required path ./a" in str(e.value)
def test_logs_on_double_entry(self, caplog):
xml = """<root><a>This is something</a><a>Something Else</a></root>"""
struct = {
"something": {"path": "./a", "transform": str},
}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": "This is something"}
record = caplog.records[0]
assert record.levelname == "WARNING"
assert "found more than one element ./a" in record.message
def test_counters_to_point():
now = datetime.now()
assert common.counters_to_point(
router_fqdn="some.router",
interface_name="ae12",
counters={"some": "fields"},
timestamp=now,
measurement="blah",
) == {
"time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"measurement": "blah",
"tags": {"hostname": "some.router", "interface_name": "ae12"},
"fields": {"some": "fields"},
}
def test_error_point_generator(config):
class DummyProcessor(RouterProcessor):
def _interface_counters(self, *args, **kwargs):
return [
{
"name": "ae12.1",
},
{"name": "ae12", PointGroup.ERROR: {"some": "fields"}},
]
now = datetime.now()
points = list(
DummyProcessor("some.router", config).points(
point_group=PointGroup.ERROR,
interfaces=None,
timestamp=now,
)
)
assert points == [
{
"time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"measurement": "testenv_error_counters",
"tags": {"hostname": "some.router", "interface_name": "ae12"},
"fields": {"some": "fields"},
}
]
@patch.object(cli.OutputMethod, "write_points")
@pytest.mark.usefixtures("mocked_get_netconf")
def test_main_for_all_juniper_routers(
write_points, juniper_router_fqdn, juniper_inventory, schemavalidate, config
):
total_points = 0
calls = 0
def validate(points, *_, **__):
nonlocal total_points, calls
points = list(points)
calls += 1
assert points
for point in points:
total_points += 1
schemavalidate(point, influx.INFLUX_POINT)
assert point["fields"] # must contain at least one field
write_points.side_effect = validate
cli.main(
None,
processor=JuniperRouterProcessor(juniper_router_fqdn, config),
interfaces=juniper_inventory[juniper_router_fqdn],
)
assert calls > 0
assert total_points > 0
@pytest.fixture
def mocked_load_inventory():
with patch.object(cli, "load_inventory_json") as mock:
mock.return_value = [
{"router": "mx1.ams.nl.geant.net", "name": "ifc1"},
{"router": "mx1.ams.nl.geant.net", "name": "ifc2"},
{"router": "mx1.lon.uk.geant.net", "name": "ifc3"},
]
yield mock
@patch.object(cli, "process_router")
def test_main_with_some_interfaces(
process_router, mocked_load_inventory, mocked_get_netconf
):
config = {
"juniper": {"some": "params"},
"inventory": ["some-inprov"],
"brian-counters": {},
"error-counters": {},
}
cli.main(
None,
processor=JuniperRouterProcessor("mx1.ams.nl.geant.net", config),
interfaces=["ifc1"],
)
assert process_router.call_args[1]["interfaces"] == {"ifc1": {}}
@patch.object(cli, "process_router")
def test_main_with_all_interfaces_and_inprov_hosts(
process_router, mocked_load_inventory, mocked_get_netconf
):
config = {
"juniper": {"some": "params"},
"inventory": ["some-inprov"],
"brian-counters": {},
"error-counters": {},
}
cli.main(
None,
processor=JuniperRouterProcessor("mx1.ams.nl.geant.net", config),
)
assert process_router.call_args[1]["interfaces"].keys() == {"ifc1", "ifc2"}
@patch.object(cli, "process_router")
def test_main_with_all_interfaces_no_inprov_hosts(
process_router, mocked_load_inventory, mocked_get_netconf
):
config = {
"juniper": {"some": "params"},
"brian-counters": {"influx": None},
"error-counters": {"influx": None},
}
cli.main(
None,
processor=JuniperRouterProcessor("mx1.ams.nl.geant.net", config),
)
assert process_router.call_args[1]["interfaces"] is None
@pytest.mark.parametrize(
"point_group, url",
[
(PointGroup.BRIAN, "/brian/endpoint"),
(PointGroup.ERROR, "/error/endpoint"),
],
)
def test_loads_interfaces_from_endpoint(point_group, url, mocked_load_inventory):
inprov_hosts = ["some.inprov"]
config = {
"inventory": inprov_hosts,
"brian-counters": {"inventory-url": "/brian/endpoint"},
"error-counters": {"inventory-url": "/error/endpoint"},
}
cli.load_interfaces(
"mx1.ams.nl.geant.net",
interfaces=cli.ALL_,
point_group=point_group,
config=config,
)
assert mocked_load_inventory.call_args == call(
url,
inprov_hosts,
cli.INVENTORY_INTERFACES_SCHEMA,
)
def test_loads_interfaces_for_gws_indirect_points(mocked_load_inventory):
mocked_load_inventory.return_value = [
{
"name": "SOME_NAME",
"interface": "lag-11.333",
"hostname": "rt0.ams.nl",
}
]
config = {
"inventory": ["some.inprov"],
"brian-counters": {"inventory-url": "/brian/endpoint"},
"gws-indirect-counters": {"inventory-url": "/gws-indirect/endpoint"},
}
cli.load_interfaces(
"rt0.ams.nl",
interfaces=cli.ALL_,
point_group=PointGroup.GWS_INDIRECT,
config=config,
)
assert mocked_load_inventory.call_args == call(
config["gws-indirect-counters"]["inventory-url"],
config["inventory"],
cli.GWS_INDIRECT_SCHEMA,
)
@patch.object(cli, "influx_client")
def test_write_points_to_influx(influx_client):
points = [{"point": "one"}, {"point": "two"}]
influx_params = {"influx": "param"}
cli.OutputMethod.INFLUX.write_points(
points=points,
influx_params=influx_params,
)
assert influx_client.call_args == call({"timeout": 5, "influx": "param"})
assert influx_client().__enter__.call_count
assert influx_client().write_points.call_args == call(points, batch_size=50)
def test_write_points_to_stdout():
stream = Mock()
points = [{"point": "one"}, {"point": "two"}]
influx_params = {"measurement": "meas"}
cli.OutputMethod.STDOUT.write_points(
points=points,
influx_params=influx_params,
output=cli.OutputMethod.STDOUT,
stream=stream,
)
assert stream.write.call_args_list == [
call('meas - {"point": "one"}\n'),
call('meas - {"point": "two"}\n'),
]
@patch.object(ncclient.manager, "connect")
def test_netconf_connect(connect):
with common.netconf_connect(
hostname="some.router", ssh_params={"ssh": "param"}, other="more_params"
):
pass
assert connect.call_args == call(
host="some.router", ssh="param", other="more_params", port=830
)
@patch.object(cli, "logging")
def test_setup_logging_returns_message_counter(logging):
logging.ERROR = 42
result = cli.setup_logging()
assert isinstance(result, cli.MessageCounter)
assert result.level == logging.ERROR
@pytest.mark.parametrize(
"verbosity, level",
[
(True, "DEBUG"),
(False, "INFO"),
],
)
@patch.object(cli, "logging")
def test_setup_logging_sets_loglevel(logging, verbosity, level):
logging.ERROR = "ERROR"
logging.INFO = "INFO"
logging.DEBUG = "DEBUG"
cli.setup_logging(verbosity)
assert logging.basicConfig.call_args[1]["level"] == level
assert logging.StreamHandler().setLevel.call_args[0][0] == level