# common.py
import contextlib
import logging
from functools import partial
from typing import Set
import ncclient.manager
from lxml import etree

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Port used by ``netconf_connect`` when the caller does not supply a ``port``.
DEFAULT_NETCONF_PORT = 830


# JSON schema (draft 2020-12) for the ``brian`` counter set of one interface.
# Every counter is a JSON number; only the four basic octet/packet counters are
# mandatory and no counters other than the ones listed here are accepted.
BRIAN_POINT_FIELDS_SCHEMA = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "type": "object",
    "properties": {
        counter: {"type": "number"}
        for counter in (
            "egressOctets",
            "egressPackets",
            "ingressOctets",
            "ingressPackets",
            "egressOctetsv6",
            "egressPacketsv6",
            "ingressOctetsv6",
            "ingressPacketsv6",
            "egressErrors",
            "ingressDiscards",
            "ingressErrors",
        )
    },
    "required": [
        "egressOctets",
        "egressPackets",
        "ingressOctets",
        "ingressPackets",
    ],
    "additionalProperties": False,
}


# JSON schema (draft 2020-12) for the ``errors`` counter set of one interface.
# Every counter is a JSON integer, none of them is mandatory and no counters
# other than the ones listed here are accepted.
ERROR_POINT_FIELDS_SCHEMA = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "type": "object",
    "properties": {
        counter: {"type": "integer"}
        for counter in (
            "bit_error_seconds",
            "errored_blocks_seconds",
            "input_crc_errors",
            "input_discards",
            "input_drops",
            "input_fifo_errors",
            "input_framing_errors",
            "input_resource_errors",
            "input_total_errors",
            "output_collisions",
            "output_crc_errors",
            "output_discards",
            "output_drops",
            "output_fifo_errors",
            "output_resource_errors",
            "output_total_errors",
            "oper_state_change_count",
            "crc_align_errors",
            "fcs_errors",
        )
    },
    "additionalProperties": False,
}

# JSON schema (draft 2020-12) for the counters collected for one interface.
# ``name`` and ``brian`` are mandatory, ``errors`` is optional and no other keys
# are accepted. The two counter sets are validated against
# BRIAN_POINT_FIELDS_SCHEMA and ERROR_POINT_FIELDS_SCHEMA respectively.
INTERFACE_COUNTER_SCHEMA = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    # sub-schemas referenced through ``$ref`` in ``properties`` below
    "definitions": {
        "brian-counters": BRIAN_POINT_FIELDS_SCHEMA,
        "error-counters": ERROR_POINT_FIELDS_SCHEMA,
    },
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "brian": {"$ref": "#/definitions/brian-counters"},
        "errors": {"$ref": "#/definitions/error-counters"},
    },
    "required": ["name", "brian"],
    "additionalProperties": False,
}


def _counters_to_point(
    interface, router_fqdn, timestamp, measurement, counters_name, required=False
):
    """
    :param interface: an instance of INTERFACE_COUNTER_SCHEMA
    :param router_fqdn: ``hostname`` tag to be set in the generated points
    :param measurement: the measurement where the point will be written
    :param counter_name: the desired set of counters (either ``brian`` or ``errors``)
    :return: a brian_polling_manager.influx.INFLUX_POINT object
    """
    counters = interface.get(counters_name)
    if not counters:
        if required:
            raise ValueError(f"No data available for point '{counters_name}'")
        return None

    return {
        "time": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "measurement": measurement,
        "tags": {"hostname": router_fqdn, "interface_name": interface["name"]},
        "fields": counters,
    }


def brian_points(router_fqdn, interfaces, timestamp, measurement_name):
    """
    Return an iterable of ``brian`` points that can be written to influxdb.

    The points are generated lazily, one per interface. The ``brian`` counters
    are mandatory, so a ValueError is raised while iterating if an interface
    has no such counters.

    :param router_fqdn: 'hostname' tag to be set in the generated points
    :param interfaces: an iterable of INTERFACE_COUNTER_SCHEMA
    :param timestamp: timestamp to put in the generated points
    :param measurement_name: measurement name to put in the generated points
    :return: an iterator (``map``) of influx points
    """

    def _brian_point(interface):
        return _counters_to_point(
            interface,
            router_fqdn=router_fqdn,
            timestamp=timestamp,
            measurement=measurement_name,
            counters_name="brian",
            required=True,
        )

    return map(_brian_point, interfaces)


