Select Git revision
test_interface_stats.py 8.40 KiB
from datetime import datetime
from unittest.mock import Mock, call, patch
from brian_polling_manager import influx
import jsonschema
import pytest
from brian_polling_manager.interface_stats import cli
from brian_polling_manager.interface_stats.vendors import Vendor, common
from lxml import etree
import ncclient.manager
def test_sanity_check_juniper_snapshot_data(polled_interfaces, all_juniper_routers):
"""
verify that all routers with interfaces to be polled
are in the test data set
:return:
"""
missing_routers = set(polled_interfaces.keys()) - set(all_juniper_routers)
assert len(missing_routers) == 0
def test_sanity_check_nokia_snapshot_data(all_nokia_routers):
# TODO: update this once we have Nokia routers in inventory provider
assert set(all_nokia_routers) == {
"rt0.ams.nl.lab.office.geant.net",
"rt0.lon.uk.lab.office.geant.net",
}
class TestParseCounters:
def test_parse_counters(self):
xml = """<root><ab>42</ab></root>"""
struct = {"something": {"path": "./ab", "transform": int}}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": 42}
def test_parse_counters_multiple_path(self):
xml = """<root><a>This is something</a></root>"""
struct = {"something": {"path": ["./b", "./a"], "transform": str}}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": "This is something"}
def test_parse_counters_nested(self):
xml = """<root><a>This is something</a></root>"""
struct = {"something": {"nested": {"path": "./a", "transform": str}}}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": {"nested": "This is something"}}
def test_skips_unavailable_field(self):
xml = """<root><a>This is something</a></root>"""
struct = {
"something": {"path": "./a", "transform": str},
"something_else": {"nested": {"path": "./b", "transform": str}},
}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": "This is something"}
def test_logs_on_missing_required_field(self, caplog):
xml = """<root></root>"""
struct = {
"something": {"path": "./a", "transform": str, "required": True},
}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result is None
record = caplog.records[0]
assert record.levelname == "ERROR"
assert "required path ./a" in record.message
def test_logs_on_double_entry(self, caplog):
xml = """<root><a>This is something</a><a>Something Else</a></root>"""
struct = {
"something": {"path": "./a", "transform": str},
}
result = common.parse_interface_xml(etree.fromstring(xml), struct)
assert result == {"something": "This is something"}
record = caplog.records[0]
assert record.levelname == "WARNING"
assert "found more than one ./a" in record.message
def test_brian_point_counters():
now = datetime.now()
points = list(
common.brian_points(
router_fqdn="some.router",
interfaces=[
{"name": "ae12", "brian": {"some": "fields"}},
{"name": "ae12.1", "brian": {"more": "other fields"}},
],
timestamp=now,
measurement_name="blah",
)
)
assert points == [
{
"time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"measurement": "blah",
"tags": {"hostname": "some.router", "interface_name": "ae12"},
"fields": {"some": "fields"},
},
{
"time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"measurement": "blah",
"tags": {"hostname": "some.router", "interface_name": "ae12.1"},
"fields": {"more": "other fields"},
},
]
def test_error_point_counters():
now = datetime.now()
points = list(
common.error_points(
router_fqdn="some.router",
interfaces=[
{"name": "ae12", "errors": {"some": "fields"}},
],
timestamp=now,
measurement_name="blah-errors",
)
)
assert points == [
{
"time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"measurement": "blah-errors",
"tags": {"hostname": "some.router", "interface_name": "ae12"},
"fields": {"some": "fields"},
}
]
def test_no_error_point_counters():
now = datetime.now()
points = list(
common.error_points(
router_fqdn="some.router",
interfaces=[
{
"name": "ae12.1",
},
{"name": "ae12", "errors": {"some": "fields"}},
],
timestamp=now,
measurement_name="blah-errors",
)
)
assert points == [
{
"time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"measurement": "blah-errors",
"tags": {"hostname": "some.router", "interface_name": "ae12"},
"fields": {"some": "fields"},
}
]
@patch.object(cli, "write_points")
def test_main_for_all_juniper_routers(
write_points, mocked_get_netconf, all_juniper_routers
):
config = {
"juniper": {"some": "params"},
"influx": {
"brian-counters": {"measurement": "brian"},
"error-counters": {"measurement": "error"},
},
}
total_points = 0
calls = 0
def validate(points, *_, **__):
nonlocal total_points, calls
points = list(points)
calls += 1
assert points
for point in points:
total_points += 1
jsonschema.validate(point, influx.INFLUX_POINT)
assert point["fields"] # must contain at least one field
write_points.side_effect = validate
cli.main(
app_config_params=config,
router_fqdns=all_juniper_routers,
vendor=Vendor.JUNIPER,
raise_errors=True,
)
assert calls > 0
assert total_points > 0
@pytest.fixture
def load_interfaces():
return Mock(
return_value=[{"router": "host1"}, {"router": "host2"}, {"router": "host3"}]
)
def test_validate_valid_hosts(load_interfaces):
assert cli.validate_router_hosts(
("host1", "host2"),
vendor=Vendor.JUNIPER,
inprov_hosts=["some_host"],
load_interfaces_=load_interfaces,
)
assert load_interfaces.called
def test_validate_invalid_hosts(load_interfaces):
with pytest.raises(ValueError):
cli.validate_router_hosts(
("host1", "invalid"),
vendor=Vendor.JUNIPER,
inprov_hosts=["some_host"],
load_interfaces_=load_interfaces,
)
assert load_interfaces.called
def test_doesnt_validate_without_inprov_hosts(load_interfaces):
assert cli.validate_router_hosts(
("host1", "invalid"),
vendor=Vendor.JUNIPER,
inprov_hosts=None,
load_interfaces_=load_interfaces,
)
assert not load_interfaces.called
@patch.object(cli, "influx_client")
def test_write_points_to_influx(influx_client):
points = [{"point": "one"}, {"point": "two"}]
influx_params = {"influx": "param"}
cli.write_points(
points=points,
influx_params=influx_params,
output=cli.OutputMethod.INFLUX,
)
assert influx_client.call_args == call({"timeout": 5, "influx": "param"})
assert influx_client().__enter__.call_count
assert influx_client().write_points.call_args == call(points, batch_size=50)
def test_write_points_to_stdout():
stream = Mock()
points = [{"point": "one"}, {"point": "two"}]
influx_params = {"measurement": "meas"}
cli.write_points(
points=points,
influx_params=influx_params,
output=cli.OutputMethod.STDOUT,
stream=stream,
)
assert stream.write.call_args_list == [
call('meas - {"point": "one"}\n'),
call('meas - {"point": "two"}\n'),
]
@patch.object(ncclient.manager, "connect")
def test_netconf_connect(connect):
with common.netconf_connect(
hostname="some.router", ssh_params={"ssh": "param"}, other="more_params"
):
pass
assert connect.call_args == call(
host="some.router", ssh="param", other="more_params", port=830
)