# test_juniper.py
from datetime import datetime
from unittest.mock import call, patch

import pytest
from brian_polling_manager import influx
from brian_polling_manager.interface_stats.vendors import common, juniper
from lxml import etree
from ncclient.operations.rpc import RPCReply


@pytest.fixture
def get_netconf(data_dir):
    """Provide a loader for a router's netconf interface document.

    The returned callable takes a router name and forwards it, together with
    ``source_dir=data_dir``, to
    ``juniper.get_netconf_interface_info_from_source_dir``.
    """
    return lambda router_name: juniper.get_netconf_interface_info_from_source_dir(
        router_name, source_dir=data_dir
    )


@pytest.fixture
def juniper_router_doc_containing_every_field():
    """Return a parsed ``<interface-information>`` element for one interface.

    The document holds the physical interface ``ae12`` (traffic, input/output
    error, ethernet MAC and ethernet PCS statistics) and its logical interface
    ``ae12.1`` (traffic statistics only). Every counter carries a distinct
    value so a test can tell which element a parsed field came from.
    """
    xml_source = """\
<interface-information>
    <physical-interface>
        <name>ae12</name>
        <admin-status>up</admin-status>
        <oper-status>up</oper-status>
        <traffic-statistics>
            <input-bytes>1</input-bytes>
            <input-packets>2</input-packets>
            <output-bytes>3</output-bytes>
            <output-packets>4</output-packets>
            <ipv6-transit-statistics>
                <input-bytes>5</input-bytes>
                <input-packets>6</input-packets>
                <output-bytes>7</output-bytes>
                <output-packets>8</output-packets>
            </ipv6-transit-statistics>
        </traffic-statistics>
        <input-error-list>
            <input-errors>11</input-errors>
            <input-discards>12</input-discards>
            <input-fifo-errors>13</input-fifo-errors>
            <input-drops>14</input-drops>
            <framing-errors>15</framing-errors>
            <input-resource-errors>16</input-resource-errors>
        </input-error-list>
        <output-error-list>
            <output-errors>21</output-errors>
            <output-drops>22</output-drops>
            <output-resource-errors>23</output-resource-errors>
            <output-fifo-errors>24</output-fifo-errors>
            <output-collisions>25</output-collisions>
            <output-discards>26</output-discards>
        </output-error-list>
        <ethernet-mac-statistics>
            <input-crc-errors>31</input-crc-errors>
            <output-crc-errors>32</output-crc-errors>
            <input-total-errors>33</input-total-errors>
            <output-total-errors>34</output-total-errors>
        </ethernet-mac-statistics>
        <ethernet-pcs-statistics>
            <bit-error-seconds>41</bit-error-seconds>
            <errored-blocks-seconds>42</errored-blocks-seconds>
        </ethernet-pcs-statistics>
        <logical-interface>
            <name>ae12.1</name>
            <traffic-statistics>
                <input-bytes>51</input-bytes>
                <input-packets>52</input-packets>
                <output-bytes>53</output-bytes>
                <output-packets>54</output-packets>
                <ipv6-transit-statistics>
                    <input-bytes>55</input-bytes>
                    <input-packets>56</input-packets>
                    <output-bytes>57</output-bytes>
                    <output-packets>58</output-packets>
                </ipv6-transit-statistics>
            </traffic-statistics>
        </logical-interface>
    </physical-interface>
</interface-information>
"""
    return etree.fromstring(xml_source)


def test_verify_all_interfaces_present(
    juniper_router_fqdn, juniper_inventory, get_netconf
):
    """
    Verify that all the interfaces we expect to poll are available in the
    netconf data. Compares a snapshot of all netconf docs with a snapshot of
    inventory /poller/interfaces (the snapshots were all taken around the same
    time). Routers absent from the inventory snapshot are skipped.
    """
    if juniper_router_fqdn not in juniper_inventory:
        pytest.skip(f"{juniper_router_fqdn} has no expected polled interfaces")

    expected = juniper_inventory[juniper_router_fqdn]
    netconf_doc = get_netconf(juniper_router_fqdn)
    found = {
        counter["name"]
        for counter in juniper.interface_counters(netconf_doc, interfaces=expected)
    }
    # Anything left over is expected by the inventory but missing from netconf.
    missing = expected - found
    assert not missing


def test_interface_counters(juniper_router_doc_containing_every_field):
    """Check the exact records produced for ``ae12`` and ``ae12.1``.

    The physical interface is expected to yield both a ``brian`` and an
    ``errors`` section; the logical interface only a ``brian`` section.
    """
    expected_physical = {
        "name": "ae12",
        "brian": {
            "ingressOctets": 1,
            "ingressPackets": 2,
            "egressOctets": 3,
            "egressPackets": 4,
            "ingressOctetsv6": 5,
            "ingressPacketsv6": 6,
            "egressOctetsv6": 7,
            "egressPacketsv6": 8,
            "ingressErrors": 11,
            "ingressDiscards": 12,
            "egressErrors": 21,
        },
        "errors": {
            "input_discards": 12,
            "input_fifo_errors": 13,
            "input_drops": 14,
            "input_framing_errors": 15,
            "input_resource_errors": 16,
            "output_drops": 22,
            "output_resource_errors": 23,
            "output_fifo_errors": 24,
            "output_collisions": 25,
            "output_discards": 26,
            "input_crc_errors": 31,
            "output_crc_errors": 32,
            "input_total_errors": 33,
            "output_total_errors": 34,
            "bit_error_seconds": 41,
            "errored_blocks_seconds": 42,
        },
    }
    expected_logical = {
        "name": "ae12.1",
        "brian": {
            "ingressOctets": 51,
            "ingressPackets": 52,
            "egressOctets": 53,
            "egressPackets": 54,
            "ingressOctetsv6": 55,
            "ingressPacketsv6": 56,
            "egressOctetsv6": 57,
            "egressPacketsv6": 58,
        },
    }

    counters = juniper.interface_counters(
        juniper_router_doc_containing_every_field, interfaces=["ae12", "ae12.1"]
    )
    result = list(counters)

    assert result == [expected_physical, expected_logical]


