# juniper.py 7.25 KiB
import logging
from jnpr.junos import Device
from lxml import etree
from inventory_provider.constants import JUNIPER_LOGGER_NAME
# XML Schema (XSD) text that load_config compiles with lxml and uses to
# validate the <configuration> document fetched from a router.
#
# Most top-level sections (system, chassis, protocols, ...) are declared
# with the permissive 'generic-sequence' type: any child elements and any
# attributes are accepted.  Only <interfaces> is given structure: it may
# contain <apply-groups> and <interface> elements, and each <interface>
# (type 'juniper-interface') may contain <name>, <description> or any
# other element, plus an optional 'inactive' attribute.
#
# <unit> children of an interface are not described here; load_config
# validates them separately against UNIT_SCHEMA.
#
# NOTE(review): <name> is declared inside an xs:choice next to xs:any, so
# this schema does not appear to force every <interface> to carry a
# <name> - confirm before relying on it.
CONFIG_SCHEMA = """<?xml version="1.1" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:complexType name="generic-sequence">
<xs:sequence>
<xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
<xs:anyAttribute processContents="skip" />
</xs:complexType>
<!-- NOTE: 'unit' content isn't validated -->
<xs:complexType name="juniper-interface">
<xs:sequence>
<xs:choice minOccurs="1" maxOccurs="unbounded">
<xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string" />
<xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string" />
<xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded" />
</xs:choice>
</xs:sequence>
<xs:attribute name="inactive" type="xs:string" />
</xs:complexType>
<xs:element name="configuration">
<xs:complexType>
<xs:sequence>
<xs:choice minOccurs="1" maxOccurs="unbounded">
<xs:element name="version" minOccurs="0" type="xs:string" />
<xs:element name="groups" minOccurs="0" type="generic-sequence" />
<xs:element name="apply-groups" minOccurs="0" type="xs:string" />
<xs:element name="system" minOccurs="0" type="generic-sequence" />
<xs:element name="logical-systems" minOccurs="0" type="generic-sequence" />
<xs:element name="chassis" minOccurs="0" type="generic-sequence" />
<xs:element name="services" minOccurs="0" type="generic-sequence" />
<xs:element name="interfaces" minOccurs="0">
<xs:complexType>
<xs:sequence>
<xs:choice minOccurs="1" maxOccurs="unbounded">
<xs:element name="apply-groups" minOccurs="0" type="xs:string" />
<xs:element name="interface" minOccurs="1" maxOccurs="unbounded" type="juniper-interface" />
</xs:choice>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="snmp" minOccurs="0" type="generic-sequence" />
<xs:element name="forwarding-options" minOccurs="0" type="generic-sequence" />
<xs:element name="routing-options" minOccurs="0" type="generic-sequence" />
<xs:element name="protocols" minOccurs="0" type="generic-sequence" />
<xs:element name="policy-options" minOccurs="0" type="generic-sequence" />
<xs:element name="class-of-service" minOccurs="0" type="generic-sequence" />
<xs:element name="firewall" minOccurs="0" type="generic-sequence" />
<xs:element name="routing-instances" minOccurs="0" type="generic-sequence" />
<xs:element name="bridge-domains" minOccurs="0" type="generic-sequence" />
</xs:choice>
</xs:sequence>
<xs:attribute name="changed-seconds" type="xs:string" />
<xs:attribute name="changed-localtime" type="xs:string" />
</xs:complexType>
</xs:element>
</xs:schema>
""" # noqa: E501
# XML Schema (XSD) text for a single <unit> element.  load_config
# validates every interfaces/interface/unit element of the router
# configuration against it, one element at a time.
#
# A <unit> must start with an integer <name>, may be followed by one
# <description>, and then by any number of other elements; an optional
# 'inactive' attribute is allowed.
#
# The 'generic-sequence' type is declared below but is not referenced by
# any element in this schema.
UNIT_SCHEMA = """<?xml version="1.1" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:complexType name="generic-sequence">
<xs:sequence>
<xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
<xs:anyAttribute processContents="skip" />
</xs:complexType>
<xs:element name="unit">
<xs:complexType>
<xs:sequence>
<xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:int" />
<xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string" />
<xs:any processContents="lax" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="inactive" type="xs:string" />
</xs:complexType>
</xs:element>
</xs:schema>
""" # noqa: E501
def _rpc(hostname, ssh):
    """
    opens a jnpr.junos Device connection to a router and
    returns that device's rpc interface

    the device is left open: nothing in this function closes it

    :param hostname: router hostname
    :param ssh: mapping with 'username' and 'private-key' entries
    :return: the `rpc` attribute of the opened Device
    """
    connection_params = {
        'host': hostname,
        'user': ssh['username'],
        'ssh_private_key_file': ssh['private-key'],
    }
    device = Device(**connection_params)
    device.open()
    return device.rpc
