-
Pelle Koster authoredPelle Koster authored
nokia.py 9.32 KiB
import logging
import re
from ncclient import manager, xml_
logger = logging.getLogger(__name__)
ROOT_NS = 'urn:ietf:params:xml:ns:netconf:base:1.0'
NOKIA_NS = 'urn:nokia.com:sros:ns:yang:sr:conf'
NOKIA_STATE_NS = 'urn:nokia.com:sros:ns:yang:sr:state'
NS = {
'r': ROOT_NS,
'n': NOKIA_NS,
's': NOKIA_STATE_NS,
}
BREAKOUT_PATTERN = re.compile(
r'c(?P<count>\d+)-(?P<speed>\d+)(?P<unit>[a-zA-Z]+)'
)
'''
For translating the breakout string to a speed
example:
c1-400g - This has a single 400G port, groups would be as follows:
1 or 'count' - 1
2 or 'speed' - 400
3 or 'unit' - g
c4-100g - This has four 100G ports, not all of them may be used
groups would be as follows:
1 or 'count' - 4
2 or 'speed' - 100
3 or 'unit' - g
'''
SPEED_UNITS = {
'g': 'Gbps',
'G': 'Gbps',
}
STATE_FILTER = '''<filter>
<nokia-state:state xmlns:nokia-state="urn:nokia.com:sros:ns:yang:sr:state">
<nokia-state:port>
<nokia-state:port-id/>
<nokia-state:down-reason/>
<nokia-state:oper-state/>
<nokia-state:port-state/>
<nokia-state:type/>
<nokia-state:if-index/>
<nokia-state:ethernet>
<nokia-state:oper-speed/>
</nokia-state:ethernet>
</nokia-state:port>
<nokia-state:lag>
<nokia-state:lag-name/>
<nokia-state:oper-state/>
<nokia-state:if-index/>
<nokia-state:number-port-up/>
<nokia-state:port>
<nokia-state:port-id/>
</nokia-state:port>
</nokia-state:lag>
<nokia-state:router>
<nokia-state:interface>
<nokia-state:interface-name/>
<nokia-state:if-index/>
<nokia-state:oper-state/>
<nokia-state:protocol/>
<nokia-state:ipv4>
<nokia-state:oper-state/>
<nokia-state:down-reason/>
<nokia-state:primary>
<nokia-state:oper-address/>
</nokia-state:primary>
<nokia-state:secondary>
<nokia-state:address/>
<nokia-state:oper-address/>
</nokia-state:secondary>
</nokia-state:ipv4>
<nokia-state:ipv6>
<nokia-state:oper-state/>
<nokia-state:down-reason/>
<nokia-state:address>
<nokia-state:ipv6-address/>
<nokia-state:address-state/>
<nokia-state:oper-address/>
<nokia-state:primary-preferred/>
</nokia-state:address>
</nokia-state:ipv6>
</nokia-state:interface>
</nokia-state:router>
</nokia-state:state>
</filter>'''
def load_docs(hostname, ssh_params, hostkey_verify=False):
logger.info(f'capturing netconf data for "{hostname}"')
params = {
'host': hostname,
'hostkey_verify': hostkey_verify,
'device_params': {'name': 'sros'},
'nc_params': {'capabilities': ['urn:nokia.com:nc:pysros:pc']},
'timeout': 60
}
params.update(ssh_params)
with manager.connect(**params) as m:
# when adding the device_params to the connect call, the object returned from
# manager.get() and manager.get_config is a ncclient.xml_.NCElement object,
# which does not provide the data property this is a workaround to get the data
# property. The alternative would be to access the private _NCElement__result
# which is of type ncclient.operations.retrieve.GetReply and has a data property
running = xml_.to_ele(m.get_config(source='running').data_xml).getchildren()[0]
state = xml_.to_ele(m.get(filter=STATE_FILTER).data_xml).getchildren()[0]
return running, state
def get_ports_state(state_doc):
def _port_info(e):
pi = {
'port-id': e.find('s:port-id', namespaces=NS).text,
'oper-state': e.find('s:oper-state', namespaces=NS).text,
'port-state': e.find('s:port-state', namespaces=NS).text,
'type': e.find('s:type', namespaces=NS).text,
'if-index': e.find('s:if-index', namespaces=NS).text,
}
down_reason = e.find('s:down-reason', namespaces=NS)
if down_reason is not None:
pi['down-reason'] = down_reason.text
return pi
ports = state_doc.findall('./s:state/s:port', namespaces=NS)
for port in ports:
yield _port_info(port)
def get_lags_state(state_doc):
def _lag_info(e):
_name = e.find('s:lag-name', namespaces=NS).text
_state = e.find('s:oper-state', namespaces=NS).text
_if_index = e.find('s:if-index', namespaces=NS).text
_number_port_up = e.find('s:number-port-up', namespaces=NS).text
ports = [p.find('s:port-id', namespaces=NS).text for p in e.findall('s:port', namespaces=NS)]
return {
'name': _name,
'oper-state': _state,
'if-index': _if_index,
'number-port-up': _number_port_up,
'ports': ports,
}
lags = state_doc.findall('./s:state/s:lag', namespaces=NS)
for lag in lags:
yield _lag_info(lag)
def get_interfaces_state(state_doc):
interfaces = state_doc.findall('./s:state/s:router/s:interface', namespaces=NS)
for interface_ in interfaces:
details = {
"interface-name": interface_.find('s:interface-name', namespaces=NS).text,
"if-index": int(interface_.find('s:if-index', namespaces=NS).text),
"oper-state": interface_.find('s:oper-state', namespaces=NS).text,
}
ipv4 = interface_.find('s:ipv4', namespaces=NS)
if ipv4 is not None:
details['ipv4-state'] = ipv4.find('s:oper-state', namespaces=NS).text
ipv6 = interface_.find('s:ipv6', namespaces=NS)
if ipv6 is not None:
details['ipv6-state'] = ipv6.find('s:oper-state', namespaces=NS).text
yield details
def get_ports_config(netconf_config):
def _port_info(e):
pi = {
'port-id': e.find('n:port-id', namespaces=NS).text,
'admin-state': e.find('n:admin-state', namespaces=NS).text,
}
description = e.find('n:description', namespaces=NS)
pi['description'] = description.text if description is not None else ''
breakout = e.find('./n:connector/n:breakout', namespaces=NS)
if breakout is not None:
breakout = breakout.text
breakout_match = BREAKOUT_PATTERN.match(breakout)
pi['breakout'] = breakout
pi['speed'] = int(breakout_match.group('speed'))
pi['speed-unit'] = SPEED_UNITS.get(
breakout_match.group('unit'), 'Unknown')
return pi
# making the assumption that the breakout ports are listed directly before their
# child ports
ports = netconf_config.findall('./n:configure/n:port', namespaces=NS)
current_parent_port = None
for port in ports:
port_info = _port_info(port)
if 'breakout' in port_info:
current_parent_port = port_info
elif current_parent_port is not None and port_info['port-id'].startswith(
current_parent_port['port-id']):
port_info['speed'] = current_parent_port['speed']
port_info['speed-unit'] = current_parent_port['speed-unit']
yield port_info
def get_lags_config(netconf_config):
def _lag_info(e):
_name = e.find('./n:lag-name', namespaces=NS).text
def get_port(p):
return p.find('./n:port-id', namespaces=NS).text
port_elements = e.findall('./n:port', namespaces=NS)
ports = [get_port(p) for p in port_elements]
admin_state = e.find('./n:admin-state', namespaces=NS).text
description_e = e.find('n:description', namespaces=NS)
ifc = {
'name': _name,
'description': (
description_e.text if description_e is not None else ''
),
'admin-state': admin_state,
'ports': ports,
}
return ifc
lags = netconf_config.findall('./n:configure/n:lag', namespaces=NS)
for lag in lags:
yield _lag_info(lag)
def get_interfaces_config(netconf_config):
def _get_ip_address(e):
for details_parent in e:
# example element
# <primary> # details_parent - not always primary
# <address>62.40.119.9</address>
# <prefix-length>32</prefix-length>
# </primary>
address = details_parent[0].text
prefix_length = details_parent[1].text
ip_string = f'{address}/{prefix_length}'
yield ip_string
interfaces = netconf_config.findall(
'n:configure/n:router/n:interface', namespaces=NS)
for interface in interfaces:
details = {
"interface-name": interface.find('n:interface-name', namespaces=NS).text,
"ipv4": [],
"ipv6": [],
}
description = interface.find('n:description', namespaces=NS)
details["description"] = description.text if description is not None else ""
admin_state = interface.find('n:admin-state', namespaces=NS)
if admin_state is not None:
details["admin-state"] = admin_state.text
for element in interface.xpath('n:ipv4', namespaces=NS):
details["ipv4"].extend(_get_ip_address(element))
for element in interface.xpath('n:ipv6', namespaces=NS):
details["ipv6"].extend(_get_ip_address(element))
yield details