import logging
import re
import ipaddress

from jnpr.junos import Device
from jnpr.junos import exception as EzErrors
from lxml import etree
import netifaces
import requests

# XML schema used to sanity-check the top-level structure of a netconf
# <configuration> document (see validate_netconf_config).
CONFIG_SCHEMA = """<?xml version="1.1" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">

  <xs:complexType name="generic-sequence">
    <xs:sequence>
      <xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded"/>
    </xs:sequence>
    <xs:anyAttribute processContents="skip" />
  </xs:complexType>

  <!-- NOTE: 'unit' content isn't validated -->
  <xs:complexType name="juniper-interface">
    <xs:sequence>
      <xs:choice minOccurs="1" maxOccurs="unbounded">
        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string" />
        <xs:element name="description" minOccurs="0" maxOccurs="1">
          <xs:complexType>
            <xs:simpleContent>
              <xs:extension base="xs:string">
                <xs:attribute name="inactive" type="xs:string" />
              </xs:extension>
            </xs:simpleContent>
          </xs:complexType>
        </xs:element>
        <xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded" />
      </xs:choice>
    </xs:sequence>
    <xs:attribute name="inactive" type="xs:string" />
  </xs:complexType>

  <xs:element name="configuration">
    <xs:complexType>
      <xs:sequence>
        <xs:choice minOccurs="1" maxOccurs="unbounded">
          <xs:element name="transfer-on-commit" minOccurs="0" type="xs:string" />
          <xs:element name="archive-sites" minOccurs="0" type="generic-sequence" />
          <xs:element name="version" minOccurs="0" type="xs:string" />
          <xs:element name="groups" minOccurs="0" type="generic-sequence" />
          <xs:element name="apply-groups" minOccurs="0" type="xs:string" />
          <xs:element name="system" minOccurs="0" type="generic-sequence" />
          <xs:element name="logical-systems" minOccurs="0" type="generic-sequence" />
          <xs:element name="chassis" minOccurs="0" type="generic-sequence" />
          <xs:element name="services" minOccurs="0" type="generic-sequence" />
          <xs:element name="interfaces" minOccurs="0">
            <xs:complexType>
              <xs:sequence>
                <xs:choice minOccurs="1" maxOccurs="unbounded">
                  <xs:element name="apply-groups" minOccurs="0" type="xs:string" />
                  <xs:element name="interface-range" minOccurs="0" type="generic-sequence" />
                  <xs:element name="interface" minOccurs="1" maxOccurs="unbounded" type="juniper-interface" />
                </xs:choice>
              </xs:sequence>
            </xs:complexType>
          </xs:element>
          <xs:element name="snmp" minOccurs="0" type="generic-sequence" />
          <xs:element name="forwarding-options" minOccurs="0" type="generic-sequence" />
          <xs:element name="routing-options" minOccurs="0" type="generic-sequence" />
          <xs:element name="protocols" minOccurs="0" type="generic-sequence" />
          <xs:element name="policy-options" minOccurs="0" type="generic-sequence" />
          <xs:element name="class-of-service" minOccurs="0" type="generic-sequence" />
          <xs:element name="firewall" minOccurs="0" type="generic-sequence" />
          <xs:element name="routing-instances" minOccurs="0" type="generic-sequence" />
          <xs:element name="bridge-domains" minOccurs="0" type="generic-sequence" />
          <xs:element name="virtual-chassis" minOccurs="0" type="generic-sequence" />
          <xs:element name="vlans" minOccurs="0" type="generic-sequence" />
          <xs:element name="comment" minOccurs="0" type="xs:string" />
        </xs:choice>
      </xs:sequence>
      <xs:attribute name="changed-seconds" type="xs:string" />
      <xs:attribute name="changed-localtime" type="xs:string" />
    </xs:complexType>
  </xs:element>

</xs:schema>
"""  # noqa: E501