def error_points(router_fqdn, interfaces, timestamp, measurement_name):
    """
    Return an iterable of ``errors`` points that can be written to influxdb.

    The points are generated lazily. The ``errors`` counters are optional, so
    interfaces without them do not produce a point.

    :param router_fqdn: 'hostname' tag to be set in the generated points
    :param interfaces: an iterable of INTERFACE_COUNTER_SCHEMA
    :param timestamp: timestamp to put in the generated points
    :param measurement_name: measurement name to put in the generated points
    :return: an iterator (``filter``) of influx points
    """

    def _error_point(interface):
        return _counters_to_point(
            interface,
            router_fqdn=router_fqdn,
            timestamp=timestamp,
            measurement=measurement_name,
            counters_name="errors",
            required=False,
        )

    candidates = map(_error_point, interfaces)
    # drop the interfaces for which no point could be created
    return filter(lambda point: point is not None, candidates)


class ParsingError(ValueError):
    """Raised when a required value cannot be found while parsing interface xml."""


def extract_all_counters(iterable, struct):
    """
    Lazily yield the parsed counters of every interface that can be parsed.

    Interfaces for which parsing raises ParsingError (because they lack required
    fields) are skipped without any further notice.

    :param iterable: an xpath iterable for interface Element
    :param struct:  one of
        juniper.PHYSICAL_INTERFACE_COUNTERS
        juniper.LOGICAL_INTERFACE_COUNTERS
        nokia.INTERFACE_COUNTERS
        nokia.INTERFACE_COUNTERS_ALT
    :return: a generator of whatever ``parse_interface_xml`` returns for
        ``struct``
    """
    for interface_node in iterable:
        try:
            counters = parse_interface_xml(interface_node, struct)
        except ParsingError:
            # a required field is missing: ignore this interface
            pass
        else:
            yield counters


def extract_selected_counters(iterable, struct, interfaces: Set[str]):
    """
    Extract counters for interfaces in ``interfaces``. Logs an error if a selected
    interface can't be parsed because it lacks required fields.

    Elements are processed lazily and iteration stops as soon as ``interfaces``
    is empty. An element whose name cannot be parsed can't be matched against
    ``interfaces``; it is skipped with a warning instead of aborting the
    extraction of the remaining interfaces.

    :param iterable: an xpath iterable for interface Element
    :param struct: one of
        juniper.PHYSICAL_INTERFACE_COUNTERS
        juniper.LOGICAL_INTERFACE_COUNTERS
        nokia.INTERFACE_COUNTERS
        nokia.INTERFACE_COUNTERS_ALT
    :param interfaces: a set of interface names. This is modified in place: a name
        is removed once a matching element has been encountered (whether or not
        it could be parsed), so afterwards the set contains the names that were
        not seen
    :return: a generator of parsed counters, one item per selected interface that
        could be parsed
    """
    for ifc in iterable:
        if not interfaces:
            return

        try:
            # The name is parsed on its own, so hand it the top-level defaults it
            # would also get when parsing the complete struct
            name = parse_interface_xml(
                ifc, struct["name"], struct.get("__defaults__")
            )
        except ParsingError as e:
            logger.warning("Skipping interface without a parsable name: %s", e)
            continue
        if name not in interfaces:
            continue

        # Every selected interface is handled at most once, also if parsing fails
        interfaces.remove(name)
        try:
            result = parse_interface_xml(ifc, struct)
        except ParsingError as e:
            logger.error("Error parsing netconf for interface %s: %s", name, e)
            continue
        yield result


