-
Robert Latta authoredRobert Latta authored
nokia.py 12.08 KiB
import ipaddress
import logging
import re
from functools import lru_cache
from lxml import etree
from ncclient import manager, xml_
from inventory_provider.tasks.common import asn_to_int
logger = logging.getLogger(__name__)
BREAKOUT_PATTERN = re.compile(
r'c(?P<count>\d+)-(?P<speed>\d+)(?P<unit>[a-zA-Z]+)'
)
'''
For translating the breakout string to a speed
example:
c1-400g - This has a single 400G port, groups would be as follows:
1 or 'count' - 1
2 or 'speed' - 400
3 or 'unit' - g
c4-100g - This has four 100G ports, not all of them may be used
groups would be as follows:
1 or 'count' - 4
2 or 'speed' - 100
3 or 'unit' - g
'''
SPEED_UNITS = {
'g': 'Gbps',
'G': 'Gbps',
}
STATE_FILTER = '''<filter>
<nokia-state:state xmlns:nokia-state="urn:nokia.com:sros:ns:yang:sr:state">
<nokia-state:port>
<nokia-state:port-id/>
<nokia-state:down-reason/>
<nokia-state:oper-state/>
<nokia-state:port-state/>
<nokia-state:type/>
<nokia-state:if-index/>
<nokia-state:ethernet>
<nokia-state:oper-speed/>
</nokia-state:ethernet>
</nokia-state:port>
<nokia-state:lag>
<nokia-state:lag-name/>
<nokia-state:oper-state/>
<nokia-state:if-index/>
<nokia-state:number-port-up/>
<nokia-state:port>
<nokia-state:port-id/>
</nokia-state:port>
</nokia-state:lag>
<nokia-state:router>
<nokia-state:interface>
<nokia-state:interface-name/>
<nokia-state:if-index/>
<nokia-state:oper-state/>
<nokia-state:protocol/>
<nokia-state:ipv4>
<nokia-state:oper-state/>
<nokia-state:down-reason/>
<nokia-state:primary>
<nokia-state:oper-address/>
</nokia-state:primary>
<nokia-state:secondary>
<nokia-state:address/>
<nokia-state:oper-address/>
</nokia-state:secondary>
</nokia-state:ipv4>
<nokia-state:ipv6>
<nokia-state:oper-state/>
<nokia-state:down-reason/>
<nokia-state:address>
<nokia-state:ipv6-address/>
<nokia-state:address-state/>
<nokia-state:oper-address/>
<nokia-state:primary-preferred/>
</nokia-state:address>
</nokia-state:ipv6>
</nokia-state:interface>
</nokia-state:router>
</nokia-state:state>
</filter>'''
def remove_xml_namespaces(etree_doc):
for elem in etree_doc.iter():
elem.tag = etree.QName(elem).localname
etree.cleanup_namespaces(etree_doc)
return etree_doc
def _get_admin_state_from_element(element):
admin_state_element = element.find('admin-state')
return admin_state_element.text if admin_state_element is not None else 'enable'
def load_docs(hostname, ssh_params):
"""
Load the running and state docs for the given hostname
:param hostname: str
:param ssh_params: dict - parameters to pass to the ncclient manager.connect call, see https://github.com/ncclient/ncclient/blob/master/ncclient/transport/ssh.py#L156 # noqa
:return: tuple of xml.etree.ElementTree.Element - running configuration, state data
"""
logger.info(f'capturing netconf data for "{hostname}"')
params = {
'host': hostname,
'device_params': {'name': 'sros'},
'nc_params': {'capabilities': ['urn:nokia.com:nc:pysros:pc']},
'timeout': 60
}
params.update(ssh_params)
with manager.connect(**params) as m:
# when adding the device_params to the connect call, the object returned from
# manager.get() and manager.get_config is a ncclient.xml_.NCElement object,
# which does not provide the data property this is a workaround to get the data
# property. The alternative would be to access the private _NCElement__result
# which is of type ncclient.operations.retrieve.GetReply and has a data property
running = xml_.to_ele(m.get_config(source='running').data_xml).getchildren()[0]
state = xml_.to_ele(m.get(filter=STATE_FILTER).data_xml).getchildren()[0]
return remove_xml_namespaces(running), remove_xml_namespaces(state)
def get_ports_state(state_doc):
def _port_info(e):
pi = {
'port-id': e.find('port-id').text,
'oper-state': e.find('oper-state').text,
'port-state': e.find('port-state').text,
'type': e.find('type').text,
'if-index': e.find('if-index').text,
}
down_reason = e.find('down-reason')
if down_reason is not None:
pi['down-reason'] = down_reason.text
return pi
for port in state_doc.xpath('./state/port'):
yield _port_info(port)
def get_lags_state(state_doc):
def _lag_info(e):
_name = e.find('lag-name').text
_state = e.find('oper-state').text
_if_index = e.find('if-index').text
_number_port_up = e.find('number-port-up').text
ports = [p.find('port-id').text for p in e.findall('port')]
return {
'name': _name,
'oper-state': _state,
'if-index': _if_index,
'number-port-up': _number_port_up,
'ports': ports,
}
for lag in state_doc.findall('./state/lag'):
yield _lag_info(lag)
def get_interfaces_state(state_doc):
for interface_ in state_doc.findall('./state/router/interface'):
details = {
"interface-name": interface_.find('interface-name').text,
"if-index": int(interface_.find('if-index').text),
"oper-state": interface_.find('oper-state').text,
}
ipv4 = interface_.find('ipv4')
if ipv4 is not None:
details['ipv4-state'] = ipv4.find('oper-state').text
ipv6 = interface_.find('ipv6')
if ipv6 is not None:
details['ipv6-state'] = ipv6.find('oper-state').text
yield details
@lru_cache
def get_pxc_ports(netconf_config):
# these ports will be ignored for the purposes of the update
pxc_ports = set()
for port in netconf_config.findall('./configure/port-xc/pxc'):
pxc_ports.add(port.find('port-id').text)
for port in netconf_config.findall('./configure/port'):
port_id = port.find('port-id').text
if port_id.startswith('pxc'):
pxc_ports.add(port_id)
return pxc_ports
def get_ports_config(netconf_config):
def _port_info(e):
pi = {
'port-id': e.find('port-id').text,
'admin-state': _get_admin_state_from_element(e)
}
description = e.find('description')
pi['description'] = description.text if description is not None else ''
breakout = e.find('./connector/breakout')
if breakout is not None:
breakout = breakout.text
breakout_match = BREAKOUT_PATTERN.match(breakout)
pi['breakout'] = breakout
pi['speed'] = int(breakout_match.group('speed'))
pi['speed-unit'] = SPEED_UNITS.get(
breakout_match.group('unit'), 'Unknown')
return pi
pxc_ports = get_pxc_ports(netconf_config)
# making the assumption that the breakout ports are listed directly before their
current_parent_port = None
# child ports
for port in netconf_config.findall('./configure/port'):
port_id = port.find('port-id').text
if port_id in pxc_ports:
continue
port_info = _port_info(port)
if 'breakout' in port_info:
current_parent_port = port_info
elif current_parent_port is not None and port_info['port-id'].startswith(
current_parent_port['port-id']):
port_info['speed'] = current_parent_port['speed']
port_info['speed-unit'] = current_parent_port['speed-unit']
yield port_info
def get_lags_config(netconf_config):
enabled_ports = {
p['port-id'] for p in get_ports_config(netconf_config) if p['admin-state'] == 'enable'
}
pxc_ports = get_pxc_ports(netconf_config)
def _lag_info(e):
_name = e.find('./lag-name').text
port_elements = e.findall('./port')
port_ids = (p.find('./port-id').text for p in port_elements)
description_e = e.find('description')
ifc = {
'name': _name,
'description': description_e.text if description_e is not None else '',
'admin-state': _get_admin_state_from_element(e),
'ports': [p_id for p_id in port_ids if p_id in enabled_ports],
}
return ifc
for lag in netconf_config.findall('./configure/lag'):
ports = {p.find('./port-id').text for p in lag.findall('./port')}
if pxc_ports > ports:
continue
yield _lag_info(lag)
def get_interfaces_config(netconf_config):
for interface in netconf_config.xpath('configure/router/interface '
'| configure/service/vprn/interface '
'| configure/service/ies/interface'):
details = {
"interface-name": interface.find('interface-name').text,
"ipv4": [],
"ipv6": [],
'admin-state': _get_admin_state_from_element(interface)
}
description = interface.find('description')
details["description"] = description.text if description is not None else ""
for element in interface.xpath('ipv4/primary | ipv4/secondary'):
address = element.find('address').text
prefix_length = element.find('prefix-length').text
details["ipv4"].append(f'{address}/{prefix_length}')
for element in interface.xpath('ipv6/address'):
address = element.find('ipv6-address').text
prefix_length = element.find('prefix-length').text
details["ipv6"].append(f'{address}/{prefix_length}')
yield details
def _get_neighbors_by_group(neighbor_elements):
neighbors_by_group = {}
for neighbor in neighbor_elements:
# admin_state = _get_admin_state_from_element(neighbor) # do we want to do anything with this?
group_name = neighbor.find('group').text
address = neighbor.find('ip-address').text
info = {
'address': ipaddress.ip_address(address).exploded
}
peer_as = neighbor.find('peer-as')
if peer_as is not None:
info['remote-asn'] = asn_to_int(peer_as.text)
description = neighbor.find('description')
if description is not None:
info['description'] = description.text
neighbors_by_group.setdefault(group_name, []).append(info)
return neighbors_by_group
def get_peer_info(neighbors_by_group, group_elements):
for bgp_group in group_elements:
# admin_state = _get_admin_state_from_element(bgp_group) # do we want to do anything with this?
group_name = bgp_group.find('group-name').text
details = {
'group': group_name
}
local_as = bgp_group.find('local-as')
if local_as is not None:
asn_value_node = local_as.find('as-number')
details['local-asn'] = asn_to_int(asn_value_node.text)
for neighbor in neighbors_by_group.get(group_name, []):
neighbor.update(details)
yield neighbor
def get_router_peers(netconf_config):
neighbors_by_group = _get_neighbors_by_group(netconf_config.xpath('configure/router/bgp/neighbor'))
group_elements = netconf_config.xpath('configure/router/bgp/group')
yield from get_peer_info(neighbors_by_group, group_elements)
def get_all_vprn_peers(netconf_config):
for vprn in netconf_config.xpath('configure/service/vprn'):
service_name = vprn.find('service-name').text
neighbors_by_group = _get_neighbors_by_group(vprn.xpath('bgp/neighbor'))
group_elements = vprn.xpath('bgp/group')
for peer in get_peer_info(neighbors_by_group, group_elements):
peer['instance'] = service_name # just to match the data from Juniper
yield peer
def get_all_bgp_peers(netconf_config):
yield from get_router_peers(netconf_config)
yield from get_all_vprn_peers(netconf_config)