import logging from jnpr.junos import Device from lxml import etree from inventory_provider.constants import JUNIPER_LOGGER_NAME CONFIG_SCHEMA = """<?xml version="1.1" encoding="UTF-8" ?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> <xs:complexType name="generic-sequence"> <xs:sequence> <xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded"/> </xs:sequence> <xs:anyAttribute processContents="skip" /> </xs:complexType> <!-- NOTE: 'unit' content isn't validated --> <xs:complexType name="juniper-interface"> <xs:sequence> <xs:choice minOccurs="1" maxOccurs="unbounded"> <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string" /> <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string" /> <xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded" /> </xs:choice> </xs:sequence> <xs:attribute name="inactive" type="xs:string" /> </xs:complexType> <xs:element name="configuration"> <xs:complexType> <xs:sequence> <xs:choice minOccurs="1" maxOccurs="unbounded"> <xs:element name="version" minOccurs="0" type="xs:string" /> <xs:element name="groups" minOccurs="0" type="generic-sequence" /> <xs:element name="apply-groups" minOccurs="0" type="xs:string" /> <xs:element name="system" minOccurs="0" type="generic-sequence" /> <xs:element name="logical-systems" minOccurs="0" type="generic-sequence" /> <xs:element name="chassis" minOccurs="0" type="generic-sequence" /> <xs:element name="services" minOccurs="0" type="generic-sequence" /> <xs:element name="interfaces" minOccurs="0"> <xs:complexType> <xs:sequence> <xs:choice minOccurs="1" maxOccurs="unbounded"> <xs:element name="apply-groups" minOccurs="0" type="xs:string" /> <xs:element name="interface" minOccurs="1" maxOccurs="unbounded" type="juniper-interface" /> </xs:choice> </xs:sequence> </xs:complexType> </xs:element> <xs:element name="snmp" minOccurs="0" type="generic-sequence" /> <xs:element name="forwarding-options" minOccurs="0" type="generic-sequence" /> <xs:element name="routing-options" minOccurs="0" type="generic-sequence" /> <xs:element name="protocols" minOccurs="0" type="generic-sequence" /> <xs:element name="policy-options" minOccurs="0" type="generic-sequence" /> <xs:element name="class-of-service" minOccurs="0" type="generic-sequence" /> <xs:element name="firewall" minOccurs="0" type="generic-sequence" /> <xs:element name="routing-instances" minOccurs="0" type="generic-sequence" /> <xs:element name="bridge-domains" minOccurs="0" type="generic-sequence" /> </xs:choice> </xs:sequence> <xs:attribute name="changed-seconds" type="xs:string" /> <xs:attribute name="changed-localtime" type="xs:string" /> </xs:complexType> </xs:element> </xs:schema> """ # noqa: E501 UNIT_SCHEMA = """<?xml version="1.1" encoding="UTF-8" ?> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> <xs:complexType name="generic-sequence"> <xs:sequence> <xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded"/> </xs:sequence> <xs:anyAttribute processContents="skip" /> </xs:complexType> <xs:element name="unit"> <xs:complexType> <xs:sequence> <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:int" /> <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string" /> <xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded"/> </xs:sequence> <xs:attribute name="inactive" type="xs:string" /> </xs:complexType> </xs:element> </xs:schema> """ # noqa: E501 def _rpc(hostname, ssh): dev = Device( host=hostname, user=ssh['username'], ssh_private_key_file=ssh['private-key']) dev.open() return dev.rpc def load_config(hostname, ssh_params): """ loads netconf data from the router, validates and returns as an lxml etree doc :param hostname: router hostname :param ssh_params: 'ssh' config element(cf. config.py:CONFIG_SCHEMA) :return: """ juniper_logger = logging.getLogger(JUNIPER_LOGGER_NAME) juniper_logger.info("capturing netconf data for '%s'" % hostname) def _validate(schema, doc): if schema.validate(doc): return for e in schema.error_log: juniper_logger.error("%d.%d: %s" % (e.line, e.column, e.message)) assert False schema_doc = etree.XML(CONFIG_SCHEMA.encode('utf-8')) config_schema = etree.XMLSchema(schema_doc) config = _rpc(hostname, ssh_params).get_config() _validate(config_schema, config) # validate interfaces/interface/unit elements ... schema_doc = etree.XML(UNIT_SCHEMA.encode('utf-8')) unit_schema = etree.XMLSchema(schema_doc) for i in config.xpath('//configuration/interfaces/interface'): for u in i.xpath('./unit'): _validate(unit_schema, u) return config def list_interfaces(netconf_config): """ generator that parses netconf output and yields a list of interfaces :param netconf_config: xml doc that was generated by load_config :return: """ def _ifc_info(e): name = e.find('name') assert name is not None, "expected interface 'name' child element" ifc = { 'name': name.text, 'description': '' } description = e.find('description') if description is not None: ifc['description'] = description.text ifc['ipv4'] = e.xpath('./family/inet/address/name/text()') ifc['ipv6'] = e.xpath('./family/inet6/address/name/text()') return ifc for i in netconf_config.xpath('//configuration/interfaces/interface'): info = _ifc_info(i) yield info for u in i.xpath('./unit'): unit_info = _ifc_info(u) unit_info['name'] = "%s.%s" % (info['name'], unit_info['name']) yield unit_info def list_bgp_routes(netconf_config): for r in netconf_config.xpath( '//configuration/routing-instances/' 'instance[name/text()="IAS"]/protocols/bgp/' 'group[starts-with(name/text(), "GEANT-IX")]/' 'neighbor'): name = r.find('name') description = r.find('description') local_as = r.find('local-as') if local_as is not None: local_as = local_as.find('as-number') peer_as = r.find('peer-as') yield { 'name': name.text, 'description': description.text, 'as': { 'local': int(local_as.text), 'peer': int(peer_as.text) } } # note for enabling vrr data parsing ... # def fetch_vrr_config(hostname, ssh_params): # # commands = [ # _DISABLE_PAGING_COMMAND, # ('show configuration logical-systems ' # 'VRR protocols bgp | display json') # ] # # output = list(ssh_exec_commands(hostname, ssh_params, commands)) # assert len(output) == len(commands) # # return _loads(output[1]) if output[1] else {} #