Skip to content
Snippets Groups Projects
Commit 53bf4e27 authored by Erik Reid's avatar Erik Reid
Browse files

renamed netconf module to juniper

parent b4fab396
No related branches found
No related tags found
No related merge requests found
File moved
......@@ -5,7 +5,7 @@ from flask import Blueprint, request, Response, current_app
from lxml import etree
import redis
from inventory_provider import netconf
from inventory_provider import juniper
routes = Blueprint("inventory-data-query-routes", __name__)
......@@ -70,7 +70,7 @@ def router_interfaces(hostname):
status=404,
mimetype="text/html")
interfaces = list(netconf.list_interfaces(
interfaces = list(juniper.list_interfaces(
etree.XML(netconf_string.decode('utf-8'))))
if not interfaces:
return Response(
......@@ -149,7 +149,7 @@ def bgp_configs(hostname):
status=404,
mimetype="text/html")
routes = list(netconf.list_bgp_routes(
routes = list(juniper.list_bgp_routes(
etree.XML(netconf_string.decode('utf-8'))))
if not routes:
return Response(
......
......@@ -8,7 +8,7 @@ from lxml import etree
from inventory_provider.tasks.app import app
from inventory_provider import config
from inventory_provider import snmp, netconf
from inventory_provider import snmp, juniper
from inventory_provider import constants
logging.basicConfig(level=logging.WARNING)
......@@ -90,4 +90,4 @@ def netconf_refresh_config(self, hostname):
InventoryTask.save_key_etree(
hostname,
"netconf",
netconf.load_config(hostname, InventoryTask.config["ssh"]))
juniper.load_config(hostname, InventoryTask.config["ssh"]))
import os
from lxml import etree
from inventory_provider import netconf
from inventory_provider import juniper
def load_config_schema(hostname, ssh_params):
return netconf._rpc(hostname, ssh_params).get_xnm_information(
return juniper._rpc(hostname, ssh_params).get_xnm_information(
type='xml-schema',
namespace='junos-configuration')
......
def test_todo():
"""
refactor before using new netconf api
:return:
"""
pass
# import os
# import re
#
# import pytest
#
# from inventory_provider import juniper
#
# TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
import os
import jsonschema
from lxml import etree
import pytest
from inventory_provider import juniper
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
class MockedJunosRpc(object):
def __init__(self, hostname):
filename = os.path.join(TEST_DATA_DIR, "%s-netconf.xml" % hostname)
self.config = etree.parse(filename)
def get_config(self):
return self.config
class MockedJunosDevice(object):
def __init__(self, **kwargs):
self.rpc = MockedJunosRpc(kwargs['host'])
def open(self):
pass
@pytest.fixture
def netconf_doc(mocker, router, data_config):
mocker.patch(
'inventory_provider.netconf.Device',
MockedJunosDevice)
return juniper.load_config(router, data_config['ssh'])
# def test_interface_list(netconf_doc):
#
# ROUTER_HOSTNAMES = [
# 'mx2.ath.gr.geant.net',
# 'mx1.tal.ee.geant.net',
# 'mx2.tal.ee.geant.net',
# 'mx2.rig.lv.geant.net',
# 'mx1.kau.lt.geant.net',
# 'mx2.kau.lt.geant.net',
# 'mx2.zag.hr.geant.net',
# 'mx2.lju.si.geant.net',
# 'mx1.bud.hu.geant.net',
# 'mx1.pra.cz.geant.net',
# 'mx2.bra.sk.geant.net',
# 'mx1.lon.uk.geant.net',
# 'mx1.vie.at.geant.net',
# 'mx2.bru.be.geant.net',
# 'mx1.poz.pl.geant.net',
# 'mx1.ams.nl.geant.net',
# 'mx1.fra.de.geant.net',
# 'mx1.par.fr.geant.net',
# 'mx1.gen.ch.geant.net',
# 'mx1.mil2.it.geant.net',
# 'mx1.lis.pt.geant.net',
# 'mx2.lis.pt.geant.net',
# 'mx1.mad.es.geant.net',
# 'mx1.sof.bg.geant.net',
# 'mx1.buc.ro.geant.net',
# 'mx1.ham.de.geant.net',
# 'mx1.dub.ie.geant.net',
# 'mx1.dub2.ie.geant.net',
# 'mx1.mar.fr.geant.net',
# 'mx1.lon2.uk.geant.net',
# 'mx1.ath2.gr.geant.net',
# ]
# CACHE_SCHEMA = {
# "$schema": "http://json-schema.org/draft-07/schema#",
# "type": "object",
# "patternProperties": {
# "^.*\\.geant\\.net$": {
# schema = {
# "$schema": "http://json-schema.org/draft-07/schema#",
# "type": "array",
# "items": {
# "type": "object",
# "properties": {
# "bgp": {"type": "object"},
# "vrr": {"type": "object"},
# "interfaces": {"type": "object"},
# "snmp-interfaces": {
# "type": "array",
# "items": {
# "type": "object",
# "properties": {
# "v4Address": {"type": "string"},
# "v4Mask": {"type": "string"},
# "v4InterfaceName": {"type": "string"},
# "v6Address": {"type": "string"},
# "v6Mask": {"type": "string"},
# "v6InterfaceName": {"type": "string"},
# },
# "additionalProperties": False
# }
# }
# "name": {"type": "string"},
# "description": {"type": "string"}
# },
# "required": ["bgp", "vrr", "interfaces", "snmp-interfaces"],
# "required": ["name", "description"],
# "additionalProperties": False
# }
# },
# "additionalProperties": False
# }
#
#
# def _parsed_old_style_output_data(s):
# for l in s.splitlines():
# if not l:
# continue
#
# m = re.match(
# (r'^set\s+routing-instances\s+(\S+)+\s+'
# r'protocols\s+(\S+)\s+'
# r'group\s+(\S+)\s+'
# r'neighbor\s+([a-f\d\.:]+)\s+'
# r'description\s+"?([^"]+)"?\s*$'), l)
# assert m
#
# yield {
# "routing-instances": m.group(1),
# "protocols": m.group(2),
# "group": m.group(3),
# "neighbor": m.group(4),
# "description": m.group(5)
# }
#
#
# @pytest.fixture(params=ROUTER_HOSTNAMES)
# def router_output(request):
# output = {}
# for key in ["bgpv4", "bgpv6", "bgp", "interfaces", "vrr"]:
# filename = os.path.join(
# TEST_DATA_DIR,
# "%s-%s.output" % (request.param, key))
# with open(filename) as f:
# output[key] = f.read()
# return output
#
#
# def test_ipv4_neighbors(mocker, router_output):
# }
#
# old_v4_data = dict([
# (x["neighbor"], x)
# for x in
# _parsed_old_style_output_data(router_output["bgpv4"])])
#
# def _mocked_ssh_exec_commands(hostname, params, commands):
# assert len(commands) == 2 # the expected number
# return [None, router_output["bgp"]]
#
# mocker.patch(
# 'inventory_provider.juniper.ssh_exec_commands',
# _mocked_ssh_exec_commands)
#
# neighbors = juniper.fetch_bgp_config(
# None,
# None,
# group_expression=r'^GEANT-IX[\s-].*$')
#
# assert len(neighbors) == len(old_v4_data)
# for n in neighbors:
# address = n["name"]["data"]
# description = n["description"][0]["data"]
# assert old_v4_data[address]["description"] == description
#
#
# def test_ipv6_neighbors(mocker, router_output):
#
# old_v6_data = dict([
# (x["neighbor"], x)
# for x in
# _parsed_old_style_output_data(router_output["bgpv6"])])
#
# def _mocked_ssh_exec_commands(hostname, params, commands):
# assert len(commands) == 2 # the expected number
# return [None, router_output["bgp"]]
#
# mocker.patch(
# 'inventory_provider.juniper.ssh_exec_commands',
# _mocked_ssh_exec_commands)
#
# neighbors = juniper.fetch_bgp_config(
# None,
# None,
# group_expression=r'^GEANT-IXv6[\s-].*$')
#
# assert len(neighbors) == len(old_v6_data)
# for n in neighbors:
# address = n["name"]["data"]
# description = n["description"][0]["data"]
# assert old_v6_data[address]["description"] == description
#
#
# COMMAND_HANDLERS = {
# 'bgp': juniper.fetch_bgp_config,
# 'vrr': juniper.fetch_vrr_config,
# 'interfaces': juniper.fetch_interfaces
# }
#
#
# def test_juniper_shell_output_parsing(mocker, router_output):
# """
# just call the correct parser for each type of shell output
# (not a proper test ... just verifies there's no crash)
#
# TODO: add jsonschema validation
# :param router_output:
# :return:
# """
#
# output = None
#
# def _mocked_ssh_exec_commands(hostname, params, commands):
# assert len(commands) == 2 # the expected number
# return [None, output]
#
# mocker.patch(
# 'inventory_provider.juniper.ssh_exec_commands',
# _mocked_ssh_exec_commands)
#
# for key in ["bgp", "vrr", "interfaces"]:
# output = router_output[key]
# COMMAND_HANDLERS[key](None, None)
# interfaces = list(netconf.list_interfaces(netconf_doc))
# jsonschema.validate(interfaces, schema)
# assert interfaces # at least shouldn't be empty
def test_bgp_list(netconf_doc):
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"description": {"type": "string"},
"as": {
"type": "object",
"properties": {
"local": {"type": "integer"},
"peer": {"type": "integer"},
},
"required": ["local", "peer"],
"additionalProperties": False
}
},
"required": ["name", "description", "as"],
"additionalProperties": False
}
}
routes = list(juniper.list_bgp_routes(netconf_doc))
jsonschema.validate(routes, schema)
def test_todo():
"""
refactor before using new netconf api
:return:
"""
pass
# import os
# import re
#
# import pytest
#
# from inventory_provider import juniper
#
# TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
#
# ROUTER_HOSTNAMES = [
# 'mx2.ath.gr.geant.net',
# 'mx1.tal.ee.geant.net',
# 'mx2.tal.ee.geant.net',
# 'mx2.rig.lv.geant.net',
# 'mx1.kau.lt.geant.net',
# 'mx2.kau.lt.geant.net',
# 'mx2.zag.hr.geant.net',
# 'mx2.lju.si.geant.net',
# 'mx1.bud.hu.geant.net',
# 'mx1.pra.cz.geant.net',
# 'mx2.bra.sk.geant.net',
# 'mx1.lon.uk.geant.net',
# 'mx1.vie.at.geant.net',
# 'mx2.bru.be.geant.net',
# 'mx1.poz.pl.geant.net',
# 'mx1.ams.nl.geant.net',
# 'mx1.fra.de.geant.net',
# 'mx1.par.fr.geant.net',
# 'mx1.gen.ch.geant.net',
# 'mx1.mil2.it.geant.net',
# 'mx1.lis.pt.geant.net',
# 'mx2.lis.pt.geant.net',
# 'mx1.mad.es.geant.net',
# 'mx1.sof.bg.geant.net',
# 'mx1.buc.ro.geant.net',
# 'mx1.ham.de.geant.net',
# 'mx1.dub.ie.geant.net',
# 'mx1.dub2.ie.geant.net',
# 'mx1.mar.fr.geant.net',
# 'mx1.lon2.uk.geant.net',
# 'mx1.ath2.gr.geant.net',
# ]
# CACHE_SCHEMA = {
# "$schema": "http://json-schema.org/draft-07/schema#",
# "type": "object",
# "patternProperties": {
# "^.*\\.geant\\.net$": {
# "type": "object",
# "properties": {
# "bgp": {"type": "object"},
# "vrr": {"type": "object"},
# "interfaces": {"type": "object"},
# "snmp-interfaces": {
# "type": "array",
# "items": {
# "type": "object",
# "properties": {
# "v4Address": {"type": "string"},
# "v4Mask": {"type": "string"},
# "v4InterfaceName": {"type": "string"},
# "v6Address": {"type": "string"},
# "v6Mask": {"type": "string"},
# "v6InterfaceName": {"type": "string"},
# },
# "additionalProperties": False
# }
# }
# },
# "required": ["bgp", "vrr", "interfaces", "snmp-interfaces"],
# "additionalProperties": False
# }
# },
# "additionalProperties": False
# }
#
#
# def _parsed_old_style_output_data(s):
# for l in s.splitlines():
# if not l:
# continue
#
# m = re.match(
# (r'^set\s+routing-instances\s+(\S+)+\s+'
# r'protocols\s+(\S+)\s+'
# r'group\s+(\S+)\s+'
# r'neighbor\s+([a-f\d\.:]+)\s+'
# r'description\s+"?([^"]+)"?\s*$'), l)
# assert m
#
# yield {
# "routing-instances": m.group(1),
# "protocols": m.group(2),
# "group": m.group(3),
# "neighbor": m.group(4),
# "description": m.group(5)
# }
#
#
# @pytest.fixture(params=ROUTER_HOSTNAMES)
# def router_output(request):
# output = {}
# for key in ["bgpv4", "bgpv6", "bgp", "interfaces", "vrr"]:
# filename = os.path.join(
# TEST_DATA_DIR,
# "%s-%s.output" % (request.param, key))
# with open(filename) as f:
# output[key] = f.read()
# return output
#
#
# def test_ipv4_neighbors(mocker, router_output):
#
# old_v4_data = dict([
# (x["neighbor"], x)
# for x in
# _parsed_old_style_output_data(router_output["bgpv4"])])
#
# def _mocked_ssh_exec_commands(hostname, params, commands):
# assert len(commands) == 2 # the expected number
# return [None, router_output["bgp"]]
#
# mocker.patch(
# 'inventory_provider.juniper.ssh_exec_commands',
# _mocked_ssh_exec_commands)
#
# neighbors = juniper.fetch_bgp_config(
# None,
# None,
# group_expression=r'^GEANT-IX[\s-].*$')
#
# assert len(neighbors) == len(old_v4_data)
# for n in neighbors:
# address = n["name"]["data"]
# description = n["description"][0]["data"]
# assert old_v4_data[address]["description"] == description
#
#
# def test_ipv6_neighbors(mocker, router_output):
#
# old_v6_data = dict([
# (x["neighbor"], x)
# for x in
# _parsed_old_style_output_data(router_output["bgpv6"])])
#
# def _mocked_ssh_exec_commands(hostname, params, commands):
# assert len(commands) == 2 # the expected number
# return [None, router_output["bgp"]]
#
# mocker.patch(
# 'inventory_provider.juniper.ssh_exec_commands',
# _mocked_ssh_exec_commands)
#
# neighbors = juniper.fetch_bgp_config(
# None,
# None,
# group_expression=r'^GEANT-IXv6[\s-].*$')
#
# assert len(neighbors) == len(old_v6_data)
# for n in neighbors:
# address = n["name"]["data"]
# description = n["description"][0]["data"]
# assert old_v6_data[address]["description"] == description
#
#
# COMMAND_HANDLERS = {
# 'bgp': juniper.fetch_bgp_config,
# 'vrr': juniper.fetch_vrr_config,
# 'interfaces': juniper.fetch_interfaces
# }
#
#
# def test_juniper_shell_output_parsing(mocker, router_output):
# """
# just call the correct parser for each type of shell output
# (not a proper test ... just verifies there's no crash)
#
# TODO: add jsonschema validation
# :param router_output:
# :return:
# """
#
# output = None
#
# def _mocked_ssh_exec_commands(hostname, params, commands):
# assert len(commands) == 2 # the expected number
# return [None, output]
#
# mocker.patch(
# 'inventory_provider.juniper.ssh_exec_commands',
# _mocked_ssh_exec_commands)
#
# for key in ["bgp", "vrr", "interfaces"]:
# output = router_output[key]
# COMMAND_HANDLERS[key](None, None)
import os
import jsonschema
from lxml import etree
import pytest
from inventory_provider import netconf
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
class MockedJunosRpc(object):
def __init__(self, hostname):
filename = os.path.join(TEST_DATA_DIR, "%s-netconf.xml" % hostname)
self.config = etree.parse(filename)
def get_config(self):
return self.config
class MockedJunosDevice(object):
def __init__(self, **kwargs):
self.rpc = MockedJunosRpc(kwargs['host'])
def open(self):
pass
@pytest.fixture
def netconf_doc(mocker, router, data_config):
mocker.patch(
'inventory_provider.netconf.Device',
MockedJunosDevice)
return netconf.load_config(router, data_config['ssh'])
# def test_interface_list(netconf_doc):
#
# schema = {
# "$schema": "http://json-schema.org/draft-07/schema#",
# "type": "array",
# "items": {
# "type": "object",
# "properties": {
# "name": {"type": "string"},
# "description": {"type": "string"}
# },
# "required": ["name", "description"],
# "additionalProperties": False
# }
# }
#
# interfaces = list(netconf.list_interfaces(netconf_doc))
# jsonschema.validate(interfaces, schema)
# assert interfaces # at least shouldn't be empty
def test_bgp_list(netconf_doc):
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"description": {"type": "string"},
"as": {
"type": "object",
"properties": {
"local": {"type": "integer"},
"peer": {"type": "integer"},
},
"required": ["local", "peer"],
"additionalProperties": False
}
},
"required": ["name", "description", "as"],
"additionalProperties": False
}
}
routes = list(netconf.list_bgp_routes(netconf_doc))
jsonschema.validate(routes, schema)
import os
from inventory_provider import netconf
from lxml import etree
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
class MockedJunosRpc(object):
def __init__(self, hostname):
filename = os.path.join(TEST_DATA_DIR, "%s-netconf.xml" % hostname)
self.config = etree.parse(filename)
def get_config(self):
return self.config
class MockedJunosDevice(object):
def __init__(self, **kwargs):
self.rpc = MockedJunosRpc(kwargs['host'])
def open(self):
pass
def test_nr1(mocker, data_config):
mocker.patch(
'inventory_provider.netconf.Device',
MockedJunosDevice)
import json
for r in data_config['routers']:
print(r['hostname'])
x = netconf.load_interfaces(r['hostname'], data_config['ssh'])
print(json.dumps(list(x), indent=4))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment