juniper.py 3.17 KiB
import contextlib
import json
import logging
import re
import paramiko
from inventory_provider.constants import JUNIPER_LOGGER_NAME
def neighbors(router, routing_instances=["IAS"], group_expression=None):
for config in router["bgp"]["configuration"]:
for ri in config["routing-instances"]:
for inst in ri["instance"]:
if inst["name"]["data"] not in routing_instances:
continue
for prot in inst["protocols"]:
for bgp in prot.get("bgp", []):
for g in bgp.get("group", []):
if group_expression and not \
re.match(group_expression, g["name"]["data"]):
continue
for n in g["neighbor"]:
yield n
def interfaces(router):
for ifc_info in router["interfaces"]["interface-information"]:
for ifc_list in ifc_info["logical-interface"]:
for ifc in ifc_list:
yield {
"name": ifc["name"][0]["data"],
"status": "%s/%s" % (
ifc["admin-status"][0]["data"],
ifc["oper-status"][0]["data"]),
"description": ifc["description"][0]["data"],
"type": "logical"
}
@contextlib.contextmanager
def ssh_connection(router, ssh_params):
import os
key_filename = os.path.join(os.path.dirname(__file__), ssh_params["private-key"])
known_hosts = os.path.join(os.path.dirname(__file__), ssh_params["known-hosts"])
k = paramiko.DSSKey.from_private_key_file(key_filename)
router["hostname"] = "mx1.ams.nl.geant.net"
with paramiko.SSHClient() as ssh:
ssh.load_host_keys(known_hosts)
ssh.connect(
hostname=router["hostname"],
username="Monit0r",
pkey=k)
yield ssh
def exec_router_commands_json(router, ssh_params, commands):
juniper_logger = logging.getLogger(JUNIPER_LOGGER_NAME)
with ssh_connection(router, ssh_params) as ssh:
_, stdout, _ = ssh.exec_command("set cli screen-length 0")
assert stdout.channel.recv_exit_status() == 0
def _dups_to_list(pairs):
counter_map = {}
for k, v in pairs:
counter_map.setdefault(k, []).append(v)
result = {}
for k, v in counter_map.items():
if len(v) == 1:
result[k] = v[0]
else:
result[k] = v
return result
for c in commands:
juniper_logger.debug("command: '%s'" % (c + " | display json"))
_, stdout, _ = ssh.exec_command(c + " | display json")
assert stdout.channel.recv_exit_status() == 0
# TODO: error handling
output = stdout.read()
if output:
juniper_logger.debug("%r output: [%d] %r" % (router, len(output), output[:20]))
yield json.loads(output, object_pairs_hook=_dups_to_list)
else:
juniper_logger.debug("%r output empty" % router)
yield {}