"""Helpers for capturing and parsing Nokia SR OS NETCONF documents.

``load_docs`` fetches the running configuration and a filtered subset of the
operational state from a router.  The ``get_*_state`` functions parse the
state document and the ``get_*_config`` functions parse the configuration
document; all of them are generators yielding one plain ``dict`` per item.
"""
import logging
import re

from ncclient import manager, xml_

logger = logging.getLogger(__name__)

ROOT_NS = 'urn:ietf:params:xml:ns:netconf:base:1.0'
NOKIA_NS = 'urn:nokia.com:sros:ns:yang:sr:conf'
NOKIA_STATE_NS = 'urn:nokia.com:sros:ns:yang:sr:state'

# prefix map used by every find()/findall() call in this module:
#   r - NETCONF base, n - Nokia configuration, s - Nokia state
NS = {
    'r': ROOT_NS,
    'n': NOKIA_NS,
    's': NOKIA_STATE_NS,
}

# For translating the breakout string to a speed
# example:
# c1-400g - This has a single 400G port, groups would be as follows:
#     1 or 'count' - 1
#     2 or 'speed' - 400
#     3 or 'unit'  - g
# c4-100g - This has four 100G ports, not all of them may be used
#   groups would be as follows:
#     1 or 'count' - 4
#     2 or 'speed' - 100
#     3 or 'unit'  - g
BREAKOUT_PATTERN = re.compile(
    r'c(?P<count>\d+)-(?P<speed>\d+)(?P<unit>[a-zA-Z]+)'
)

# maps the 'unit' group of BREAKOUT_PATTERN to a display unit
SPEED_UNITS = {
    'g': 'Gbps',
    'G': 'Gbps',
}

# subtree filter limiting the state document to the elements parsed below
STATE_FILTER = '''<filter>
  <nokia-state:state xmlns:nokia-state="urn:nokia.com:sros:ns:yang:sr:state">
    <nokia-state:port>
      <nokia-state:port-id/>
      <nokia-state:down-reason/>
      <nokia-state:oper-state/>
      <nokia-state:port-state/>
      <nokia-state:type/>
      <nokia-state:if-index/>
      <nokia-state:ethernet>
        <nokia-state:oper-speed/>
      </nokia-state:ethernet>
    </nokia-state:port>
    <nokia-state:lag>
      <nokia-state:lag-name/>
      <nokia-state:oper-state/>
      <nokia-state:if-index/>
      <nokia-state:number-port-up/>
      <nokia-state:port>
        <nokia-state:port-id/>
      </nokia-state:port>
    </nokia-state:lag>
    <nokia-state:router>
      <nokia-state:interface>
        <nokia-state:interface-name/>
        <nokia-state:if-index/>
        <nokia-state:oper-state/>
        <nokia-state:protocol/>
        <nokia-state:ipv4>
          <nokia-state:oper-state/>
          <nokia-state:down-reason/>
          <nokia-state:primary>
            <nokia-state:oper-address/>
          </nokia-state:primary>
          <nokia-state:secondary>
            <nokia-state:address/>
            <nokia-state:oper-address/>
          </nokia-state:secondary>
        </nokia-state:ipv4>
        <nokia-state:ipv6>
          <nokia-state:oper-state/>
          <nokia-state:down-reason/>
          <nokia-state:address>
            <nokia-state:ipv6-address/>
            <nokia-state:address-state/>
            <nokia-state:oper-address/>
            <nokia-state:primary-preferred/>
          </nokia-state:address>
        </nokia-state:ipv6>
      </nokia-state:interface>
    </nokia-state:router>
  </nokia-state:state>
</filter>'''


def load_docs(hostname, ssh_params, hostkey_verify=False):
    """Fetch the running config and the filtered state from a router.

    :param hostname: host to open the NETCONF session to
    :param ssh_params: dict of extra keyword arguments for
        ``ncclient.manager.connect``; these override the defaults built here
    :param hostkey_verify: passed through to ``manager.connect``
    :return: tuple ``(running, state)`` where each item is the first child
        element of the corresponding reply's data XML
    """
    logger.info('capturing netconf data for "%s"', hostname)
    params = {
        'host': hostname,
        'hostkey_verify': hostkey_verify,
        'device_params': {'name': 'sros'},
        'nc_params': {'capabilities': ['urn:nokia.com:nc:pysros:pc']},
        'timeout': 60
    }
    params.update(ssh_params)
    with manager.connect(**params) as m:
        # when adding the device_params to the connect call, the object
        # returned from manager.get() and manager.get_config() is a
        # ncclient.xml_.NCElement object, which does not provide the data
        # property; re-parsing data_xml is a workaround to get at the data.
        # The alternative would be to access the private _NCElement__result
        # which is of type ncclient.operations.retrieve.GetReply and has a
        # data property.
        # Indexing is used instead of the deprecated getchildren().
        running = xml_.to_ele(m.get_config(source='running').data_xml)[0]
        state = xml_.to_ele(m.get(filter=STATE_FILTER).data_xml)[0]
    return running, state


