# test_juniper.py (8.77 KiB)
# Authored by Pelle Koster
from datetime import datetime
import itertools
import re
from unittest.mock import call, patch
import jsonschema
import pytest
from brian_polling_manager import influx
from brian_polling_manager.interface_stats.vendors import common, juniper
from lxml import etree
from ncclient.operations.rpc import RPCReply
@pytest.fixture
def get_netconf(data_dir):
    """Provide a loader for netconf interface documents stored in ``data_dir``.

    The returned callable takes a router name and forwards it, together with
    the ``data_dir`` fixture as ``source_dir``, to
    ``juniper.get_netconf_interface_info_from_source_dir``.
    """

    def _load_from_data_dir(router_name):
        return juniper.get_netconf_interface_info_from_source_dir(
            router_name,
            source_dir=data_dir,
        )

    return _load_from_data_dir
@pytest.fixture
def juniper_router_doc_containing_every_field():
    """Return a parsed ``<interface-information>`` document for the counter tests.

    The document holds one physical interface (``ae12``) with a single
    logical interface (``ae12.1``). Every counter element carries a distinct
    value, so a test can tell exactly which element a parsed field came from.
    """
    xml_source = """\
<interface-information>
<physical-interface>
<name>ae12</name>
<admin-status>up</admin-status>
<oper-status>up</oper-status>
<traffic-statistics>
<input-bytes>1</input-bytes>
<input-packets>2</input-packets>
<output-bytes>3</output-bytes>
<output-packets>4</output-packets>
<ipv6-transit-statistics>
<input-bytes>5</input-bytes>
<input-packets>6</input-packets>
<output-bytes>7</output-bytes>
<output-packets>8</output-packets>
</ipv6-transit-statistics>
</traffic-statistics>
<input-error-list>
<input-errors>11</input-errors>
<input-discards>12</input-discards>
<input-fifo-errors>13</input-fifo-errors>
<input-drops>14</input-drops>
<framing-errors>15</framing-errors>
<input-resource-errors>16</input-resource-errors>
</input-error-list>
<output-error-list>
<output-errors>21</output-errors>
<output-drops>22</output-drops>
<output-resource-errors>23</output-resource-errors>
<output-fifo-errors>24</output-fifo-errors>
<output-collisions>25</output-collisions>
<output-discards>26</output-discards>
</output-error-list>
<ethernet-mac-statistics>
<input-crc-errors>31</input-crc-errors>
<output-crc-errors>32</output-crc-errors>
<input-total-errors>33</input-total-errors>
<output-total-errors>34</output-total-errors>
</ethernet-mac-statistics>
<ethernet-pcs-statistics>
<bit-error-seconds>41</bit-error-seconds>
<errored-blocks-seconds>42</errored-blocks-seconds>
</ethernet-pcs-statistics>
<logical-interface>
<name>ae12.1</name>
<traffic-statistics>
<input-bytes>51</input-bytes>
<input-packets>52</input-packets>
<output-bytes>53</output-bytes>
<output-packets>54</output-packets>
<ipv6-transit-statistics>
<input-bytes>55</input-bytes>
<input-packets>56</input-packets>
<output-bytes>57</output-bytes>
<output-packets>58</output-packets>
</ipv6-transit-statistics>
</traffic-statistics>
</logical-interface>
</physical-interface>
</interface-information>
"""
    return etree.fromstring(xml_source)
def test_verify_all_interfaces_present(
    juniper_router_fqdn, polled_interfaces, get_netconf
):
    """
    Check that every interface we expect to poll shows up in the netconf data.

    A snapshot of all netconf docs is compared with a snapshot of inventory
    /poller/interfaces (the snapshots were all taken around the same time).
    An expected interface may only be absent from the parsed counters when
    its physical interface is not both admin-up and oper-up.
    """
    if juniper_router_fqdn not in polled_interfaces:
        pytest.skip(f"{juniper_router_fqdn} has no expected polled interfaces")

    def _is_enabled(ifc_name, ifc_doc):
        # everything before the first "." is the physical interface name
        m = re.match(r"^([^\.]+)\.?.*", ifc_name)
        assert m  # sanity: should never fail
        matches = ifc_doc.xpath(
            f'//interface-information/physical-interface[normalize-space(name)="{m.group(1)}"]'
        )
        physical = matches[0]
        admin_status = physical.xpath("./admin-status/text()")[0].strip()
        oper_status = physical.xpath("./oper-status/text()")[0].strip()
        return (admin_status, oper_status) == ("up", "up")

    doc = get_netconf(juniper_router_fqdn)
    all_counters = itertools.chain(
        juniper._physical_interface_counters(doc),
        juniper._logical_interface_counters(doc),
    )
    found_names = {counters["name"] for counters in all_counters}

    expected_names = polled_interfaces[juniper_router_fqdn]
    for ifc_name in expected_names - found_names:
        # verify that any missing interfaces are admin/oper disabled
        assert not _is_enabled(ifc_name, doc)
