Skip to content
Snippets Groups Projects
Select Git revision
  • 88d49aaa9df505feb9d6c916f1e9370e012d587b
  • develop default
  • master protected
  • feature/add-oidc-support
  • 0.17
  • 0.16
  • 0.15
  • 0.14
  • 0.13
  • 0.12
  • 0.11
  • 0.10
  • 0.9
  • 0.8
  • 0.6
  • 0.5
  • 0.4
  • 0.3
  • 0.2
  • 0.1
20 results

__init__.py

Blame
  • nokia.py 5.37 KiB
    import logging
    import pathlib
    from typing import Dict, Optional, Sequence
    from brian_polling_manager.interface_stats.vendors.common import (
        extract_all_counters,
        extract_selected_counters,
        netconf_connect,
        remove_xml_namespaces,
    )
    from lxml import etree
    
    logger = logging.getLogger(__name__)
    
    
    # Keyword arguments forwarded verbatim to ``netconf_connect`` when polling a
    # Nokia router (see ``get_netconf_interface_info``).
    # NOTE(review): ``timeout`` is presumably expressed in seconds -- confirm
    # against the ncclient documentation.
    NCCLIENT_PARAMS = dict(
        device_params={"name": "sros"},
        nc_params={"capabilities": ["urn:nokia.com:nc:pysros:pc"]},
        timeout=60,
    )
    
    # Extraction spec applied to the ``port`` and ``lag`` state documents (see
    # ``interface_counters``). Every path is relative to one port/lag element.
    # NOTE(review): ``__defaults__`` presumably supplies the options an individual
    # counter entry does not set itself -- confirm in ``vendors.common``.
    INTERFACE_COUNTERS = {
        "__defaults__": dict(transform=int, required=False),
        "name": dict(
            # two candidate locations for the interface name; presumably a lag
            # element carries ``lag-name`` and a port element ``port-id``
            path=["./lag-name", "./port-id"],
            transform=lambda v: str(v).strip(),
        ),
        "brian": {
            # traffic counters are mandatory ...
            "ingressOctets": dict(path="./statistics/in-octets", required=True),
            "ingressPackets": dict(path="./statistics/in-packets", required=True),
            "egressOctets": dict(path="./statistics/out-octets", required=True),
            "egressPackets": dict(path="./statistics/out-packets", required=True),
            # ... error/discard counters fall back to the defaults
            "ingressErrors": dict(path="./statistics/in-errors"),
            "egressErrors": dict(path="./statistics/out-errors"),
            "ingressDiscards": dict(path="./statistics/in-discards"),
        },
        "errors": {
            "output_total_errors": dict(path="./statistics/out-errors"),
            "input_total_errors": dict(path="./statistics/in-errors"),
            "input_discards": dict(path="./statistics/in-discards"),
            "output_discards": dict(path="./statistics/out-discards"),
        },
    }
    
    # Extraction spec applied to the ``router-interface`` state document (see
    # ``interface_counters``). Every path is relative to one interface element.
    # NOTE(review): ``__defaults__`` presumably supplies the options an individual
    # counter entry does not set itself -- confirm in ``vendors.common``.
    INTERFACE_COUNTERS_ALT = {
        "__defaults__": dict(transform=int, required=False),
        "name": dict(
            path="./interface-name",
            transform=lambda v: str(v).strip(),
        ),
        "brian": {
            # counters read from ``statistics/ip`` are mandatory
            "ingressOctets": dict(path="./statistics/ip/in-octets", required=True),
            "ingressPackets": dict(path="./statistics/ip/in-packets", required=True),
            "egressOctets": dict(path="./statistics/ip/out-octets", required=True),
            "egressPackets": dict(path="./statistics/ip/out-packets", required=True),
            # counters read from ``ipv6/statistics`` use the defaults
            "ingressOctetsv6": dict(path="./ipv6/statistics/in-octets"),
            "ingressPacketsv6": dict(path="./ipv6/statistics/in-packets"),
            "egressOctetsv6": dict(path="./ipv6/statistics/out-octets"),
            "egressPacketsv6": dict(path="./ipv6/statistics/out-packets"),
        },
        "errors": {
            "output_discards": dict(path="./statistics/ip/out-discard-packets"),
        },
    }
    
    
    def get_netconf_interface_info(
        router_name: str, ssh_params: dict
    ) -> Dict[str, etree.ElementBase]:
        """
        Poll a router over netconf and collect one state document per doctype.

        A single connection is opened with ``NCCLIENT_PARAMS`` and reused for all
        three queries. Each reply is serialised to a utf-8 string and passed through
        ``remove_xml_namespaces`` before being stored.

        :param router_name: the router to poll
        :param ssh_params: ssh params for connecting to the router

        :returns: a dictionary with doctype (``port``, ``lag`` & ``router-interface``)
            as keys and their respective netconf document as values
        """
        # doctype -> element names below ``state`` that select the sub tree
        state_paths = (
            ("port", ("port", "statistics")),
            ("lag", ("lag", "statistics")),
            ("router-interface", ("router", "interface")),
        )

        documents = {}
        with netconf_connect(
            hostname=router_name, ssh_params=ssh_params, **NCCLIENT_PARAMS
        ) as conn:
            for doctype, segments in state_paths:
                reply = query(conn, *segments)
                raw_xml = reply.tostring.decode("utf-8")
                documents[doctype] = remove_xml_namespaces(raw_xml)
        return documents
    
    
    def query(connection, *path):
        """
        Run a netconf ``get`` restricted to a sub tree of the Nokia state model.

        A ``filter`` element is built that contains a ``state`` element (in the
        Nokia state namespace) followed by one nested element per entry in *path*.

        :param connection: object exposing ``get(filter=...)``
            (presumably an ncclient manager -- verify against caller)
        :param path: element names, outermost first, nested below ``state``
        :returns: whatever ``connection.get`` returns
        """
        state_ns = "urn:nokia.com:sros:ns:yang:sr:state"

        def qualified(tag):
            # lxml's ``{namespace}tag`` notation
            return f"{{{state_ns}}}{tag}"

        filter_root = etree.Element("filter")
        node = etree.SubElement(
            filter_root, qualified("state"), nsmap={"nokia-state": state_ns}
        )
        for segment in path:
            # each segment becomes a child of the previous one
            node = etree.SubElement(node, qualified(segment))

        return connection.get(filter=filter_root)
    
    
    def get_netconf_interface_info_from_source_dir(router_name: str, source_dir: str):
        """
        Load netconf state documents from files instead of polling a router.

        For every doctype (``port``, ``lag`` & ``router-interface``) the file
        ``<source_dir>/<router_name>-<doctype>s.xml`` is read and parsed.

        :param router_name: the router whose documents should be loaded
        :param source_dir: directory containing the xml files
        :returns: a dictionary with doctype as keys and the parsed root element of
            the corresponding file as values
        :raises ValueError: if one of the expected files does not exist or is not
            a regular file
        """
        base_dir = pathlib.Path(source_dir)

        def read_doc_or_raise(name: str):
            file = base_dir / name
            if not file.is_file():
                raise ValueError(f"file {file} is not a valid file")

            # Parse the raw bytes rather than decoded text: lxml rejects ``str``
            # input that starts with an XML encoding declaration, and feeding bytes
            # lets the parser honour whichever encoding the document declares
            # instead of the platform's default text encoding.
            return etree.fromstring(file.read_bytes())

        return {
            key: read_doc_or_raise(f"{router_name}-{key}s.xml")
            for key in ["port", "lag", "router-interface"]
        }
    
    
    def _port_xml(state_doc: etree.Element):
        """Return the result of selecting ``//data/state/port`` in *state_doc*."""
        port_nodes = state_doc.xpath("//data/state/port")
        return port_nodes
    
    
    def _lag_xml(state_doc: etree.Element):
        """Return the result of selecting ``//data/state/lag`` in *state_doc*."""
        lag_nodes = state_doc.xpath("//data/state/lag")
        return lag_nodes
    
    
    def _router_interface_xml(state_doc: etree.Element):
        """Return the result of selecting ``//data/state/router/interface``."""
        interface_nodes = state_doc.xpath("//data/state/router/interface")
        return interface_nodes
    
    
    def interface_counters(
        raw_counter_docs: dict, interfaces: Optional[Sequence[str]] = None
    ):
        """
        Generate interface counters from the netconf documents of a router.

        Documents are processed in the iteration order of *raw_counter_docs*. Each
        doctype is paired with the counter definition and the element selector that
        belong to it.

        :param raw_counter_docs: output of a call to ``get_netconf_interface_info``
        :param interfaces: a sequence of interfaces for which to retrieve counters.
            ``None`` means return counters for all interfaces
        :return: iterable of interface counters
        """
        # doctype -> (counter definition, function selecting the elements)
        extraction_plan = {
            "port": (INTERFACE_COUNTERS, _port_xml),
            "lag": (INTERFACE_COUNTERS, _lag_xml),
            "router-interface": (INTERFACE_COUNTERS_ALT, _router_interface_xml),
        }

        if interfaces is not None:
            # NOTE(review): ``extract_selected_counters`` presumably removes every
            # interface it finds from this set -- the early exit and the error
            # log below rely on that; confirm in ``vendors.common``.
            missing = set(interfaces)
            for kind, document in raw_counter_docs.items():
                definition, select = extraction_plan[kind]
                elements = select(document)
                yield from extract_selected_counters(
                    elements, definition, interfaces=missing
                )
                if not missing:
                    # nothing left to look for, skip the remaining documents
                    return

            for name in missing:
                logger.error(f"Interface {name} was not found on router")
            return

        for kind, document in raw_counter_docs.items():
            definition, select = extraction_plan[kind]
            yield from extract_all_counters(select(document), definition)