def get_ports_state(state_doc):
    """Yield a dict for every ``state/port`` element of the state document.

    Each dict has the keys ``port-id``, ``oper-state``, ``port-state``,
    ``type`` and ``if-index`` (all element text, i.e. strings), plus
    ``down-reason`` when that element is present.

    :param state_doc: element whose ``s:state`` child holds the ports
    :raises AttributeError: if a port lacks one of the mandatory elements
    """
    def _port_info(e):
        pi = {
            'port-id': e.find('s:port-id', namespaces=NS).text,
            'oper-state': e.find('s:oper-state', namespaces=NS).text,
            'port-state': e.find('s:port-state', namespaces=NS).text,
            'type': e.find('s:type', namespaces=NS).text,
            'if-index': e.find('s:if-index', namespaces=NS).text,
        }
        # down-reason is optional, the key is only set when it is reported
        down_reason = e.find('s:down-reason', namespaces=NS)
        if down_reason is not None:
            pi['down-reason'] = down_reason.text
        return pi

    for port in state_doc.findall('./s:state/s:port', namespaces=NS):
        yield _port_info(port)


def get_lags_state(state_doc):
    """Yield a dict for every ``state/lag`` element of the state document.

    Each dict has the keys ``name``, ``oper-state``, ``if-index``,
    ``number-port-up`` (element text) and ``ports`` (list of member port-ids).

    :param state_doc: element whose ``s:state`` child holds the lags
    :raises AttributeError: if a lag lacks one of the mandatory elements
    """
    def _lag_info(e):
        return {
            'name': e.find('s:lag-name', namespaces=NS).text,
            'oper-state': e.find('s:oper-state', namespaces=NS).text,
            'if-index': e.find('s:if-index', namespaces=NS).text,
            'number-port-up': e.find('s:number-port-up', namespaces=NS).text,
            'ports': [
                p.find('s:port-id', namespaces=NS).text
                for p in e.findall('s:port', namespaces=NS)
            ],
        }

    for lag in state_doc.findall('./s:state/s:lag', namespaces=NS):
        yield _lag_info(lag)


def get_interfaces_state(state_doc):
    """Yield a dict for every ``state/router/interface`` element.

    Each dict has ``interface-name``, ``if-index`` (converted to ``int``) and
    ``oper-state``; ``ipv4-state`` / ``ipv6-state`` are added when the
    interface has an ``ipv4`` / ``ipv6`` child.

    :param state_doc: element whose ``s:state`` child holds the router state
    :raises AttributeError: if an interface lacks a mandatory element
    """
    interfaces = state_doc.findall(
        './s:state/s:router/s:interface', namespaces=NS)
    for interface_ in interfaces:
        details = {
            "interface-name": interface_.find(
                's:interface-name', namespaces=NS).text,
            "if-index": int(interface_.find('s:if-index', namespaces=NS).text),
            "oper-state": interface_.find('s:oper-state', namespaces=NS).text,
        }
        ipv4 = interface_.find('s:ipv4', namespaces=NS)
        if ipv4 is not None:
            details['ipv4-state'] = ipv4.find(
                's:oper-state', namespaces=NS).text
        ipv6 = interface_.find('s:ipv6', namespaces=NS)
        if ipv6 is not None:
            details['ipv6-state'] = ipv6.find(
                's:oper-state', namespaces=NS).text
        yield details


