Skip to content
Snippets Groups Projects
Commit 9e6d9a3f authored by Erik Reid's avatar Erik Reid
Browse files

added error stat output, and some refactoring

parent 4342a30a
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,7 @@ PHYSICAL_INTERFACE_COUNTER_SCHEMA = {
'resource': {'type': 'integer'},
'discards': {'type': 'integer'},
'fifo': {'type': 'integer'},
'framing': {'type': 'integer'},
},
'required': ['errors', 'drops', 'resource', 'discards'],
'additionalProperties': False
......@@ -80,6 +81,7 @@ PHYSICAL_INTERFACE_COUNTER_SCHEMA = {
'properties': {
'crc': {'type': 'integer'},
'fifo': {'type': 'integer'},
'total': {'type': 'integer'},
},
'required': ['crc', 'fifo'],
'additionalProperties': False
......
from datetime import datetime
BRIAN_COUNTER_DICT_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
......@@ -68,25 +66,3 @@ def counters(router_fqdn, interface_counters):
return _p
return map(_counters2point, interface_counters)
def ctr2point(measurement, counters):
"""
:param measurement: the measurement where the point will be written
:param counters: a BRIAN_COUNTER_DICT_SCHEMA object
:return: a brian_polling_manager.influx.INFLUX_POINT object
"""
def _is_tag(key_name):
return key_name == 'hostname' or key_name == 'interface_name'
fields = dict([_k, _v] for _k, _v in counters.items() if not _is_tag(_k))
tags = dict([_k, _v] for _k, _v in counters.items() if _is_tag(_k))
return {
'measurement': measurement,
'tags': tags,
'fields': fields,
# TODO: let's set this at neconf query/response time
'time': datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'),
}
import contextlib
from datetime import datetime
from functools import partial
import json
import click
import jsonschema
from brian_polling_manager.interface_stats import config, brian, juniper
from brian_polling_manager.interface_stats import config, brian, errors, juniper
from brian_polling_manager import inventory, influx
......@@ -69,6 +70,29 @@ def setup_logging():
logging.config.dictConfig(logging_config)
def ctr2point(measurement, counters):
"""
:param measurement: the measurement where the point will be written
:param counters: a ERROR_POINT_SCHEMA object
:return: a brian_polling_manager.influx.INFLUX_POINT object
"""
def _is_tag(key_name):
return key_name == 'hostname' or key_name == 'interface_name'
fields = dict([_k, _v] for _k, _v in counters.items() if not _is_tag(_k))
tags = dict([_k, _v] for _k, _v in counters.items() if _is_tag(_k))
return {
'measurement': measurement,
'tags': tags,
'fields': fields,
# TODO: let's set this at neconf query/response time
'time': datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'),
}
def _validate_config(_unused_ctx, _unused_param, file):
try:
return config.load(file)
......@@ -93,11 +117,21 @@ def fqdn_netconf_doc_map(app_config_params):
def _brian_points(router_fqdn, netconf_doc, measurement_name):
interfaces = juniper.physical_interface_counters(netconf_doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(partial(brian.ctr2point, measurement_name), counters)
yield from map(partial(ctr2point, measurement_name), counters)
interfaces = juniper.logical_interface_counters(netconf_doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(partial(brian.ctr2point, measurement_name), counters)
yield from map(partial(ctr2point, measurement_name), counters)
def _error_points(router_fqdn, netconf_doc, measurement_name):
interfaces = juniper.physical_interface_counters(netconf_doc)
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(partial(ctr2point, measurement_name), counters)
interfaces = juniper.logical_interface_counters(netconf_doc)
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(partial(ctr2point, measurement_name), counters)
def _main(app_config_params: dict):
......@@ -112,6 +146,7 @@ def _main(app_config_params: dict):
setup_logging()
nc_doc_map = fqdn_netconf_doc_map(app_config_params)
influx_params = app_config_params['influx']['brian-counters']
for _fqdn, _ncdoc in nc_doc_map.items():
points = _brian_points(
......@@ -122,6 +157,15 @@ def _main(app_config_params: dict):
with contextlib.closing(influx.influx_client(influx_params)) as client:
client.write_points(points)
influx_params = app_config_params['influx']['error-counters']
for _fqdn, _ncdoc in nc_doc_map.items():
points = _error_points(
router_fqdn=_fqdn,
netconf_doc=_ncdoc,
measurement_name=influx_params['measurement'])
with contextlib.closing(influx.influx_client(influx_params)) as client:
client.write_points(points)
@click.command()
@click.option(
......
ERROR_POINT_SCHEMA = {
ERROR_COUNTER_DICT_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'type': 'object',
'properties': {
'hostname': {'type': 'string'},
'interface_name': {'type': 'string'},
'framing-errors': {'type': 'integer'},
'bit-error-seconds': {'type': 'integer'},
'input-crc-errors': {'type': 'integer'},
'input-total-errors': {'type': 'integer'},
'input-discards': {'type': 'integer'},
'input-drops': {'type': 'integer'},
'output-drops': {'type': 'integer'},
'bit_error_seconds': {'type': 'integer'},
'errored_blocks_seconds': {'type': 'integer'},
'input_crc_errors': {'type': 'integer'},
'input_fifo_errors': {'type': 'integer'},
'input_total_errors': {'type': 'integer'},
'input_discards': {'type': 'integer'},
'input_drops': {'type': 'integer'},
'output_drops': {'type': 'integer'},
'output_crc_errors': {'type': 'integer'},
'output_fifo_errors': {'type': 'integer'},
'output_total_errors': {'type': 'integer'},
'input_framing_errors': {'type': 'integer'},
'output_resource_errors': {'type': 'integer'},
'input_resource_errors': {'type': 'integer'},
'output_collisions': {'type': 'integer'},
},
'required': ['hostname', 'interface_name', 'egressOctets', 'ingressOctets'],
'required': ['hostname', 'interface_name'],
'additionalProperties': False
}
def points(router_fqdn, interface_counters):
def counters(router_fqdn, interface_counters):
"""
:param router_fqdn: hostname
:param interface_counters: either PHYSICAL_INTERFACE_COUNTER_SCHEMA
......@@ -28,26 +36,70 @@ def points(router_fqdn, interface_counters):
def _counters2point(_ifc):
_brct = _ifc['brian']
_p = {
'hostname': router_fqdn,
'interface_name': _ifc['name'],
'egressOctets': _brct['egress']['bytes'],
'ingressOctets': _brct['ingress']['bytes']
'interface_name': _ifc['name']
}
if 'v6' in _brct['egress']:
_p['egressOctetsv6'] = _brct['egress']['v6']['bytes']
ingress_errors = _brct['ingress'].get('errors', {})
if 'framing' in ingress_errors:
_p['input_framing_errors'] = ingress_errors['framing']
if 'discards' in ingress_errors:
_p['input_discards'] = ingress_errors['discards']
if 'drops' in ingress_errors:
_p['input_drops'] = ingress_errors['drops']
if 'fifo' in ingress_errors:
_p['input_fifo_errors'] = ingress_errors['fifo']
if 'drops' in ingress_errors:
_p['input_drops'] = ingress_errors['drops']
if 'resource' in ingress_errors:
_p['input_resource_errors'] = ingress_errors['resource']
egress_errors = _brct['egress'].get('errors', {})
if 'drops' in egress_errors:
_p['output_drops'] = egress_errors['drops']
if 'fifo' in egress_errors:
_p['output_fifo_errors'] = egress_errors['fifo']
if 'collisions' in egress_errors:
_p['output_collisions'] = egress_errors['collisions']
if 'resource' in egress_errors:
_p['output_resource_errors'] = egress_errors['resource']
if 'v6' in _brct['ingress']:
_p['ingressOctetsv6'] = _brct['ingress']['v6']['bytes']
if 'errors' in _brct['egress']:
_p['egressErrors'] = _brct['egress']['errors']['errors']
if 'l2' in _ifc:
_l2ct = _ifc['l2']
ingress_errors = _l2ct['ingress'].get('errors', {})
if 'errors' in _l2ct['ingress']:
if 'crc' in ingress_errors:
_p['input_crc_errors'] = ingress_errors['crc']
# use the value from brian/l3
# if 'fifo' in ingress_errors:
# _p['input_fifo_errors'] = ingress_errors['fifo']
if 'total' in ingress_errors:
_p['input_total_errors'] = ingress_errors['total']
egress_errors = _l2ct['egress'].get('errors', {})
if 'errors' in _l2ct['egress']:
if 'crc' in egress_errors:
_p['output_crc_errors'] = egress_errors['crc']
# use the value from brian/l3
# if 'fifo' in egress_errors:
# _p['output_fifo_errors'] = egress_errors['fifo']
if 'total' in egress_errors:
_p['output_total_errors'] = egress_errors['total']
pcs_stats = _l2ct.get('pcs', {})
if 'bit-error-seconds' in pcs_stats:
_p['bit_error_seconds'] = pcs_stats['bit-error-seconds']
if 'errored-blocks-seconds' in pcs_stats:
_p['errored_blocks_seconds'] = pcs_stats['errored-blocks-seconds']
if 'errors' in _brct['ingress']:
_p['ingressDiscards'] = _brct['ingress']['errors']['discards']
_p['ingressErrors'] = _brct['ingress']['errors']['errors']
return _p
return map(_counters2point, interface_counters)
def _has_data(ctrs):
return len(set(ctrs.keys()) - {'hostname', 'interface_name'}) > 0
return filter(_has_data, map(_counters2point, interface_counters))
......@@ -132,6 +132,7 @@ def physical_interface_counters(ifc_doc):
{'path': './input-error-list/input-resource-errors', 'key': 'resource'},
{'path': './input-error-list/input-discards', 'key': 'discards'},
{'path': './input-error-list/input-fifo-errors', 'key': 'fifo'},
{'path': './input-error-list/framing-errors', 'key': 'framing'},
]
egress_error_fields = [
{'path': './output-error-list/output-errors', 'key': 'errors'},
......@@ -183,14 +184,14 @@ def physical_interface_counters(ifc_doc):
ingress_error_fields = [
# this one isn't available on qfx's
# 'total': _elem_int(mac_stats, 'input-total-errors'),
{'path': '././ethernet-mac-statistics/input-crc-errors', 'key': 'crc'},
{'path': '././ethernet-mac-statistics/input-fifo-errors', 'key': 'fifo'},
{'path': '././ethernet-mac-statistics/input-total-errors', 'key': 'total'},
]
egress_error_fields = [
# 'total': _elem_int(mac_stats, 'input-total-errors'),
{'path': '././ethernet-mac-statistics/output-crc-errors', 'key': 'crc'},
{'path': '././ethernet-mac-statistics/output-fifo-errors', 'key': 'fifo'},
{'path': '././ethernet-mac-statistics/output-total-errors', 'key': 'total'},
]
errors = _counter_dict(ifc_node, ingress_error_fields)
......
......@@ -18,10 +18,8 @@ from lxml import etree
import pytest
import yaml
from brian_polling_manager.interface_stats import \
PHYSICAL_INTERFACE_COUNTER_SCHEMA, LOGICAL_INTERFACE_COUNTER_SCHEMA
from brian_polling_manager.interface_stats import brian, juniper, cli
from brian_polling_manager import inventory, influx
from brian_polling_manager.interface_stats import brian, errors, juniper, cli
from brian_polling_manager import inventory, influx, interface_stats
DATA_DIRNAME = os.path.join(os.path.dirname(__file__), 'data', 'interface-info-snapshots')
DATA_FILENAME_EXTENSION = '-interface-info.xml'
......@@ -65,14 +63,14 @@ def ifc_netconf_rpc():
def test_validate_physical_counter_schema(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn)
for ifc in juniper.physical_interface_counters(doc):
jsonschema.validate(ifc, PHYSICAL_INTERFACE_COUNTER_SCHEMA)
jsonschema.validate(ifc, interface_stats.PHYSICAL_INTERFACE_COUNTER_SCHEMA)
@pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_validate_logical_counters_schema(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn)
for ifc in juniper.logical_interface_counters(doc):
jsonschema.validate(ifc, LOGICAL_INTERFACE_COUNTER_SCHEMA)
jsonschema.validate(ifc, interface_stats.LOGICAL_INTERFACE_COUNTER_SCHEMA)
@pytest.fixture
......@@ -144,6 +142,15 @@ def test_physical_brian_counters(router_fqdn, ifc_netconf_rpc):
jsonschema.validate(ctrs, brian.BRIAN_COUNTER_DICT_SCHEMA)
@pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_physical_error_counters(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn)
interfaces = juniper.physical_interface_counters(doc)
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
for ctrs in counters:
jsonschema.validate(ctrs, errors.ERROR_COUNTER_DICT_SCHEMA)
@pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_logical_brian_counters(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn)
......@@ -154,22 +161,60 @@ def test_logical_brian_counters(router_fqdn, ifc_netconf_rpc):
@pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_physical_brian_points(router_fqdn, ifc_netconf_rpc):
def test_logical_error_counters(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn)
interfaces = juniper.physical_interface_counters(doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
for _p in map(partial(brian.ctr2point, 'bogus-measurement'), counters):
jsonschema.validate(_p, influx.INFLUX_POINT)
interfaces = juniper.logical_interface_counters(doc)
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
for ctrs in counters:
jsonschema.validate(ctrs, errors.ERROR_COUNTER_DICT_SCHEMA)
@pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_logical_brian_points(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn)
interfaces = juniper.logical_interface_counters(doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
for _p in map(partial(brian.ctr2point, 'bogus-measurement'), counters):
def test_brian_points(router_fqdn, ifc_netconf_rpc):
for _p in cli._brian_points(
router_fqdn=router_fqdn,
netconf_doc=juniper.get_interface_info_ncrpc(router_fqdn),
measurement_name='blah'):
jsonschema.validate(_p, influx.INFLUX_POINT)
assert _p['fields'] # any trivial points should already be filtered
@pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_error_points(router_fqdn, ifc_netconf_rpc):
for _p in cli._error_points(
router_fqdn=router_fqdn,
netconf_doc=juniper.get_interface_info_ncrpc(router_fqdn),
measurement_name='blah'):
jsonschema.validate(_p, influx.INFLUX_POINT)
assert _p['fields'] # any trivial points should already be filtered
#
# @pytest.mark.parametrize('router_fqdn', ROUTERS)
# def test_physical_error_points(router_fqdn, ifc_netconf_rpc):
# doc = juniper.get_interface_info_ncrpc(router_fqdn)
# interfaces = juniper.physical_interface_counters(doc)
# counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
# for _p in map(partial(cli.ctr2point, 'bogus-measurement'), counters):
# jsonschema.validate(_p, influx.INFLUX_POINT)
#
#
# @pytest.mark.parametrize('router_fqdn', ROUTERS)
# def test_logical_brian_points(router_fqdn, ifc_netconf_rpc):
# doc = juniper.get_interface_info_ncrpc(router_fqdn)
# interfaces = juniper.logical_interface_counters(doc)
# counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
# for _p in map(partial(cli.ctr2point, 'bogus-measurement'), counters):
# jsonschema.validate(_p, influx.INFLUX_POINT)
#
#
# @pytest.mark.parametrize('router_fqdn', ROUTERS)
# def test_logical_error_points(router_fqdn, ifc_netconf_rpc):
# doc = juniper.get_interface_info_ncrpc(router_fqdn)
# interfaces = juniper.logical_interface_counters(doc)
# counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
# for _p in map(partial(cli.ctr2point, 'bogus-measurement'), counters):
# jsonschema.validate(_p, influx.INFLUX_POINT)
#
@pytest.fixture
def app_config_params(free_host_port):
......@@ -209,23 +254,6 @@ def app_config_params(free_host_port):
}
def test_validate_all_points(app_config_params, ifc_netconf_rpc, mocked_poller_interfaces):
with patch('brian_polling_manager.interface_stats.cli._managed_routers') as rpc:
all_routers = {_ifc['router'] for _ifc in inventory.load_interfaces(['https://abc'])}
# TODO: add srx's to test data
rpc.return_value = [_ifc for _ifc in all_routers if not _ifc.startswith('srx')]
nc_doc_map = cli.fqdn_netconf_doc_map(app_config_params)
influx_params = app_config_params['influx']['brian-counters']
for _fqdn, _ncdoc in nc_doc_map.items():
for _p in cli._brian_points(
router_fqdn=_fqdn,
netconf_doc=_ncdoc,
measurement_name=influx_params['measurement']):
jsonschema.validate(_p, influx.INFLUX_POINT)
@pytest.fixture
def brian_influx_container_params(app_config_params):
......@@ -425,23 +453,32 @@ def test_e2e(app_config_params, ifc_netconf_rpc, testenv_containers, mocked_poll
rpc.side_effect = _mocked_managed_routers
cli._main(app_config_params)
influx_config = app_config_params['influx']['brian-counters']
with contextlib.closing(influx.influx_client(influx_config)) as client:
expected_brian_fields = [
'egressOctets',
'egressPackets',
'ingressOctets',
'ingressPackets',
'egressOctetsv6',
'egressPacketsv6',
'ingressOctetsv6',
'ingressPacketsv6',
'egressErrors',
'ingressDiscards',
'ingressErrors',
]
query = f'select count(*) from {influx_config["measurement"]}'
counts = next(client.query(query).get_points(measurement=influx_config["measurement"]))
assert all(f'count_{n}' in counts for n in expected_brian_fields)
assert all(counts[f'count_{n}'] > 0 for n in expected_brian_fields)
def verify_influx_content(influx_config, expected_fields):
# for each expected field, verify that there's
# at least one point with that field populated
with contextlib.closing(influx.influx_client(influx_config)) as client:
query = f'select count(*) from {influx_config["measurement"]}'
counts = next(client.query(query).get_points(measurement=influx_config["measurement"]))
expected_count_fields = {f'count_{n}' for n in expected_fields}
assert expected_count_fields <= set(counts.keys())
assert all(counts[_f] > 0 for _f in expected_count_fields)
def _expected_fields_from_schema(schema):
assert schema == errors.ERROR_COUNTER_DICT_SCHEMA \
or brian.BRIAN_COUNTER_DICT_SCHEMA
return {
k for k in
schema['properties'].keys()
if k != 'hostname' and k != 'interface_name'
}
verify_influx_content(
influx_config=app_config_params['influx']['brian-counters'],
expected_fields=_expected_fields_from_schema(brian.BRIAN_COUNTER_DICT_SCHEMA))
verify_influx_content(
influx_config=app_config_params['influx']['error-counters'],
expected_fields=_expected_fields_from_schema(errors.ERROR_COUNTER_DICT_SCHEMA))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment