# test_interface_stats.py (11.09 KiB)
# Authored by Pelle Koster
from datetime import datetime
from unittest.mock import Mock, call, patch
from brian_polling_manager import influx
import jsonschema
import pytest
from brian_polling_manager.interface_stats import cli
from brian_polling_manager.interface_stats.cli import PointGroup, Vendor
from brian_polling_manager.interface_stats.vendors import common
from lxml import etree
import ncclient.manager
def test_sanity_check_juniper_snapshot_data(juniper_inventory, all_juniper_routers):
    """
    Sanity-check the Juniper snapshot fixtures.

    Every router key in ``juniper_inventory`` must also be listed in
    ``all_juniper_routers``; otherwise the test data set is incomplete.
    """
    inventory_routers = set(juniper_inventory.keys())
    missing_routers = inventory_routers.difference(all_juniper_routers)
    assert not missing_routers
def test_sanity_check_nokia_snapshot_data(all_nokia_routers):
    """Check that the Nokia snapshot data holds exactly the expected routers."""
    # TODO: update this once we have Nokia routers in inventory provider
    expected_routers = {
        "rt0.ams.nl.lab.office.geant.net",
        "rt0.lon.uk.lab.office.geant.net",
        "rt0.ams.nl.geant.net",
    }
    assert set(all_nokia_routers) == expected_routers
class TestParseInterfaceXML:
    """Exercise ``common.parse_interface_xml`` against small inline XML documents."""

    def test_parse_xml(self):
        """A single path/transform entry is expected to give ``{"something": 42}``."""
        root = etree.fromstring("""<root><ab>42</ab></root>""")
        spec = {"something": {"path": "./ab", "transform": int}}
        parsed = common.parse_interface_xml(root, spec)
        assert parsed == {"something": 42}

    def test_parse_xml_deep(self):
        """A path that descends two levels (``./a/b``) is expected to resolve."""
        root = etree.fromstring("""<root><a><b>42</b></a></root>""")
        spec = {"something": {"path": "./a/b", "transform": int}}
        parsed = common.parse_interface_xml(root, spec)
        assert parsed == {"something": 42}

    def test_parse_xml_direct_to_value(self):
        """A bare path/transform spec is expected to return the value itself."""
        root = etree.fromstring("""<root><ab>42</ab></root>""")
        spec = {"path": "./ab", "transform": int}
        parsed = common.parse_interface_xml(root, spec)
        assert parsed == 42

    def test_parse_xml_multiple_path(self):
        """A list of paths is accepted; only ``./a`` exists in this document."""
        root = etree.fromstring("""<root><a>This is something</a></root>""")
        spec = {"something": {"path": ["./b", "./a"], "transform": str}}
        parsed = common.parse_interface_xml(root, spec)
        assert parsed == {"something": "This is something"}

    def test_parse_xml_nested(self):
        """A nested spec is expected to produce an equally nested result."""
        root = etree.fromstring("""<root><a>This is something</a></root>""")
        spec = {"something": {"nested": {"path": "./a", "transform": str}}}
        parsed = common.parse_interface_xml(root, spec)
        assert parsed == {"something": {"nested": "This is something"}}

    def test_skips_unavailable_field(self):
        """An entry whose path matches nothing is expected to be left out."""
        root = etree.fromstring("""<root><a>This is something</a></root>""")
        spec = {
            "something": {"path": "./a", "transform": str},
            "something_else": {"nested": {"path": "./b", "transform": str}},
        }
        parsed = common.parse_interface_xml(root, spec)
        assert parsed == {"something": "This is something"}

    def test_raises_on_missing_required_field(self, caplog):
        """A missing ``required`` path is expected to raise ``ParsingError``."""
        root = etree.fromstring("""<root></root>""")
        spec = {
            "something": {"path": "./a", "transform": str, "required": True},
        }
        with pytest.raises(common.ParsingError) as excinfo:
            common.parse_interface_xml(root, spec)
        assert "required path ./a" in str(excinfo.value)

    def test_logs_on_double_entry(self, caplog):
        """Two matches for one path: expect the first value and a WARNING record."""
        root = etree.fromstring(
            """<root><a>This is something</a><a>Something Else</a></root>"""
        )
        spec = {
            "something": {"path": "./a", "transform": str},
        }
        parsed = common.parse_interface_xml(root, spec)
        assert parsed == {"something": "This is something"}
        first_record = caplog.records[0]
        assert first_record.levelname == "WARNING"
        assert "found more than one element ./a" in first_record.message