# XML schema applied separately to each interfaces/interface/unit element
# (CONFIG_SCHEMA deliberately leaves 'unit' content unvalidated).
UNIT_SCHEMA = """<?xml version="1.1" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">

  <xs:complexType name="generic-sequence">
    <xs:sequence>
      <xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded"/>
    </xs:sequence>
    <xs:anyAttribute processContents="skip" />
  </xs:complexType>

  <xs:element name="unit">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:int" />
        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string" />
        <xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded"/>
      </xs:sequence>
      <xs:attribute name="inactive" type="xs:string" />
    </xs:complexType>
  </xs:element>

</xs:schema>
"""  # noqa: E501

# JSON schema describing a list of dicts shaped like those yielded
# by all_bgp_peers.
PEERING_LIST_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "definitions": {
        "peering": {
            "type": "object",
            "properties": {
                "group": {"type": "string"},
                "description": {"type": "string"},
                "address": {"type": "string"},
                "remote-asn": {"type": "integer"},
                "local-asn": {"type": "integer"},
                "instance": {"type": "string"},
                "logical-system": {"type": "string"},
            },
            # lots of internal peerings - so maybe no explicit asn's
            "required": ["group", "address"],
            "additionalProperties": False
        }
    },
    "type": "array",
    "items": {"$ref": "#/definitions/peering"}
}


class NetconfHandlingError(Exception):
    """Raised when a netconf document fails schema validation."""
    pass


def _rpc(hostname, ssh):
    """
    open a PyEZ Device connection to hostname and return its rpc object

    NOTE: the Device is not closed here; the returned rpc
    object is bound to the open connection.

    :param hostname: router hostname
    :param ssh: dict with 'username' and 'private-key' elements
    :return: the `rpc` attribute of the opened Device
    :raises: ConnectionError if the device can't be opened
    """
    dev = Device(
        host=hostname,
        user=ssh['username'],
        ssh_private_key_file=ssh['private-key'])
    try:
        dev.open()
    except (EzErrors.ConnectError, EzErrors.RpcError) as e:
        # chain the original exception so the root cause stays visible
        raise ConnectionError(str(e)) from e
    return dev.rpc


def validate_netconf_config(config_doc):
    """
    validate a netconf config document against CONFIG_SCHEMA, and
    each interfaces/interface/unit element against UNIT_SCHEMA

    every schema error is logged at error level before raising

    :param config_doc: lxml etree document to validate
    :return: None
    :raises: NetconfHandlingError in case of validation errors
    """
    logger = logging.getLogger(__name__)

    def _validate(schema, doc):
        if schema.validate(doc):
            return
        messages = []
        for e in schema.error_log:
            msg = f'{e.line}.{e.column}: {e.message}'
            messages.append(msg)
            logger.error(msg)
        raise NetconfHandlingError('\n'.join(messages))

    schema_doc = etree.XML(CONFIG_SCHEMA.encode('utf-8'))
    config_schema = etree.XMLSchema(schema_doc)
    _validate(config_schema, config_doc)

    # validate interfaces/interface/unit elements ...
    schema_doc = etree.XML(UNIT_SCHEMA.encode('utf-8'))
    unit_schema = etree.XMLSchema(schema_doc)
    for i in config_doc.xpath('//configuration/interfaces/interface'):
        for u in i.xpath('./unit'):
            _validate(unit_schema, u)


def load_config(hostname, ssh_params, validate=True):
    """
    loads netconf data from the router, validates (by default) and
    returns as an lxml etree doc

    :param hostname: router hostname
    :param ssh_params: 'ssh' config element(cf. config.py:CONFIG_SCHEMA)
    :param validate: whether or not to validate netconf data (default True)
    :return: the document returned by the device's get_config rpc
    :raises: NetconfHandlingError from validate_netconf_config,
        ConnectionError from _rpc
    """
    logger = logging.getLogger(__name__)
    logger.info("capturing netconf data for '%s'", hostname)
    config = _rpc(hostname, ssh_params).get_config()
    if validate:
        validate_netconf_config(config)
    return config