def test_juniper_router_docs_do_not_generate_errors(
    get_netconf, juniper_router_fqdn, caplog, juniper_inventory
):
    """Extracting counters from a router's netconf doc yields at least one
    record and leaves no ERROR or WARNING log records in ``caplog``."""
    netconf_doc = get_netconf(juniper_router_fqdn)
    polled = juniper_inventory[juniper_router_fqdn]
    # interface_counters is consumed fully here so any logging it does while
    # iterating has happened before caplog is inspected.
    counters = list(juniper.interface_counters(netconf_doc, interfaces=polled))
    assert counters

    problems = [
        record
        for record in caplog.records
        if record.levelname in ("ERROR", "WARNING")
    ]
    assert not problems


def test_validate_interface_counters_and_influx_points_for_all_juniper_routers(
    juniper_router_fqdn, get_netconf, juniper_inventory, schemavalidate
):
    """Validate counters and the influx points derived from them against schemas.

    Three stages are checked, each required to be non-empty:
    the interface counter records, the points from ``common.brian_points``
    (whose field values must all be floats) and the points from
    ``common.error_points``.
    """
    netconf_doc = get_netconf(juniper_router_fqdn)
    polled = juniper_inventory[juniper_router_fqdn]

    counters = list(juniper.interface_counters(netconf_doc, interfaces=polled))
    assert counters
    for counter in counters:
        schemavalidate(counter, common.INTERFACE_COUNTER_SCHEMA)
        schemavalidate(counter["brian"], common.BRIAN_POINT_FIELDS_SCHEMA)

    brian_generator = common.brian_points(
        juniper_router_fqdn,
        counters,
        timestamp=datetime.now(),
        measurement_name="blah",
    )
    brian = list(brian_generator)
    assert brian
    for brian_point in brian:
        schemavalidate(brian_point, influx.INFLUX_POINT)
        brian_fields = brian_point["fields"]
        schemavalidate(brian_fields, common.BRIAN_POINT_FIELDS_SCHEMA)
        for field_value in brian_fields.values():
            assert isinstance(field_value, float)

    error_generator = common.error_points(
        juniper_router_fqdn,
        counters,
        timestamp=datetime.now(),
        measurement_name="blah",
    )
    errors = list(error_generator)
    assert errors
    for error_point in errors:
        schemavalidate(error_point, influx.INFLUX_POINT)
        schemavalidate(error_point["fields"], common.ERROR_POINT_FIELDS_SCHEMA)


class TestGetJuniperNetConf:
    """Tests for ``juniper.get_netconf_interface_info`` with the connection mocked."""

    # File name of the canned RPC reply, resolved against the ``data_dir`` fixture.
    RAW_RESPONSE_FILE = "raw-response-juniper-sample.xml"

    @pytest.fixture(autouse=True)
    def mocked_connect(self, data_dir):
        """Patch ``juniper.netconf_connect`` for every test in this class.

        The mocked connection's ``rpc`` returns an ``RPCReply`` built from
        ``RAW_RESPONSE_FILE``. Yields the mock that replaces
        ``netconf_connect`` so tests can inspect how it was called.
        """
        raw_response = data_dir.joinpath(self.RAW_RESPONSE_FILE).read_text()
        with patch.object(juniper, "netconf_connect") as mock:
            mock().__enter__().rpc.return_value = RPCReply(raw_response)
            # Configuring the return value above called the mock; clear that
            # call history so tests only see calls made by the code under test.
            mock.reset_mock()
            yield mock

    def test_calls_ncclient_with_params(self, mocked_connect):
        """The connection is opened with the router name and ssh params, and
        the request sent over it is a ``get-interface-information`` element."""
        router_name = "some-router"
        juniper.get_netconf_interface_info(router_name, ssh_params={"ssh": "param"})

        # Compare against call_args (rather than assigning to it) so that a
        # wrong connection call actually fails the test.
        # NOTE(review): the expected kwargs are carried over unchanged from the
        # earlier non-asserting statement; confirm them against the call site.
        assert mocked_connect.call_args == call(
            hostname="some-router", ssh="param", timeout=5
        )

        # Reach the connection via return_value instead of calling the mock
        # again, so inspecting it does not add calls to the mock's history.
        connection = mocked_connect.return_value.__enter__.return_value
        assert connection.rpc.call_args[0][0].tag == "get-interface-information"

    def test_converts_rpc_response_to_xml(self):
        """The returned document is an element whose root tag is ``rpc-reply``."""
        router_name = "some-router"

        doc = juniper.get_netconf_interface_info(
            router_name, ssh_params={"some": "param"}
        )

        assert doc.tag == "rpc-reply"