def get_ports_config(netconf_config):
    """Yield a dict for every ``configure/port`` element of the config.

    Each dict has ``port-id``, ``admin-state`` and ``description`` (empty
    string when there is no description element).  Ports with a
    ``connector/breakout`` element also get ``breakout`` and, when the
    breakout string matches ``BREAKOUT_PATTERN``, ``speed`` (``int``) and
    ``speed-unit``.  Ports following a breakout port whose port-id is
    ``<breakout port-id>/...`` inherit that port's speed and unit.

    :param netconf_config: element whose ``n:configure`` child holds the ports
    :raises AttributeError: if a port has no port-id or admin-state element
    """
    def _port_info(e):
        pi = {
            'port-id': e.find('n:port-id', namespaces=NS).text,
            'admin-state': e.find('n:admin-state', namespaces=NS).text,
        }
        description = e.find('n:description', namespaces=NS)
        pi['description'] = (
            description.text if description is not None else '')
        breakout = e.find('./n:connector/n:breakout', namespaces=NS)
        if breakout is not None:
            pi['breakout'] = breakout.text
            breakout_match = BREAKOUT_PATTERN.match(breakout.text or '')
            if breakout_match is None:
                # an unrecognised breakout string must not abort the whole
                # parse; the port is reported without speed information
                logger.warning(
                    'unrecognised breakout "%s" on port "%s"',
                    breakout.text, pi['port-id'])
            else:
                pi['speed'] = int(breakout_match.group('speed'))
                pi['speed-unit'] = SPEED_UNITS.get(
                    breakout_match.group('unit'), 'Unknown')
        return pi

    # making the assumption that the breakout ports are listed directly
    # before their child ports
    current_parent_port = None
    for port in netconf_config.findall('./n:configure/n:port', namespaces=NS):
        port_info = _port_info(port)
        if 'breakout' in port_info:
            current_parent_port = port_info
        elif (
            current_parent_port is not None
            and 'speed' in current_parent_port
            # the '/' stops e.g. 1/1/c10 being taken for a child of 1/1/c1
            and port_info['port-id'].startswith(
                current_parent_port['port-id'] + '/')
        ):
            port_info['speed'] = current_parent_port['speed']
            port_info['speed-unit'] = current_parent_port['speed-unit']
        yield port_info


def get_lags_config(netconf_config):
    """Yield a dict for every ``configure/lag`` element of the config.

    Each dict has ``name``, ``description`` (empty string when there is no
    description element), ``admin-state`` and ``ports`` (list of member
    port-ids).

    :param netconf_config: element whose ``n:configure`` child holds the lags
    :raises AttributeError: if a lag has no lag-name or admin-state element
    """
    def _lag_info(e):
        description_e = e.find('n:description', namespaces=NS)
        return {
            'name': e.find('./n:lag-name', namespaces=NS).text,
            'description': (
                description_e.text if description_e is not None else ''
            ),
            'admin-state': e.find('./n:admin-state', namespaces=NS).text,
            'ports': [
                p.find('./n:port-id', namespaces=NS).text
                for p in e.findall('./n:port', namespaces=NS)
            ],
        }

    for lag in netconf_config.findall('./n:configure/n:lag', namespaces=NS):
        yield _lag_info(lag)


def get_interfaces_config(netconf_config):
    """Yield a dict for every ``configure/router/interface`` element.

    Each dict has ``interface-name``, ``description`` (empty string when
    absent), ``ipv4`` and ``ipv6`` (lists of ``address/prefix-length``
    strings) and, when the element is present, ``admin-state``.

    :param netconf_config: element whose ``n:configure`` child holds the
        router configuration
    :raises AttributeError: if an interface has no interface-name element
    """
    def _get_ip_address(e):
        for details_parent in e:
            # example element
            # <primary>  # details_parent - not always primary
            #     <address>62.40.119.9</address>
            #     <prefix-length>32</prefix-length>
            # </primary>
            if len(details_parent) < 2:
                # a child without at least two sub-elements cannot hold an
                # address/prefix-length pair, so it is not an address entry
                continue
            # NOTE(review): the pair is read by position, which assumes the
            # address comes first and the prefix length second - confirm
            # against the device schema
            address = details_parent[0].text
            prefix_length = details_parent[1].text
            yield f'{address}/{prefix_length}'

    interfaces = netconf_config.findall(
        'n:configure/n:router/n:interface', namespaces=NS)
    for interface in interfaces:
        details = {
            "interface-name": interface.find(
                'n:interface-name', namespaces=NS).text,
            "ipv4": [],
            "ipv6": [],
        }
        description = interface.find('n:description', namespaces=NS)
        details["description"] = (
            description.text if description is not None else "")
        admin_state = interface.find('n:admin-state', namespaces=NS)
        if admin_state is not None:
            details["admin-state"] = admin_state.text

        for family in ('ipv4', 'ipv6'):
            for element in interface.findall(f'n:{family}', namespaces=NS):
                details[family].extend(_get_ip_address(element))

        yield details