def list_interfaces(netconf_config):
    """
    generator that parses netconf output and
    yields a list of interfaces

    each yielded item is a dict with keys 'name', 'description',
    'bundle', 'ipv4' and 'ipv6'; active units are yielded as
    separate items named '<interface name>.<unit name>'

    :param netconf_config: xml doc that was generated by load_config
    :return: yields interface info dicts
    """

    def _ifc_info(e, bundle_scope):
        # warning: this structure should match the default
        #          returned from routes.classifier.juniper_link_info
        #
        # bundle_scope is the element searched for 'bundle' descendants:
        # for a unit this is the enclosing interface element, so units
        # report the bundle membership of their parent interface
        name = e.find('name')
        assert name is not None, "expected interface 'name' child element"
        ifc = {
            'name': name.text,
            'description': '',
            'bundle': []
        }
        description = e.find('description')
        if description is not None:
            ifc['description'] = description.text

        for b in bundle_scope.iterfind(".//bundle"):
            ifc['bundle'].append(b.text)

        ifc['ipv4'] = e.xpath('./family/inet/address/name/text()')
        ifc['ipv6'] = e.xpath('./family/inet6/address/name/text()')

        return ifc

    def _units(base_name, node):
        for u in node.xpath('./unit'):
            if u.get('inactive', None) == 'inactive':
                continue
            unit_info = _ifc_info(u, node)
            unit_info['name'] = "%s.%s" % (base_name, unit_info['name'])
            yield unit_info

    for i in netconf_config.xpath('//configuration/interfaces/interface'):
        info = _ifc_info(i, i)
        yield info
        for u in _units(info['name'], i):
            yield u

    # for logical systems only the units are yielded,
    # not the interface elements themselves
    for i in netconf_config.xpath(
            '//configuration/logical-systems/interfaces/interface'):
        name = i.find('name')
        assert name is not None, "expected interface 'name' child element"
        for u in _units(name.text, i):
            yield u


def _system_bgp_peers(system_node):
    """
    generator yielding bgp peering info dicts for one 'system' node,
    recursing into any routing-instances/instance children

    :param system_node: element containing protocols/bgp/group and/or
        routing-instances/instance children
    :return: yields dicts with 'address' and 'group' keys, and optionally
        'remote-asn', 'local-asn', 'description' and 'instance'
    """

    def _peering_params(neighbor_node):
        address = neighbor_node.find('name').text
        info = {'address': ipaddress.ip_address(address).exploded}
        peer_as = neighbor_node.find('peer-as')
        if peer_as is not None:
            # lxml usage warning: can't just test `if peer_as:`
            info['remote-asn'] = int(peer_as.text)
        local_as = neighbor_node.find('local-as')
        if local_as is not None:
            asn_value_node = local_as.find('as-number')
            # tolerate a local-as element with no as-number child
            if asn_value_node is not None:
                info['local-asn'] = int(asn_value_node.text)
        description = neighbor_node.find('description')
        if description is not None:
            # lxml usage warning: can't just test `if description:`
            info['description'] = description.text
        return info

    def _neighbors(group_node):
        for neighbor in group_node.xpath('./neighbor'):
            inactive = neighbor.get('inactive')
            if inactive == 'inactive':
                continue
            yield _peering_params(neighbor)

    for group in system_node.xpath('./protocols/bgp/group'):
        group_name = group.find('name').text
        for peer in _neighbors(group):
            peer['group'] = group_name
            yield peer

    for instance in system_node.xpath(
            './routing-instances/instance'):
        instance_name = instance.find('name').text
        for peer in _system_bgp_peers(instance):
            peer['instance'] = instance_name
            yield peer


def all_bgp_peers(netconf_config):
    """
    generator yielding bgp peering info dicts for the base system
    and for every logical system in the config document

    peers found under logical-systems have an additional
    'logical-system' key holding the logical system name

    :param netconf_config: netconf lxml etree document
    :return: yields peering info dicts (cf. PEERING_LIST_SCHEMA)
    """
    for base_system in netconf_config.xpath('//configuration'):
        # there should only be one
        yield from _system_bgp_peers(base_system)

    for logical_system in netconf_config.xpath(
            '//configuration/logical-systems'):
        logical_system_name = logical_system.find('name').text
        for peer in _system_bgp_peers(logical_system):
            peer['logical-system'] = logical_system_name
            yield peer


