Skip to content
Snippets Groups Projects
nokia.py 9.32 KiB
import logging
import re

from ncclient import manager, xml_


logger = logging.getLogger(__name__)


ROOT_NS = 'urn:ietf:params:xml:ns:netconf:base:1.0'
NOKIA_NS = 'urn:nokia.com:sros:ns:yang:sr:conf'
NOKIA_STATE_NS = 'urn:nokia.com:sros:ns:yang:sr:state'
NS = {
    'r': ROOT_NS,
    'n': NOKIA_NS,
    's': NOKIA_STATE_NS,
}

BREAKOUT_PATTERN = re.compile(
    r'c(?P<count>\d+)-(?P<speed>\d+)(?P<unit>[a-zA-Z]+)'
)
'''
For translating the breakout string to a speed
example:
    c1-400g - This has a single 400G port, groups would be as follows:
        1 or 'count' - 1
        2 or 'speed' - 400
        3 or 'unit' - g
    c4-100g - This has four 100G ports, not all of them may be used
    groups would be as follows:
        1 or 'count' - 4
        2 or 'speed' - 100
        3 or 'unit' - g
'''
SPEED_UNITS = {
    'g': 'Gbps',
    'G': 'Gbps',
}
STATE_FILTER = '''<filter>
  <nokia-state:state xmlns:nokia-state="urn:nokia.com:sros:ns:yang:sr:state">
    <nokia-state:port>
        <nokia-state:port-id/>
        <nokia-state:down-reason/>
        <nokia-state:oper-state/>
        <nokia-state:port-state/>
        <nokia-state:type/>
        <nokia-state:if-index/>
        <nokia-state:ethernet>
            <nokia-state:oper-speed/>
        </nokia-state:ethernet>
    </nokia-state:port>
    <nokia-state:lag>
        <nokia-state:lag-name/>
        <nokia-state:oper-state/>
        <nokia-state:if-index/>
        <nokia-state:number-port-up/>
        <nokia-state:port>
            <nokia-state:port-id/>
        </nokia-state:port>
    </nokia-state:lag>
    <nokia-state:router>
      <nokia-state:interface>
        <nokia-state:interface-name/>
        <nokia-state:if-index/>
        <nokia-state:oper-state/>
        <nokia-state:protocol/>
        <nokia-state:ipv4>
            <nokia-state:oper-state/>
            <nokia-state:down-reason/>
            <nokia-state:primary>
                <nokia-state:oper-address/>
            </nokia-state:primary>
            <nokia-state:secondary>
                <nokia-state:address/>
                <nokia-state:oper-address/>
            </nokia-state:secondary>
        </nokia-state:ipv4>
        <nokia-state:ipv6>
            <nokia-state:oper-state/>
            <nokia-state:down-reason/>
            <nokia-state:address>
                <nokia-state:ipv6-address/>
                <nokia-state:address-state/>
                <nokia-state:oper-address/>
                <nokia-state:primary-preferred/>
            </nokia-state:address>
        </nokia-state:ipv6>
      </nokia-state:interface>
    </nokia-state:router>
  </nokia-state:state>
</filter>'''


def load_docs(hostname, ssh_params, hostkey_verify=False):
    logger.info(f'capturing netconf data for "{hostname}"')
    params = {
        'host': hostname,
        'hostkey_verify': hostkey_verify,
        'device_params': {'name': 'sros'},
        'nc_params': {'capabilities': ['urn:nokia.com:nc:pysros:pc']},
        'timeout': 60
    }
    params.update(ssh_params)
    with manager.connect(**params) as m:
        # when adding the device_params to the connect call, the object returned from
        # manager.get() and manager.get_config is a ncclient.xml_.NCElement object,
        # which does not provide the data property this is a workaround to get the data
        # property. The alternative would be to access the private _NCElement__result
        # which is of type ncclient.operations.retrieve.GetReply and has a data property
        running = xml_.to_ele(m.get_config(source='running').data_xml).getchildren()[0]
        state = xml_.to_ele(m.get(filter=STATE_FILTER).data_xml).getchildren()[0]
    return running, state


def get_ports_state(state_doc):
    def _port_info(e):
        pi = {
            'port-id': e.find('s:port-id', namespaces=NS).text,
            'oper-state': e.find('s:oper-state', namespaces=NS).text,
            'port-state': e.find('s:port-state', namespaces=NS).text,
            'type': e.find('s:type', namespaces=NS).text,
            'if-index': e.find('s:if-index', namespaces=NS).text,
        }
        down_reason = e.find('s:down-reason', namespaces=NS)
        if down_reason is not None:
            pi['down-reason'] = down_reason.text
        return pi
    ports = state_doc.findall('./s:state/s:port', namespaces=NS)
    for port in ports:
        yield _port_info(port)