def test_brian_point_counters():
    """Expect one point per interface, with its ``brian`` dict as the fields."""
    timestamp = datetime.now()
    interfaces = [
        {"name": "ae12", "brian": {"some": "fields"}},
        {"name": "ae12.1", "brian": {"more": "other fields"}},
    ]
    generated = common.brian_points(
        router_fqdn="some.router",
        interfaces=interfaces,
        timestamp=timestamp,
        measurement_name="blah",
    )
    points = list(generated)

    # Both expected points carry the same formatted timestamp.
    expected_time = timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
    expected_points = [
        {
            "time": expected_time,
            "measurement": "blah",
            "tags": {"hostname": "some.router", "interface_name": "ae12"},
            "fields": {"some": "fields"},
        },
        {
            "time": expected_time,
            "measurement": "blah",
            "tags": {"hostname": "some.router", "interface_name": "ae12.1"},
            "fields": {"more": "other fields"},
        },
    ]
    assert points == expected_points
def test_error_point_counters():
    """Expect a single point whose fields are the interface's ``errors`` dict."""
    timestamp = datetime.now()
    generated = common.error_points(
        router_fqdn="some.router",
        interfaces=[{"name": "ae12", "errors": {"some": "fields"}}],
        timestamp=timestamp,
        measurement_name="blah-errors",
    )
    points = list(generated)

    expected_point = {
        "time": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "measurement": "blah-errors",
        "tags": {"hostname": "some.router", "interface_name": "ae12"},
        "fields": {"some": "fields"},
    }
    assert points == [expected_point]
def test_no_error_point_counters():
    """Expect no point for an interface that has no ``errors`` entry."""
    timestamp = datetime.now()
    interface_without_errors = {"name": "ae12.1"}
    interface_with_errors = {"name": "ae12", "errors": {"some": "fields"}}
    generated = common.error_points(
        router_fqdn="some.router",
        interfaces=[interface_without_errors, interface_with_errors],
        timestamp=timestamp,
        measurement_name="blah-errors",
    )
    points = list(generated)

    # Only the interface that carries error counters should show up.
    expected_point = {
        "time": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "measurement": "blah-errors",
        "tags": {"hostname": "some.router", "interface_name": "ae12"},
        "fields": {"some": "fields"},
    }
    assert points == [expected_point]
@patch.object(cli.OutputMethod, "write_points")
def test_main_for_all_juniper_routers(
    write_points, mocked_get_netconf, juniper_router_fqdn, juniper_inventory
):
    """
    Run ``cli.main`` for a Juniper router with ``write_points`` mocked out.

    Every batch handed to ``write_points`` must be non-empty, and every point
    must validate against ``influx.INFLUX_POINT`` and have at least one field.
    """
    # Mutable tally shared with the side-effect callback below.
    tally = {"calls": 0, "points": 0}

    def validate(points, *_, **__):
        batch = list(points)
        tally["calls"] += 1
        assert batch
        for point in batch:
            tally["points"] += 1
            jsonschema.validate(point, influx.INFLUX_POINT)
            assert point["fields"]  # must contain at least one field

    write_points.side_effect = validate

    config = {
        "juniper": {"some": "params"},
        "brian-counters": {"influx": {"measurement": "brian"}},
        "error-counters": {"influx": {"measurement": "error"}},
    }
    cli.main(
        app_config_params=config,
        router_fqdn=juniper_router_fqdn,
        vendor=Vendor.JUNIPER,
        interfaces=juniper_inventory[juniper_router_fqdn],
    )

    assert tally["calls"] > 0
    assert tally["points"] > 0
@pytest.fixture
def mocked_load_inventory():
    """Patch ``cli.load_inventory_json`` to return a fixed interface list.

    Yields the mock so tests can inspect how it was called.
    """
    inventory = [
        {"router": "mx1.ams.nl.geant.net", "name": "ifc1"},
        {"router": "mx1.ams.nl.geant.net", "name": "ifc2"},
        {"router": "mx1.lon.uk.geant.net", "name": "ifc3"},
    ]
    with patch.object(
        cli, "load_inventory_json", return_value=inventory
    ) as load_inventory:
        yield load_inventory
@patch.object(cli, "process_router")
def test_main_with_some_interfaces(
    process_router, mocked_load_inventory, mocked_get_netconf
):
    """Interfaces passed to ``cli.main`` should reach ``process_router`` as-is."""
    app_config = {
        "juniper": {"some": "params"},
        "inventory": ["some-inprov"],
        "brian-counters": {},
        "error-counters": {},
    }
    cli.main(
        app_config,
        "mx1.ams.nl.geant.net",
        Vendor.JUNIPER,
        interfaces=["ifc1"],
    )
    _, last_call_kwargs = process_router.call_args
    assert last_call_kwargs["interfaces"] == ["ifc1"]
@patch.object(cli, "process_router")
def test_main_with_all_interfaces_and_inprov_hosts(
    process_router, mocked_load_inventory, mocked_get_netconf
):
    """With inventory hosts configured and no interfaces given, expect the
    mocked inventory's interfaces for that router."""
    app_config = {
        "juniper": {"some": "params"},
        "inventory": ["some-inprov"],
        "brian-counters": {},
        "error-counters": {},
    }
    cli.main(app_config, "mx1.ams.nl.geant.net", Vendor.JUNIPER)
    _, last_call_kwargs = process_router.call_args
    assert last_call_kwargs["interfaces"] == ["ifc1", "ifc2"]