def interface_addresses(netconf_config):
    """
    yields one dict for each ipv4/ipv6 address of each
    interface yielded by list_interfaces

    :param netconf_config: netconf lxml etree document
    :return: yields dicts with keys 'name' (exploded ip address),
        'interface address' (address as configured) and 'interface name'
    """
    for ifc in list_interfaces(netconf_config):
        for address in ifc['ipv4'] + ifc['ipv6']:
            yield {
                "name": ipaddress.ip_interface(address).ip.exploded,
                "interface address": address,
                "interface name": ifc['name']
            }


def load_routers_from_netdash(url):
    """
    query url for a linefeed-delimited list of managed router hostnames

    blank lines are dropped and surrounding whitespace is stripped

    :param url: url of alldevices.txt file
    :return: list of router hostnames
    :raises: requests.HTTPError if the response status is an error
    """
    r = requests.get(url=url)
    r.raise_for_status()
    stripped = (ln.strip() for ln in r.text.splitlines())
    return [ln for ln in stripped if ln]


def local_interfaces(
        type=netifaces.AF_INET,
        omit_link_local=True,
        omit_loopback=True):
    """
    generator yielding IPv4Interface or IPv6Interface
    objects, depending on the value of type

    :param type: hopefully AF_INET or AF_INET6
    :param omit_link_local: skip v6 fe80* addresses if true
    :param omit_loopback: skip lo* interfaces if true
    :return: yields ipaddress interface objects
    """
    for n in netifaces.interfaces():
        # matches bare 'lo' as well as 'lo0', 'lo0:1', etc.
        # (but not unrelated names that merely start with 'lo')
        if omit_loopback and re.match(r'^lo(\d.*)?$', n):
            continue
        am = netifaces.ifaddresses(n)
        for a in am.get(type, []):
            if omit_link_local and a['addr'].startswith('fe80:'):
                continue
            # strip any '%<scope>' suffix from the address
            m = re.match(r'^(.+?)(%.*)?$', a['addr'])
            assert m
            addr = m.group(1)
            # the netmask may be given as '<mask>/<prefix length>',
            # in which case only the prefix length is used
            m = re.match(r'.*/(\d+)$', a['netmask'])
            if m:
                mask = m.group(1)
            else:
                mask = a['netmask']
            yield ipaddress.ip_interface('%s/%s' % (addr, mask))


def snmp_community_string(netconf_config):
    """
    return the name of the first snmp community in the config document
    with a client network that contains one of this host's addresses

    local addresses are taken from local_interfaces() with its defaults

    :param netconf_config: netconf lxml etree document
    :return: community name string, or None if nothing matches
    """
    my_addresses = [i.ip for i in local_interfaces()]
    for community in netconf_config.xpath('//configuration/snmp/community'):
        for subnet in community.xpath('./clients/name/text()'):
            allowed_network = ipaddress.ip_network(subnet, strict=False)
            if any(me in allowed_network for me in my_addresses):
                return community.xpath('./name/text()')[0]
    return None


def netconf_changed_timestamp(netconf_config):
    '''
    return the last change timestamp published by the config document
    :param netconf_config: netconf lxml etree document
    :return: an epoch timestamp (integer number of seconds) or None
    '''
    for ts in netconf_config.xpath('/configuration/@changed-seconds'):
        if re.match(r'^\d+$', ts):
            return int(ts)
    logger = logging.getLogger(__name__)
    logger.warning('no valid timestamp found in netconf configuration')
    return None


def logical_systems(netconf_config):
    """
    Return a list of logical system names for the router.  It's not
    an error if a router has no defined logical systems.

    :param netconf_config: netconf lxml etree document
    :return: a list of strings
    """
    return netconf_config.xpath('//configuration/logical-systems/name/text()')