def get_lags_state(state_doc):
    def _lag_info(e):
        _name = e.find('s:lag-name', namespaces=NS).text
        _state = e.find('s:oper-state', namespaces=NS).text
        _if_index = e.find('s:if-index', namespaces=NS).text
        _number_port_up = e.find('s:number-port-up', namespaces=NS).text
        ports = [p.find('s:port-id', namespaces=NS).text for p in e.findall('s:port', namespaces=NS)]
        return {
            'name': _name,
            'oper-state': _state,
            'if-index': _if_index,
            'number-port-up': _number_port_up,
            'ports': ports,
        }
    lags = state_doc.findall('./s:state/s:lag', namespaces=NS)
    for lag in lags:
        yield _lag_info(lag)


def get_interfaces_state(state_doc):
    interfaces = state_doc.findall('./s:state/s:router/s:interface', namespaces=NS)
    for interface_ in interfaces:
        details = {
            "interface-name": interface_.find('s:interface-name', namespaces=NS).text,
            "if-index": int(interface_.find('s:if-index', namespaces=NS).text),
            "oper-state": interface_.find('s:oper-state', namespaces=NS).text,
        }
        ipv4 = interface_.find('s:ipv4', namespaces=NS)
        if ipv4 is not None:
            details['ipv4-state'] = ipv4.find('s:oper-state', namespaces=NS).text
        ipv6 = interface_.find('s:ipv6', namespaces=NS)
        if ipv6 is not None:
            details['ipv6-state'] = ipv6.find('s:oper-state', namespaces=NS).text
        yield details


def get_ports_config(netconf_config):
    def _port_info(e):
        pi = {
            'port-id': e.find('n:port-id', namespaces=NS).text,
            'admin-state': e.find('n:admin-state', namespaces=NS).text,
        }
        description = e.find('n:description', namespaces=NS)

        pi['description'] = description.text if description is not None else ''
        breakout = e.find('./n:connector/n:breakout', namespaces=NS)
        if breakout is not None:
            breakout = breakout.text
            breakout_match = BREAKOUT_PATTERN.match(breakout)
            pi['breakout'] = breakout
            pi['speed'] = int(breakout_match.group('speed'))
            pi['speed-unit'] = SPEED_UNITS.get(
                breakout_match.group('unit'), 'Unknown')
        return pi

    # making the assumption that the breakout ports are listed directly before their
    # child ports
    ports = netconf_config.findall('./n:configure/n:port', namespaces=NS)
    current_parent_port = None
    for port in ports:
        port_info = _port_info(port)
        if 'breakout' in port_info:
            current_parent_port = port_info
        elif current_parent_port is not None and port_info['port-id'].startswith(
                current_parent_port['port-id']):
            port_info['speed'] = current_parent_port['speed']
            port_info['speed-unit'] = current_parent_port['speed-unit']

        yield port_info


def get_lags_config(netconf_config):
    def _lag_info(e):
        _name = e.find('./n:lag-name', namespaces=NS).text

        def get_port(p):
            return p.find('./n:port-id', namespaces=NS).text

        port_elements = e.findall('./n:port', namespaces=NS)
        ports = [get_port(p) for p in port_elements]
        admin_state = e.find('./n:admin-state', namespaces=NS).text
        description_e = e.find('n:description', namespaces=NS)
        ifc = {
            'name': _name,
            'description': (
                description_e.text if description_e is not None else ''
            ),
            'admin-state': admin_state,
            'ports': ports,
        }
        return ifc

    lags = netconf_config.findall('./n:configure/n:lag', namespaces=NS)
    for lag in lags:
        yield _lag_info(lag)


def get_interfaces_config(netconf_config):

    def _get_ip_address(e):
        for details_parent in e:
            # example element
            # <primary> # details_parent - not always primary
            #     <address>62.40.119.9</address>
            #     <prefix-length>32</prefix-length>
            # </primary>
            address = details_parent[0].text
            prefix_length = details_parent[1].text
            ip_string = f'{address}/{prefix_length}'
            yield ip_string

    interfaces = netconf_config.findall(
        'n:configure/n:router/n:interface', namespaces=NS)
    for interface in interfaces:
        details = {
            "interface-name": interface.find('n:interface-name', namespaces=NS).text,
            "ipv4": [],
            "ipv6": [],
        }
        description = interface.find('n:description', namespaces=NS)
        details["description"] = description.text if description is not None else ""

        admin_state = interface.find('n:admin-state', namespaces=NS)
        if admin_state is not None:
            details["admin-state"] = admin_state.text

        for element in interface.xpath('n:ipv4', namespaces=NS):
            details["ipv4"].extend(_get_ip_address(element))

        for element in interface.xpath('n:ipv6', namespaces=NS):
            details["ipv6"].extend(_get_ip_address(element))

        yield details