Skip to content
Snippets Groups Projects
Select Git revision
  • fd3870ee58c52d63a74ffaf8cfa2edbd72f168b4
  • python3 default protected
  • feature/exabgp_support2
  • feature/exabgp_support2.bgpextcommunity
  • feature/exabgp_support2.django4.2
  • fix/existingcheck_honor_fragtype
  • feature/python3-authz_netmask
  • feature/authz_netmask
  • fix/wrong_ratelimit_stats
  • feature/requirements_version_update2024-01
  • feature/split_celery
  • feature/improved-warning-mails
  • fix/reenable_expireset_via_restapi
  • feature/admin_user_delete_with_owned_rule_reassigning1
  • feature/admin_user_delete_with_owned_rule_reassigning
  • feature/branded_doc
  • fix/forked_snmp_polling_worker_exit_issue
  • fix/false_user_activation_error
  • feature/exabgp_with_docker-compose
  • fix/prefix_overlap_handling
  • fix/js_security_issues-a
  • save1
  • rpm-1.5-7
  • working1
  • myv1.6
  • t12b1
  • v1.5_newnew2
  • merged_final
  • v1.5_newnew
  • startstop_old
  • myadd2
  • tomas3
  • merge_jra2t6_and_RESTAPI
  • mytomas2
  • mynew1
  • new_jra2t6
  • v1.5_final
  • fod16_ruleroutes-merged_old
  • merged_new
  • v1.6_new_old
  • v1.5_new_old_follower
41 results

tasks.py

