"""Tests for the interface_stats CLI, its shared helpers and router processors."""

from datetime import datetime
from unittest.mock import Mock, call, patch

import ncclient.manager
import pytest
from lxml import etree

from brian_polling_manager import influx
from brian_polling_manager.interface_stats.juniper import JuniperRouterProcessor
from brian_polling_manager.interface_stats import cli
from brian_polling_manager.interface_stats import common
from brian_polling_manager.interface_stats.common import PointGroup, RouterProcessor


def _parse(xml_text, struct):
    """Parse ``xml_text`` into an element and run it through ``parse_interface_xml``."""
    return common.parse_interface_xml(etree.fromstring(xml_text), struct)


def test_sanity_check_juniper_snapshot_data(juniper_inventory, all_juniper_routers):
    """Every router in the Juniper inventory must be part of the test data set."""
    routers_without_data = set(juniper_inventory.keys()).difference(
        all_juniper_routers
    )
    assert not routers_without_data


def test_sanity_check_nokia_snapshot_data(nokia_inventory, all_nokia_routers):
    """Every router in the Nokia inventory must be part of the test data set."""
    routers_without_data = set(nokia_inventory.keys()).difference(all_nokia_routers)
    assert not routers_without_data


class TestParseInterfaceXML:
    """Behaviour of ``common.parse_interface_xml`` for various struct layouts."""

    def test_parse_xml(self):
        """A single path/transform entry yields the transformed element text."""
        spec = {"something": {"path": "./ab", "transform": int}}
        parsed = _parse("<root><ab>42</ab></root>", spec)
        assert parsed == {"something": 42}

    def test_parse_xml_deep(self):
        """A path may descend through more than one element level."""
        spec = {"something": {"path": "./a/b", "transform": int}}
        parsed = _parse("<root><a><b>42</b></a></root>", spec)
        assert parsed == {"something": 42}

    def test_parse_xml_direct_to_value(self):
        """A struct that is itself a path/transform entry yields a bare value."""
        spec = {"path": "./ab", "transform": int}
        parsed = _parse("<root><ab>42</ab></root>", spec)
        assert parsed == 42

    def test_parse_xml_multiple_path(self):
        """With a list of paths, a path that matches (here the second) is used."""
        spec = {"something": {"path": ["./b", "./a"], "transform": str}}
        parsed = _parse("<root><a>This is something</a></root>", spec)
        assert parsed == {"something": "This is something"}

    def test_parse_xml_nested(self):
        """Nested structs produce equally nested results."""
        spec = {"something": {"nested": {"path": "./a", "transform": str}}}
        parsed = _parse("<root><a>This is something</a></root>", spec)
        assert parsed == {"something": {"nested": "This is something"}}

    def test_skips_unavailable_field(self):
        """Entries whose path is absent from the XML are left out of the result."""
        spec = {
            "something": {"path": "./a", "transform": str},
            "something_else": {"nested": {"path": "./b", "transform": str}},
        }
        parsed = _parse("<root><a>This is something</a></root>", spec)
        assert parsed == {"something": "This is something"}

    def test_raises_on_missing_required_field(self, caplog):
        """A missing path flagged ``required`` raises ``ParsingError``."""
        spec = {
            "something": {"path": "./a", "transform": str, "required": True},
        }
        with pytest.raises(common.ParsingError) as exc_info:
            _parse("<root></root>", spec)

        message = str(exc_info.value)
        assert "required path ./a" in message

    def test_logs_on_double_entry(self, caplog):
        """Two matches for one path: the first is used and a warning is logged."""
        spec = {
            "something": {"path": "./a", "transform": str},
        }
        parsed = _parse(
            "<root><a>This is something</a><a>Something Else</a></root>", spec
        )
        assert parsed == {"something": "This is something"}

        first_record = caplog.records[0]
        assert first_record.levelname == "WARNING"
        assert "found more than one element ./a" in first_record.message


def test_counters_to_point():
    """``counters_to_point`` assembles time, measurement, tags and fields."""
    timestamp = datetime.now()

    point = common.counters_to_point(
        router_fqdn="some.router",
        interface_name="ae12",
        counters={"some": "fields"},
        timestamp=timestamp,
        measurement="blah",
    )

    expected = {
        "time": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "measurement": "blah",
        "tags": {"hostname": "some.router", "interface_name": "ae12"},
        "fields": {"some": "fields"},
    }
    assert point == expected


