Skip to content
Snippets Groups Projects
Select Git revision
  • d0ee269e417ec1bb759d074fb77f23b3eaba601a
  • develop default protected
  • inventory-auth
  • master protected
  • feature/POL1-813-error-report-sensu-check
  • 0.21
  • 0.20
  • 0.19
  • 0.18
  • 0.17
  • 0.16
  • 0.15
  • 0.14
  • 0.13
  • 0.12
  • 0.11
  • 0.10
  • 0.9
  • 0.8
  • 0.7
  • 0.6
  • 0.5
  • 0.4
  • 0.3
  • 0.2
25 results

test_interface_stats.py

Blame
  • user avatar
    Pelle Koster authored
    d0ee269e
    History
    test_interface_stats.py 8.23 KiB
    import datetime
    from functools import partial
    import itertools
    import re
    from typing import Iterable
    from unittest.mock import MagicMock, call, patch
    
    from brian_polling_manager.interface_stats.services.writers import (
        InfluxPointWriter,
        PointWriter,
    )
    import jsonschema
    import pytest
    from brian_polling_manager import influx
    from brian_polling_manager.interface_stats import brian, cli, errors, vendors
    from brian_polling_manager.interface_stats.services.netconf import (
        JuniperNetconfProvider,
        get_netconf_provider,
    )
    from brian_polling_manager.interface_stats.vendors import juniper
    from ncclient.operations.rpc import RPCReply
    
    
    def test_sanity_check_snapshot_data(polled_interfaces, all_juniper_routers):
        """
        verify that all routers with interfaces to be polled
        are in the test data set
        :return:
        """
        missing_routers = set(polled_interfaces.keys()) - set(all_juniper_routers)
        assert len(missing_routers) == 0
    
    
    def _is_enabled(ifc_name, ifc_doc):
        m = re.match(r"^([^\.]+)\.?.*", ifc_name)
        assert m  # sanity: should never fail
        phy = ifc_doc.xpath(
            f'//interface-information/physical-interface[normalize-space(name)="{m.group(1)}"]'
        )[0]
        admin_status = phy.xpath("./admin-status/text()")[0].strip()
        oper_status = phy.xpath("./oper-status/text()")[0].strip()
    
        return admin_status == "up" and oper_status == "up"
    
    
    def test_verify_all_interfaces_present(
        single_router_fqdn, get_netconf, polled_interfaces
    ):
        """
        verify that all the interfaces we expect to poll
        are available in the netconf data
        compares a snapshot of all netconf docs with a
        a snapshot of inventory /poller/interfaces
        (the snapshots were all taken around the same time)
        """
        if single_router_fqdn not in polled_interfaces:
            pytest.skip(f"{single_router_fqdn} has no expected polled interfaces")
    
        doc = get_netconf(single_router_fqdn)
        phy = juniper.physical_interface_counters(doc)
        log = juniper.logical_interface_counters(doc)
        interfaces = set(x["name"] for x in itertools.chain(phy, log))
        missing_interfaces = polled_interfaces[single_router_fqdn] - interfaces
        for ifc_name in missing_interfaces:
            # verify that any missing interfaces are admin/oper disabled
            assert not _is_enabled(ifc_name, doc)
    
    
    @pytest.fixture(
        params=[
            (
                juniper.physical_interface_counters,
                vendors.PHYSICAL_INTERFACE_COUNTER_SCHEMA,
            ),
            (
                juniper.logical_interface_counters,
                vendors.LOGICAL_INTERFACE_COUNTER_SCHEMA,
            ),
        ]
    )
    def interface_counter_calculator(request):
        return request.param
    
    
    def test_validate_interface_counters(
        interface_counter_calculator, single_router_fqdn, get_netconf
    ):
        fun, schema = interface_counter_calculator
        doc = get_netconf(single_router_fqdn)
        interfaces = list(fun(doc))
        assert interfaces
        for ifc in interfaces:
            jsonschema.validate(ifc, schema)
    
    
    def test_interface_counters(router_fqdn, interface_counter_calculator, get_netconf):
        fun, _ = interface_counter_calculator
        doc = get_netconf(router_fqdn)
        interfaces = list(fun(doc))
        assert interfaces
    
    
    @pytest.fixture(
        params=[
            (brian.counters, brian.BRIAN_COUNTER_DICT_SCHEMA),
            (errors.counters, errors.ERROR_COUNTER_DICT_SCHEMA),
        ]
    )
    def point_counter_calculator(request):
        return request.param
    
    
    def test_validate_point_counters(
        single_router_fqdn,
        interface_counter_calculator,
        point_counter_calculator,
        get_netconf,
    ):
        interface_fun, _ = interface_counter_calculator
        point_fun, schema = point_counter_calculator
        if (
            interface_fun is juniper.logical_interface_counters
            and point_fun is errors.counters
        ):
            # We should not have any error counters for logical interfaces
            pytest.skip()
    
        doc = get_netconf(single_router_fqdn)
        interfaces = interface_fun(doc)
        counters = list(
            point_fun(router_fqdn=single_router_fqdn, interface_counters=interfaces)
        )
    
        assert counters
        for ctrs in counters:
            jsonschema.validate(ctrs, schema)
    
    
    def test_point_counters(
        router_fqdn,
        interface_counter_calculator,
        point_counter_calculator,
        get_netconf,
    ):
        interface_fun, _ = interface_counter_calculator
        point_fun, _ = point_counter_calculator
        doc = get_netconf(router_fqdn)
        interfaces = interface_fun(doc)
        counters = list(point_fun(router_fqdn=router_fqdn, interface_counters=interfaces))
        if (
            interface_fun is juniper.logical_interface_counters
            and point_fun is errors.counters
        ):
            # We should not have any error counters for logical interfaces
            assert not counters
        else:
            assert counters
    
    
    @pytest.fixture(params=[cli._brian_points, cli._error_points])
    def influx_point_calculator(request):
        return request.param
    
    
    def test_validate_influx_points(
        single_router_fqdn, influx_point_calculator, get_netconf
    ):
        points = list(
            influx_point_calculator(
                router_fqdn=single_router_fqdn,
                netconf_doc=get_netconf(single_router_fqdn),
                timestamp=datetime.datetime.now(),
                measurement_name="blah",
            )
        )
        assert len(points)
        for point in points:
            jsonschema.validate(point, influx.INFLUX_POINT)
            assert point["fields"]  # any trivial points should already be filtered
    
    
    def test_influx_points(router_fqdn, influx_point_calculator, get_netconf):
        points = list(
            influx_point_calculator(
                router_fqdn=router_fqdn,
                netconf_doc=get_netconf(router_fqdn),
                timestamp=datetime.datetime.now(),
                measurement_name="blah",
            )
        )
        assert len(points)
        for point in points:
            assert point["fields"]  # any trivial points should already be filtered
    
    
    def test_main_for_all_juniper_routers(all_juniper_routers, data_dir):
        config = {
            "juniper": {"some": "params"},
            "influx": {
                "brian-counters": {"measurement": "brian"},
                "error-counters": {"measurement": "error"},
            },
        }
    
        class ValidatingPointWriter(PointWriter):
            def __init__(self, *args, **kwargs) -> None:
                pass
    
            def write_points(self, points: Iterable[dict]):
                points = list(points)
                assert points
                for point in points:
                    jsonschema.validate(point, influx.INFLUX_POINT)
                    assert point["fields"]  # must contain at least one field
    
        cli.main(
            app_config_params=config,
            juniper_fqdns=all_juniper_routers,
            nokia_fqdns=[],
            get_point_writer_=ValidatingPointWriter,
            get_netconf_provider_=partial(get_netconf_provider, source_dir=data_dir),
            raise_errors=True,
        )
    
    
    class TestJuniperNetconfProvider:
        RAW_RESPONSE_FILE = "raw-response-sample.xml"
    
        @pytest.fixture(autouse=True)
        def mocked_rpc(self, data_dir):
            raw_response = data_dir.joinpath(self.RAW_RESPONSE_FILE).read_text()
            with patch(
                "brian_polling_manager.interface_stats.services.netconf._rpc",
                return_value=RPCReply(raw_response),
            ) as mock:
                yield mock
    
        def test_calls_rpc_with_params(self, mocked_rpc):
            ssh_params = {}
            router_name = "some-router"
            provider = JuniperNetconfProvider(ssh_params)
    
            provider.get(router_name)
    
            call_args = mocked_rpc.call_args
            assert call_args[0][0] == router_name
            assert call_args[1]["ssh_params"] is ssh_params
            assert call_args[1]["command"].tag == "get-interface-information"
    
        def test_converts_rpc_response_to_xml(self):
            ssh_params = {}
            router_name = "some-router"
            provider = JuniperNetconfProvider(ssh_params)
    
            result = provider.get(router_name)
    
            assert result.tag == "rpc-reply"
    
    
    def test_influx_point_writer():
        client_factory = MagicMock()
        influx_params = {"some": "param"}
        writer = InfluxPointWriter(influx_params, timeout=10, client_factory=client_factory)
        writer.write_points(["point1", "point2"])
        assert client_factory.call_args == call({"timeout": 10, "some": "param"})
        assert client_factory().__enter__.call_count == 1
        assert client_factory().write_points.call_args == call(["point1", "point2"])