Blame
  • test_interface_stats.py 8.40 KiB
    from datetime import datetime
    from unittest.mock import Mock, call, patch
    
    from brian_polling_manager import influx
    import jsonschema
    import pytest
    from brian_polling_manager.interface_stats import cli
    from brian_polling_manager.interface_stats.vendors import Vendor, common
    from lxml import etree
    import ncclient.manager
    
    
    def test_sanity_check_juniper_snapshot_data(polled_interfaces, all_juniper_routers):
        """
        verify that all routers with interfaces to be polled
        are in the test data set
        :return:
        """
        missing_routers = set(polled_interfaces.keys()) - set(all_juniper_routers)
        assert len(missing_routers) == 0
    
    
    def test_sanity_check_nokia_snapshot_data(all_nokia_routers):
        # TODO: update this once we have Nokia routers in inventory provider
        assert set(all_nokia_routers) == {
            "rt0.ams.nl.lab.office.geant.net",
            "rt0.lon.uk.lab.office.geant.net",
        }
    
    
    class TestParseCounters:
        def test_parse_counters(self):
            xml = """<root><ab>42</ab></root>"""
            struct = {"something": {"path": "./ab", "transform": int}}
            result = common.parse_interface_xml(etree.fromstring(xml), struct)
            assert result == {"something": 42}
    
        def test_parse_counters_multiple_path(self):
            xml = """<root><a>This is something</a></root>"""
            struct = {"something": {"path": ["./b", "./a"], "transform": str}}
            result = common.parse_interface_xml(etree.fromstring(xml), struct)
            assert result == {"something": "This is something"}
    
        def test_parse_counters_nested(self):
            xml = """<root><a>This is something</a></root>"""
            struct = {"something": {"nested": {"path": "./a", "transform": str}}}
            result = common.parse_interface_xml(etree.fromstring(xml), struct)
            assert result == {"something": {"nested": "This is something"}}
    
        def test_skips_unavailable_field(self):
            xml = """<root><a>This is something</a></root>"""
            struct = {
                "something": {"path": "./a", "transform": str},
                "something_else": {"nested": {"path": "./b", "transform": str}},
            }
            result = common.parse_interface_xml(etree.fromstring(xml), struct)
            assert result == {"something": "This is something"}
    
        def test_logs_on_missing_required_field(self, caplog):
            xml = """<root></root>"""
            struct = {
                "something": {"path": "./a", "transform": str, "required": True},
            }
            result = common.parse_interface_xml(etree.fromstring(xml), struct)
            assert result is None
            record = caplog.records[0]
            assert record.levelname == "ERROR"
            assert "required path ./a" in record.message
    
        def test_logs_on_double_entry(self, caplog):
            xml = """<root><a>This is something</a><a>Something Else</a></root>"""
            struct = {
                "something": {"path": "./a", "transform": str},
            }
            result = common.parse_interface_xml(etree.fromstring(xml), struct)
            assert result == {"something": "This is something"}
            record = caplog.records[0]
            assert record.levelname == "WARNING"
            assert "found more than one ./a" in record.message
    
    
    def test_brian_point_counters():
        now = datetime.now()
        points = list(
            common.brian_points(
                router_fqdn="some.router",
                interfaces=[
                    {"name": "ae12", "brian": {"some": "fields"}},
                    {"name": "ae12.1", "brian": {"more": "other fields"}},
                ],
                timestamp=now,
                measurement_name="blah",
            )
        )
    
        assert points == [
            {
                "time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
                "measurement": "blah",
                "tags": {"hostname": "some.router", "interface_name": "ae12"},
                "fields": {"some": "fields"},
            },
            {
                "time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
                "measurement": "blah",
                "tags": {"hostname": "some.router", "interface_name": "ae12.1"},
                "fields": {"more": "other fields"},
            },
        ]
    
    
    def test_error_point_counters():
        now = datetime.now()
        points = list(
            common.error_points(
                router_fqdn="some.router",
                interfaces=[
                    {"name": "ae12", "errors": {"some": "fields"}},
                ],
                timestamp=now,
                measurement_name="blah-errors",
            )
        )
    
        assert points == [
            {
                "time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
                "measurement": "blah-errors",
                "tags": {"hostname": "some.router", "interface_name": "ae12"},
                "fields": {"some": "fields"},
            }
        ]
    
    
    def test_no_error_point_counters():
        now = datetime.now()
        points = list(
            common.error_points(
                router_fqdn="some.router",
                interfaces=[
                    {
                        "name": "ae12.1",
                    },
                    {"name": "ae12", "errors": {"some": "fields"}},
                ],
                timestamp=now,
                measurement_name="blah-errors",
            )
        )
    
        assert points == [
            {
                "time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
                "measurement": "blah-errors",
                "tags": {"hostname": "some.router", "interface_name": "ae12"},
                "fields": {"some": "fields"},
            }
        ]
    
    
    @patch.object(cli, "write_points")
    def test_main_for_all_juniper_routers(
        write_points, mocked_get_netconf, all_juniper_routers
    ):
        config = {
            "juniper": {"some": "params"},
            "influx": {
                "brian-counters": {"measurement": "brian"},
                "error-counters": {"measurement": "error"},
            },
        }
        total_points = 0
        calls = 0
    
        def validate(points, *_, **__):
            nonlocal total_points, calls
            points = list(points)
            calls += 1
            assert points
            for point in points:
                total_points += 1
                jsonschema.validate(point, influx.INFLUX_POINT)
                assert point["fields"]  # must contain at least one field
    
        write_points.side_effect = validate
    
        cli.main(
            app_config_params=config,
            router_fqdns=all_juniper_routers,
            vendor=Vendor.JUNIPER,
            raise_errors=True,
        )
    
        assert calls > 0
        assert total_points > 0
    
    
    @pytest.fixture
    def load_interfaces():
        return Mock(
            return_value=[{"router": "host1"}, {"router": "host2"}, {"router": "host3"}]
        )
    
    
    def test_validate_valid_hosts(load_interfaces):
        assert cli.validate_router_hosts(
            ("host1", "host2"),
            vendor=Vendor.JUNIPER,
            inprov_hosts=["some_host"],
            load_interfaces_=load_interfaces,
        )
        assert load_interfaces.called
    
    
    def test_validate_invalid_hosts(load_interfaces):
        with pytest.raises(ValueError):
            cli.validate_router_hosts(
                ("host1", "invalid"),
                vendor=Vendor.JUNIPER,
                inprov_hosts=["some_host"],
                load_interfaces_=load_interfaces,
            )
        assert load_interfaces.called
    
    
    def test_doesnt_validate_without_inprov_hosts(load_interfaces):
        assert cli.validate_router_hosts(
            ("host1", "invalid"),
            vendor=Vendor.JUNIPER,
            inprov_hosts=None,
            load_interfaces_=load_interfaces,
        )
        assert not load_interfaces.called
    
    
    @patch.object(cli, "influx_client")
    def test_write_points_to_influx(influx_client):
        points = [{"point": "one"}, {"point": "two"}]
        influx_params = {"influx": "param"}
        cli.write_points(
            points=points,
            influx_params=influx_params,
            output=cli.OutputMethod.INFLUX,
        )
        assert influx_client.call_args == call({"timeout": 5, "influx": "param"})
        assert influx_client().__enter__.call_count
        assert influx_client().write_points.call_args == call(points, batch_size=50)
    
    
    def test_write_points_to_stdout():
        stream = Mock()
        points = [{"point": "one"}, {"point": "two"}]
        influx_params = {"measurement": "meas"}
        cli.write_points(
            points=points,
            influx_params=influx_params,
            output=cli.OutputMethod.STDOUT,
            stream=stream,
        )
        assert stream.write.call_args_list == [
            call('meas - {"point": "one"}\n'),
            call('meas - {"point": "two"}\n'),
        ]
    
    
    @patch.object(ncclient.manager, "connect")
    def test_netconf_connect(connect):
        with common.netconf_connect(
            hostname="some.router", ssh_params={"ssh": "param"}, other="more_params"
        ):
            pass
    
        assert connect.call_args == call(
            host="some.router", ssh="param", other="more_params", port=830
        )