def load_config(hostname, ssh_params):
    """
    loads netconf data from the router, validates and
    returns as an lxml etree doc

    the whole document is validated against CONFIG_SCHEMA, then every
    interfaces/interface/unit element is validated against UNIT_SCHEMA

    :param hostname: router hostname
    :param ssh_params: 'ssh' config element(cf. config.py:CONFIG_SCHEMA)
    :return: the validated configuration document
    :raises AssertionError: if the document, or any of its interface
        units, fails schema validation (each schema error is logged first)
    """
    juniper_logger = logging.getLogger(JUNIPER_LOGGER_NAME)
    juniper_logger.info("capturing netconf data for '%s'", hostname)

    def _validate(schema, doc):
        if schema.validate(doc):
            return
        for e in schema.error_log:
            juniper_logger.error("%d.%d: %s", e.line, e.column, e.message)
        # raised explicitly rather than with `assert False`: assert
        # statements are removed under `python -O`, which would let
        # invalid data through silently.  AssertionError is kept so
        # existing callers see the same exception type.
        raise AssertionError(
            "netconf data for '%s' failed schema validation" % hostname)

    schema_doc = etree.XML(CONFIG_SCHEMA.encode('utf-8'))
    config_schema = etree.XMLSchema(schema_doc)

    # NOTE(review): the device opened by _rpc is not closed anywhere
    # in this function
    config = _rpc(hostname, ssh_params).get_config()
    _validate(config_schema, config)

    # validate interfaces/interface/unit elements ...
    schema_doc = etree.XML(UNIT_SCHEMA.encode('utf-8'))
    unit_schema = etree.XMLSchema(schema_doc)
    for unit in config.xpath('//configuration/interfaces/interface/unit'):
        _validate(unit_schema, unit)

    return config
def list_interfaces(netconf_config):
    """
    generator that walks the interfaces of a netconf configuration

    each physical interface is yielded first, followed by one entry
    for each of its units (named "<interface name>.<unit name>")

    every yielded item is a dict with the keys
    'name', 'description', 'ipv4' and 'ipv6'

    :param netconf_config: xml doc that was generated by load_config
    :return: yields interface info dicts
    """

    def _describe(element):
        # 'name' is mandatory, 'description' falls back to ''
        name_element = element.find('name')
        assert name_element is not None, \
            "expected interface 'name' child element"
        description_element = element.find('description')
        return {
            'name': name_element.text,
            'description': (
                '' if description_element is None
                else description_element.text),
            'ipv4': element.xpath('./family/inet/address/name/text()'),
            'ipv6': element.xpath('./family/inet6/address/name/text()'),
        }

    interfaces = netconf_config.xpath('//configuration/interfaces/interface')
    for interface in interfaces:
        parent = _describe(interface)
        yield parent
        for unit in interface.xpath('./unit'):
            child = _describe(unit)
            # unit names are qualified with the parent interface name
            child['name'] = "{}.{}".format(parent['name'], child['name'])
            yield child
def list_bgp_routes(netconf_config):
    """
    generator that parses netconf output and yields the bgp
    neighbors configured in groups whose name starts with "GEANT-IX",
    within the "IAS" routing instance

    each yielded item has the form:
        {'name': str,
         'description': str,
         'as': {'local': int or None, 'peer': int or None}}

    'description' is '' and the as numbers are None when the
    corresponding elements are not present on the neighbor

    :param netconf_config: configuration xml doc (supports .xpath)
    :return: yields one dict per neighbor
    """

    def _as_number(element):
        # a missing element is reported as None instead of failing
        # with AttributeError and aborting the whole listing
        return None if element is None else int(element.text)

    for neighbor in netconf_config.xpath(
            '//configuration/routing-instances/'
            'instance[name/text()="IAS"]/protocols/bgp/'
            'group[starts-with(name/text(), "GEANT-IX")]/'
            'neighbor'):
        description = neighbor.find('description')
        # the local as number is nested: local-as/as-number
        local_as = neighbor.find('local-as')
        if local_as is not None:
            local_as = local_as.find('as-number')
        yield {
            'name': neighbor.find('name').text,
            'description': '' if description is None else description.text,
            'as': {
                'local': _as_number(local_as),
                'peer': _as_number(neighbor.find('peer-as'))
            }
        }
# note for enabling vrr data parsing ...
# def fetch_vrr_config(hostname, ssh_params):
#
# commands = [
# _DISABLE_PAGING_COMMAND,
# ('show configuration logical-systems '
# 'VRR protocols bgp | display json')
# ]
#
# output = list(ssh_exec_commands(hostname, ssh_params, commands))
# assert len(output) == len(commands)
#
# return _loads(output[1]) if output[1] else {}
#