@patch.object(cli, "process_router")
def test_main_with_all_interfaces_no_inprov_hosts(
    process_router, mocked_load_inventory, mocked_get_netconf
):
    """Without an ``inventory`` entry in the config and no interfaces given,
    ``process_router`` should receive ``interfaces=None``."""
    app_config = {
        "juniper": {"some": "params"},
        "brian-counters": {"influx": None},
        "error-counters": {"influx": None},
    }
    cli.main(app_config, "mx1.ams.nl.geant.net", Vendor.JUNIPER)
    _, last_call_kwargs = process_router.call_args
    assert last_call_kwargs["interfaces"] is None
@pytest.mark.parametrize(
    "point_group, url",
    [
        (PointGroup.BRIAN, "/brian/endpoint"),
        (PointGroup.ERRORS, "/error/endpoint"),
    ],
)
def test_loads_interfaces_from_endpoint(point_group, url, mocked_load_inventory):
    """Each point group should load the inventory from its own configured
    ``inventory-url``."""
    inventory_hosts = ["some.inprov"]
    app_config = {
        "inventory": inventory_hosts,
        "brian-counters": {"inventory-url": "/brian/endpoint"},
        "error-counters": {"inventory-url": "/error/endpoint"},
    }
    cli.load_interfaces(
        "mx1.ams.nl.geant.net",
        interfaces=cli.ALL_,
        app_config_params=app_config,
        point_group=point_group,
    )
    expected_call = call(url, inventory_hosts, cli.INVENTORY_INTERFACES_SCHEMA)
    assert mocked_load_inventory.call_args == expected_call
@patch.object(cli, "influx_client")
def test_write_points_to_influx(influx_client):
    """The INFLUX output should create a client from the influx params plus a
    ``timeout`` of 5, enter it, and write the points with ``batch_size=50``."""
    points = [{"point": "one"}, {"point": "two"}]
    cli.OutputMethod.INFLUX.write_points(
        points=points,
        influx_params={"influx": "param"},
    )
    expected_client_call = call({"timeout": 5, "influx": "param"})
    assert influx_client.call_args == expected_client_call

    # The object returned by calling the mocked ``influx_client``.
    client = influx_client.return_value
    assert client.__enter__.call_count
    assert client.write_points.call_args == call(points, batch_size=50)
def test_write_points_to_stdout():
    """The STDOUT output should write one ``<measurement> - <json>`` line per
    point to the given stream."""
    fake_stream = Mock()
    cli.OutputMethod.STDOUT.write_points(
        points=[{"point": "one"}, {"point": "two"}],
        influx_params={"measurement": "meas"},
        output=cli.OutputMethod.STDOUT,
        stream=fake_stream,
    )
    expected_writes = [
        call('meas - {"point": "one"}\n'),
        call('meas - {"point": "two"}\n'),
    ]
    assert fake_stream.write.call_args_list == expected_writes
@patch.object(ncclient.manager, "connect")
def test_netconf_connect(connect):
    """``netconf_connect`` should call ``ncclient.manager.connect`` with the
    hostname as ``host``, the ssh params and extra kwargs, and ``port=830``."""
    ssh_params = {"ssh": "param"}
    with common.netconf_connect(
        hostname="some.router", ssh_params=ssh_params, other="more_params"
    ):
        pass
    expected_call = call(
        host="some.router", ssh="param", other="more_params", port=830
    )
    assert connect.call_args == expected_call
@patch.object(cli, "logging")
def test_setup_logging_returns_message_counter(logging):
    """``setup_logging`` should return a ``MessageCounter`` at ERROR level."""
    # Give the mocked logging module a concrete ERROR value to compare against.
    logging.ERROR = 42
    counter = cli.setup_logging()
    assert isinstance(counter, cli.MessageCounter)
    assert logging.ERROR == counter.level
@pytest.mark.parametrize(
    "verbosity, level",
    [
        (True, "DEBUG"),
        (False, "INFO"),
    ],
)
@patch.object(cli, "logging")
def test_setup_logging_sets_loglevel(logging, verbosity, level):
    """The verbosity flag should select DEBUG or INFO for both ``basicConfig``
    and the stream handler."""
    # Stand-in level constants on the mocked logging module.
    for level_name in ("ERROR", "INFO", "DEBUG"):
        setattr(logging, level_name, level_name)

    cli.setup_logging(verbosity)

    _, basic_config_kwargs = logging.basicConfig.call_args
    assert basic_config_kwargs["level"] == level

    set_level_args, _ = logging.StreamHandler.return_value.setLevel.call_args
    assert set_level_args[0] == level