def parse_interface_xml(node, struct: dict, defaults=None):
    """
    Parse an lxml.etree.Element xml tree into a dictionary or a primitive (a single
    value), following the shape given by ``struct``.

    A struct containing a ``path`` key is a primitive struct and resolves into a
    single value. It can have the following keys:
      * ``path`` an xpath expression to lookup a value. Alternatively a list of xpath
        expressions may be given, in which case the first found value will be returned
      * ``transform`` a function that takes a raw value (str) and returns a transformed
        value, (eg. ``int`` to cast to integer)
      * ``required`` boolean. Indicate whether the field must be present. Raises
        ParsingError if the path is not found in the node. If required is set to False,
        ``None`` may be returned

    Any other struct is treated as a (nested) dict of structs. The output then has
    the same keys as ``struct``, each one resolved recursively. Keys that resolve
    to ``None`` are left out, and if no key is left ``None`` is returned instead
    of an empty dict.

    At any level a ``__defaults__`` key may be given. Its value is used as defaults
    for any lower level primitive structs and takes the place of the defaults
    inherited from a higher level. Keys of the primitive struct itself take
    precedence over the defaults.

    see the following structs for examples
        juniper.PHYSICAL_INTERFACE_COUNTERS
        juniper.LOGICAL_INTERFACE_COUNTERS
        nokia.INTERFACE_COUNTERS
        nokia.INTERFACE_COUNTERS_ALT

    :param node: an lxml.etree.Element
    :param struct: A (nested) dict / primitive struct
    :param defaults: defaults inherited from a higher level, if any
    :return: a primitive value, a (nested) dict, or ``None``
    """
    inherited = struct.get("__defaults__", defaults) or {}

    # primitive struct: resolve into a single value
    if "path" in struct:
        counter_spec = {**inherited, **struct}
        return _read_counter(node, **counter_spec)

    parsed = {}
    for key, sub_struct in struct.items():
        if key != "__defaults__":
            value = parse_interface_xml(node, sub_struct, inherited)
            if value is not None:
                parsed[key] = value
    return parsed if parsed else None


def _read_counter(node, path, transform, required=False):
    if isinstance(path, str):
        path = [path]
    for p in path:
        _elems = node.xpath(p + "/text()")
        if _elems:
            if len(_elems) > 1:
                logger.warning(f"found more than one element {p}")
            return transform(_elems[0])

    if required:
        raise ParsingError(f"required path {p} not found")


@contextlib.contextmanager
def netconf_connect(hostname: str, ssh_params: dict, **kwargs):
    """
    Context manager that opens a netconf session to ``hostname`` and closes it
    again when the ``with`` block is left.

    The connection parameters are assembled from ``hostname`` (as ``host``),
    ``kwargs`` and ``ssh_params``, where later sources take precedence over
    earlier ones. If none of them supplies a ``port``, DEFAULT_NETCONF_PORT is
    used.

    :param hostname: host to connect to
    :param ssh_params: dict of parameters for ``ncclient.manager.connect``
    :param kwargs: additional parameters for ``ncclient.manager.connect``
    :return: yields the connection returned by ``ncclient.manager.connect``
    """
    params = {"host": hostname, **kwargs, **ssh_params}
    params.setdefault("port", DEFAULT_NETCONF_PORT)
    # Use the connection as a context manager so that the session is closed
    # instead of leaked, also when the caller's block raises
    with ncclient.manager.connect(**params) as conn:
        yield conn


def remove_xml_namespaces(xml: str):
    """
    Parse ``xml`` and strip the namespaces from all element tags.

    Only element tags are rewritten, attribute names are left untouched.
    Namespace declarations that are no longer used are removed afterwards.

    :param xml: an xml document
    :return: the root lxml.etree.Element of the parsed document
    """
    etree_doc = etree.fromstring(xml)
    # Restrict the iteration to elements: a plain ``iter()`` also yields comments
    # and processing instructions, whose ``tag`` is not a qualified name
    for elem in etree_doc.iter(etree.Element):
        elem.tag = etree.QName(elem).localname
    etree.cleanup_namespaces(etree_doc)
    return etree_doc