def test_error_point_generator(config):
    """Only the interface that carries ERROR counters yields an error point."""

    class DummyProcessor(RouterProcessor):
        def _interface_counters(self, *args, **kwargs):
            # the first interface has no error counters at all
            without_errors = {"name": "ae12.1"}
            with_errors = {"name": "ae12", PointGroup.ERROR: {"some": "fields"}}
            return [without_errors, with_errors]

    timestamp = datetime.now()
    processor = DummyProcessor("some.router", config)
    generated = list(
        processor.points(
            point_group=PointGroup.ERROR,
            interfaces=None,
            timestamp=timestamp,
        )
    )

    expected_point = {
        "time": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "measurement": "testenv_error_counters",
        "tags": {"hostname": "some.router", "interface_name": "ae12"},
        "fields": {"some": "fields"},
    }
    assert generated == [expected_point]


@patch.object(cli.OutputMethod, "write_points")
@pytest.mark.usefixtures("mocked_get_netconf")
def test_main_for_all_juniper_routers(
    write_points, juniper_router_fqdn, juniper_inventory, schemavalidate, config
):
    """Run ``cli.main`` for a Juniper router and schema-validate every point."""
    seen = {"calls": 0, "points": 0}

    def validate(points, *_, **__):
        batch = list(points)
        seen["calls"] += 1
        assert batch
        for point in batch:
            seen["points"] += 1
            schemavalidate(point, influx.INFLUX_POINT)
            assert point["fields"]  # must contain at least one field

    write_points.side_effect = validate

    processor = JuniperRouterProcessor(juniper_router_fqdn, config)
    cli.main(
        None,
        processor=processor,
        interfaces=juniper_inventory[juniper_router_fqdn],
    )

    assert seen["calls"] > 0
    assert seen["points"] > 0


@pytest.fixture
def mocked_load_inventory():
    """Patch ``cli.load_inventory_json`` to return three fixed interfaces."""
    inventory = [
        {"router": "mx1.ams.nl.geant.net", "name": "ifc1"},
        {"router": "mx1.ams.nl.geant.net", "name": "ifc2"},
        {"router": "mx1.lon.uk.geant.net", "name": "ifc3"},
    ]
    with patch.object(cli, "load_inventory_json", return_value=inventory) as mock:
        yield mock


@patch.object(cli, "process_router")
def test_main_with_some_interfaces(
    process_router, mocked_load_inventory, mocked_get_netconf
):
    """An explicit interface list reaches ``process_router`` as a name-keyed dict."""
    config = {
        "juniper": {"some": "params"},
        "inventory": ["some-inprov"],
        "brian-counters": {},
        "error-counters": {},
    }
    processor = JuniperRouterProcessor("mx1.ams.nl.geant.net", config)

    cli.main(None, processor=processor, interfaces=["ifc1"])

    _, passed_kwargs = process_router.call_args
    assert passed_kwargs["interfaces"] == {"ifc1": {}}


@patch.object(cli, "process_router")
def test_main_with_all_interfaces_and_inprov_hosts(
    process_router, mocked_load_inventory, mocked_get_netconf
):
    """Without explicit interfaces, the inventory entries of this router are used."""
    config = {
        "juniper": {"some": "params"},
        "inventory": ["some-inprov"],
        "brian-counters": {},
        "error-counters": {},
    }
    processor = JuniperRouterProcessor("mx1.ams.nl.geant.net", config)

    cli.main(None, processor=processor)

    _, passed_kwargs = process_router.call_args
    assert passed_kwargs["interfaces"].keys() == {"ifc1", "ifc2"}


@patch.object(cli, "process_router")
def test_main_with_all_interfaces_no_inprov_hosts(
    process_router, mocked_load_inventory, mocked_get_netconf
):
    """With no ``inventory`` entry in the config, ``interfaces`` is passed as None."""
    config = {
        "juniper": {"some": "params"},
        "brian-counters": {"influx": None},
        "error-counters": {"influx": None},
    }
    processor = JuniperRouterProcessor("mx1.ams.nl.geant.net", config)

    cli.main(None, processor=processor)

    _, passed_kwargs = process_router.call_args
    assert passed_kwargs["interfaces"] is None


@pytest.mark.parametrize(
    "point_group, url",
    [
        (PointGroup.BRIAN, "/brian/endpoint"),
        (PointGroup.ERROR, "/error/endpoint"),
    ],
)
def test_loads_interfaces_from_endpoint(point_group, url, mocked_load_inventory):
    """``load_interfaces`` queries the inventory url configured for the point group."""
    inprov_hosts = ["some.inprov"]
    config = {
        "inventory": inprov_hosts,
        "brian-counters": {"inventory-url": "/brian/endpoint"},
        "error-counters": {"inventory-url": "/error/endpoint"},
    }

    cli.load_interfaces(
        "mx1.ams.nl.geant.net",
        interfaces=cli.ALL_,
        point_group=point_group,
        config=config,
    )

    expected_call = call(url, inprov_hosts, cli.INVENTORY_INTERFACES_SCHEMA)
    assert mocked_load_inventory.call_args == expected_call


def test_loads_interfaces_for_gws_indirect_points(mocked_load_inventory):
    """GWS indirect points are loaded from their own url with their own schema."""
    mocked_load_inventory.return_value = [
        {
            "name": "SOME_NAME",
            "interface": "lag-11.333",
            "hostname": "rt0.ams.nl",
        }
    ]
    config = {
        "inventory": ["some.inprov"],
        "brian-counters": {"inventory-url": "/brian/endpoint"},
        "gws-indirect-counters": {"inventory-url": "/gws-indirect/endpoint"},
    }

    cli.load_interfaces(
        "rt0.ams.nl",
        interfaces=cli.ALL_,
        point_group=PointGroup.GWS_INDIRECT,
        config=config,
    )

    expected_call = call(
        config["gws-indirect-counters"]["inventory-url"],
        config["inventory"],
        cli.GWS_INDIRECT_SCHEMA,
    )
    assert mocked_load_inventory.call_args == expected_call


@patch.object(cli, "influx_client")
def test_write_points_to_influx(influx_client):
    """INFLUX output creates a client with a timeout and writes points in batches."""
    points = [{"point": "one"}, {"point": "two"}]
    influx_params = {"influx": "param"}

    cli.OutputMethod.INFLUX.write_points(
        points=points,
        influx_params=influx_params,
    )

    assert influx_client.call_args == call({"timeout": 5, "influx": "param"})

    # inspect the object handed out by the patched factory
    client = influx_client.return_value
    assert client.__enter__.call_count
    assert client.write_points.call_args == call(points, batch_size=50)


def test_write_points_to_stdout():
    """STDOUT output writes one ``<measurement> - <json>`` line per point."""
    stream = Mock()
    points = [{"point": "one"}, {"point": "two"}]
    influx_params = {"measurement": "meas"}

    cli.OutputMethod.STDOUT.write_points(
        points=points,
        influx_params=influx_params,
        output=cli.OutputMethod.STDOUT,
        stream=stream,
    )

    expected_writes = [
        call('meas - {"point": "one"}\n'),
        call('meas - {"point": "two"}\n'),
    ]
    assert stream.write.call_args_list == expected_writes


@patch.object(ncclient.manager, "connect")
def test_netconf_connect(connect):
    """``netconf_connect`` forwards ssh params and extra kwargs, using port 830."""
    with common.netconf_connect(
        hostname="some.router",
        ssh_params={"ssh": "param"},
        other="more_params",
    ):
        pass

    expected_call = call(
        host="some.router", ssh="param", other="more_params", port=830
    )
    assert connect.call_args == expected_call


@patch.object(cli, "logging")
def test_setup_logging_returns_message_counter(logging):
    """``setup_logging`` returns a ``MessageCounter`` set to the ERROR level."""
    logging.ERROR = 42

    counter = cli.setup_logging()

    assert isinstance(counter, cli.MessageCounter)
    assert counter.level == logging.ERROR


@pytest.mark.parametrize(
    "verbosity, level",
    [
        (True, "DEBUG"),
        (False, "INFO"),
    ],
)
@patch.object(cli, "logging")
def test_setup_logging_sets_loglevel(logging, verbosity, level):
    """The verbosity flag selects DEBUG or INFO for basicConfig and the handler."""
    logging.ERROR = "ERROR"
    logging.INFO = "INFO"
    logging.DEBUG = "DEBUG"

    cli.setup_logging(verbosity)

    _, basic_config_kwargs = logging.basicConfig.call_args
    assert basic_config_kwargs["level"] == level

    set_level_args, _ = logging.StreamHandler.return_value.setLevel.call_args
    assert set_level_args[0] == level