def test_physical_interface_counters(juniper_router_doc_containing_every_field):
    """The physical interface of the fixture doc yields exactly these counters."""
    expected = {
        "name": "ae12",
        "brian": {
            "ingressOctets": 1,
            "ingressPackets": 2,
            "egressOctets": 3,
            "egressPackets": 4,
            "ingressOctetsv6": 5,
            "ingressPacketsv6": 6,
            "egressOctetsv6": 7,
            "egressPacketsv6": 8,
            "ingressErrors": 11,
            "ingressDiscards": 12,
            "egressErrors": 21,
        },
        "errors": {
            "input_discards": 12,
            "input_fifo_errors": 13,
            "input_drops": 14,
            "input_framing_errors": 15,
            "input_resource_errors": 16,
            "output_drops": 22,
            "output_resource_errors": 23,
            "output_fifo_errors": 24,
            "output_collisions": 25,
            "output_discards": 26,
            "input_crc_errors": 31,
            "output_crc_errors": 32,
            "input_total_errors": 33,
            "output_total_errors": 34,
            "bit_error_seconds": 41,
            "errored_blocks_seconds": 42,
        },
    }

    physical = list(
        juniper._physical_interface_counters(juniper_router_doc_containing_every_field)
    )

    # the fixture document contains a single physical interface
    assert len(physical) == 1
    assert physical[0] == expected
def test_logical_interface_counters(juniper_router_doc_containing_every_field):
    """The logical interface of the fixture doc yields exactly these counters."""
    expected = {
        "name": "ae12.1",
        "brian": {
            "ingressOctets": 51,
            "ingressPackets": 52,
            "egressOctets": 53,
            "egressPackets": 54,
            "ingressOctetsv6": 55,
            "ingressPacketsv6": 56,
            "egressOctetsv6": 57,
            "egressPacketsv6": 58,
        },
    }

    logical = list(
        juniper._logical_interface_counters(juniper_router_doc_containing_every_field)
    )

    # the fixture document contains a single logical interface
    assert len(logical) == 1
    assert logical[0] == expected
def test_juniper_router_docs_do_not_generate_errors(
    get_netconf, juniper_router_fqdn, caplog
):
    """Parsing a router's netconf doc yields counters without ERROR/WARNING logs."""
    doc = get_netconf(juniper_router_fqdn)

    # materialise the counters so that all parsing (and logging) has happened
    parsed = list(juniper.interface_counters(doc))
    assert parsed

    complaints = [
        record
        for record in caplog.records
        if record.levelname in ("ERROR", "WARNING")
    ]
    assert not complaints
def test_validate_interface_counters_and_influx_points_for_all_juniper_routers(
    juniper_router_fqdn, get_netconf
):
    """Validate parsed counters and the influx points derived from them.

    Every interface counter dict must satisfy
    ``common.INTERFACE_COUNTER_SCHEMA``. Both the brian points and the error
    points built from those counters must be non-empty, match
    ``influx.INFLUX_POINT``, and have ``fields`` matching their own schema.
    """
    interfaces = list(juniper.interface_counters(get_netconf(juniper_router_fqdn)))
    assert interfaces
    for counters in interfaces:
        jsonschema.validate(counters, common.INTERFACE_COUNTER_SCHEMA)

    def _check_points(make_points, fields_schema):
        # build the points, require at least one, then validate each of them
        points = list(
            make_points(
                juniper_router_fqdn,
                interfaces,
                timestamp=datetime.now(),
                measurement_name="blah",
            )
        )
        assert points
        for point in points:
            jsonschema.validate(point, influx.INFLUX_POINT)
            jsonschema.validate(point["fields"], fields_schema)

    _check_points(common.brian_points, common.BRIAN_POINT_FIELDS_SCHEMA)
    _check_points(common.error_points, common.ERROR_POINT_FIELDS_SCHEMA)
class TestGetJuniperNetConf:
    """Tests for ``juniper.get_netconf_interface_info`` with the connection mocked."""

    # file (relative to the ``data_dir`` fixture) holding the canned rpc reply
    RAW_RESPONSE_FILE = "raw-response-juniper-sample.xml"

    @pytest.fixture(autouse=True)
    def mocked_connect(self, data_dir):
        """Patch ``juniper.netconf_connect`` for every test in this class.

        The mock is set up so that entering the connection context and calling
        ``rpc`` on it returns an ``RPCReply`` built from ``RAW_RESPONSE_FILE``.
        Yields the mock that replaces ``netconf_connect``.
        """
        raw_response = data_dir.joinpath(self.RAW_RESPONSE_FILE).read_text()
        with patch.object(juniper, "netconf_connect") as mock:
            mock().__enter__().rpc.return_value = RPCReply(raw_response)
            # configuring the mock above called it; clear those recorded calls
            # so tests only see calls made by the code under test
            mock.reset_mock()
            yield mock

    def test_calls_ncclient_with_params(self, mocked_connect):
        """The connection is opened with the ssh params and the right rpc is sent."""
        router_name = "some-router"
        juniper.get_netconf_interface_info(router_name, ssh_params={"ssh": "param"})

        # This must be a comparison: assigning to ``call_args`` would overwrite
        # the recorded call and verify nothing. It also has to run before the
        # ``mocked_connect()`` call below, which records a new call of its own.
        assert mocked_connect.call_args == call(
            hostname="some-router", ssh="param", timeout=5
        )
        assert (
            mocked_connect().__enter__().rpc.call_args[0][0].tag
            == "get-interface-information"
        )

    def test_converts_rpc_response_to_xml(self):
        """The rpc reply comes back as an element whose tag is ``rpc-reply``."""
        router_name = "some-router"
        doc = juniper.get_netconf_interface_info(
            router_name, ssh_params={"some": "param"}
        )
        assert doc.tag == "rpc-reply"