Commit 05b9cd1a authored by Release Webservice

Finished release 0.5.

parents e848bd3b 8420e7c3
Tags: 0.5
Showing changes with 1846 additions and 237 deletions
0.1: initial skeleton
0.2: use celery for task management
0.3: basic opsdb, alarmsdb coms & test api
0.4: added some further sample resources
0.5: added meaningful alarmsdb demo method
     added snmp index to interface data to support sensu, prometheus
     increased unit test coverage to 78%
import contextlib
import logging
import mysql.connector
from inventory_provider.constants import DATABASE_LOGGER_NAME
@contextlib.contextmanager
def connection(alarmsdb):
def connection(alarmsdb): # pragma: no cover
cx = None
try:
cx = mysql.connector.connect(
......@@ -22,7 +18,7 @@ def connection(alarmsdb):
@contextlib.contextmanager
def cursor(cnx):
def cursor(cnx): # pragma: no cover
csr = None
try:
csr = cnx.cursor()
......@@ -32,12 +28,61 @@ def cursor(cnx):
csr.close()
def _db_test(db, router):
    database_logger = logging.getLogger(DATABASE_LOGGER_NAME)
    with cursor(db) as crs:
        database_logger.debug("_db_test: %r" % router)
        query = "SELECT absid FROM routers WHERE hostname = %s"
        crs.execute(query, (router['hostname'],))
        for (absid,) in crs:
            database_logger.debug("absid: %r" % absid)
            yield absid
def get_last_known_infinera_interface_status(db, equipment, interface):
    query = "SELECT status FROM infinera_alarms" \
            " WHERE" \
            " CONCAT(ne_name, '-', REPLACE(object_name, 'T', '')) = %s" \
            " ORDER BY ne_init_time DESC, ne_clear_time DESC LIMIT 1"
    search_string = equipment + "-" + interface
    with cursor(db) as crs:
        crs.execute(query, (search_string,))
        result = crs.fetchone()
    if not result:
        return "unknown"
    elif result[0] == "Raised":
        return "down"
    else:
        return "up"
def get_last_known_coriant_interface_status(db, equipment, interface):
    query = "SELECT status FROM coriant_alarms" \
            " WHERE ne_id_name = %s AND entity_string LIKE %s" \
            " ORDER BY last_event_time DESC LIMIT 1"
    with cursor(db) as crs:
        crs.execute(query, (equipment, interface + "-%"))
        result = crs.fetchone()
    if not result:
        return "unknown"
    elif result[0] == "Raised":
        return "down"
    else:
        return "up"
def get_last_known_juniper_link_interface_status(db, equipment, interface):
    query = "SELECT IF(link_admin_status = 'up'" \
            " AND link_oper_status = 'up', 1, 0) AS up FROM juniper_alarms" \
            " WHERE equipment_name = %s AND link_interface_name = %s" \
            " ORDER BY alarm_id DESC LIMIT 1"
    with cursor(db) as crs:
        crs.execute(query, ('lo0.' + equipment, interface))
        result = crs.fetchone()
    if not result:
        return "unknown"
    elif result[0] == 0:
        return "down"
    else:
        return "up"
def get_last_known_interface_status(db, equipment, interface):
    result = get_last_known_infinera_interface_status(
        db, equipment, interface)
    if result == "unknown":
        result = get_last_known_coriant_interface_status(
            db, equipment, interface)
    if result == "unknown":
        result = get_last_known_juniper_link_interface_status(
            db, equipment, interface)
    return result
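A minimal usage sketch of how these lookups fit together (the hostname and interface values are illustrative, and the config dict is assumed to hold the same 'alarms-db' section that the Flask route further down passes to connection()):
from inventory_provider import alarmsdb
def example_interface_status(config):
    # sketch only: open a connection from the 'alarms-db' config section,
    # then fall back through infinera -> coriant -> juniper alarm tables
    with alarmsdb.connection(config['alarms-db']) as db:
        return alarmsdb.get_last_known_interface_status(
            db, 'mx1.lon.uk.geant.net', 'xe-1/2/2')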
......@@ -7,7 +7,7 @@ from inventory_provider.constants import DATABASE_LOGGER_NAME
@contextlib.contextmanager
def connection(opsdb):
def connection(opsdb): # pragma: no cover
cx = None
try:
cx = mysql.connector.connect(
......@@ -22,7 +22,7 @@ def connection(opsdb):
@contextlib.contextmanager
def cursor(cnx):
def cursor(cnx): # pragma: no cover
csr = None
try:
csr = cnx.cursor()
......
......@@ -63,8 +63,8 @@ if __name__ == "__main__":
with open("config.json") as f:
params = config.load(f)
# update_network_details(params)
update_network_details(params)
network_info = load_network_details(params["redis"])
with open("./router-info.json", "w") as f:
f.write(json.dumps(network_info))
# network_info = load_network_details(params["redis"])
# with open("./router-info.json", "w") as f:
# f.write(json.dumps(network_info))
import json
import logging
import click
from inventory_provider import constants
from inventory_provider import router_details
from inventory_provider import config
def _validate_config(ctx, param, value):
return config.load(value)
@click.command()
@click.option(
"--params",
# required=True,
type=click.File(),
help="Configuration filename",
default=open("config.json"),
callback=_validate_config)
def cli(params):
router_details.update_network_details(params)
result = router_details.load_network_details(params["redis"])
filename = "/tmp/router-info.json"
logging.debug("writing output to: " + filename)
with open(filename, "w") as f:
f.write(json.dumps(result))
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING)
logging.getLogger(constants.SNMP_LOGGER_NAME).setLevel(logging.DEBUG)
logging.getLogger(constants.THREADING_LOGGER_NAME).setLevel(logging.INFO)
logging.getLogger(constants.JUNIPER_LOGGER_NAME).setLevel(logging.DEBUG)
logging.getLogger(constants.DATABASE_LOGGER_NAME).setLevel(logging.DEBUG)
cli()
......@@ -26,15 +26,17 @@ def require_accepts_json(f):
return decorated_function
@routes.route("/test", methods=['GET', 'POST'])
@routes.route("/interface-status", methods=['GET', 'POST'])
@require_accepts_json
def alarmsdb_test():
def get_interface_status():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
result = {}
equipment = request.args.get("equipment")
interface = request.args.get("interface")
with alarmsdb.connection(config['alarms-db']) as db:
for r in config['routers']:
result[r['hostname']] = list(alarmsdb._db_test(db, r))
result = {"status": alarmsdb.get_last_known_interface_status(
db, equipment, interface)}
return Response(
json.dumps(result),
......
......@@ -66,14 +66,17 @@ def router_interfaces(hostname):
status=404,
mimetype="text/html")
def _interfaces(s):
for ifc in json.loads(s):
if 'v4InterfaceName' in ifc:
yield ifc['v4InterfaceName']
if 'v6InterfaceName' in ifc:
yield ifc['v6InterfaceName']
interfaces = list(_interfaces(ifc_data_string.decode('utf-8')))
def _interfaces(d):
for ii in d['interface-information']:
for ifc_list in ii['physical-interface'] + ii['logical-interface']:
for ifc in ifc_list:
yield {
'name': ifc['name'][0]['data'],
'description': ifc['description'][0]['data']
}
ifc_data = json.loads(ifc_data_string.decode('utf-8'))
interfaces = list(_interfaces(ifc_data))
if not interfaces:
return Response(
response="no interfaces found for '%s'" % hostname,
......@@ -85,6 +88,36 @@ def router_interfaces(hostname):
mimetype="application/json")
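The new _interfaces helper walks Juniper's JSON-rendered XML, in which every element is a list of {'data': ...} wrappers; an illustrative fragment of the input it expects (interface names and descriptions invented):
# illustrative input for _interfaces(); names/descriptions are invented
ifc_data = {
    'interface-information': [{
        'physical-interface': [[
            {'name': [{'data': 'xe-1/2/2'}],
             'description': [{'data': 'sample physical interface'}]}
        ]],
        'logical-interface': [[
            {'name': [{'data': 'xe-1/2/2.0'}],
             'description': [{'data': 'sample logical interface'}]}
        ]]
    }]
}
# list(_interfaces(ifc_data)) would yield:
# [{'name': 'xe-1/2/2', 'description': 'sample physical interface'},
#  {'name': 'xe-1/2/2.0', 'description': 'sample logical interface'}]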
@routes.route("/snmp/<hostname>", methods=['GET', 'POST'])
@require_accepts_json
def snmp_ids(hostname):
redis_config = current_app.config["INVENTORY_PROVIDER_CONFIG"]["redis"]
r = redis.StrictRedis(
host=redis_config["hostname"],
port=redis_config["port"])
ifc_data_string = r.hget(hostname, 'snmp-interfaces')
if not ifc_data_string:
return Response(
response="no available info for '%s'" % hostname,
status=404,
mimetype="text/html")
def _ifc_name(ifc):
if 'v4InterfaceName' in ifc:
return ifc['v4InterfaceName']
if 'v6InterfaceName' in ifc:
return ifc['v6InterfaceName']
assert False, "sanity failure: no interface name found"
ifc_data = json.loads(ifc_data_string.decode('utf-8'))
result = [
{'index': i['index'], 'name': _ifc_name(i)}
for i in ifc_data]
return Response(
json.dumps(result),
mimetype="application/json")
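For reference, a hedged illustration of the response shape produced by this new /data/snmp/<hostname> route (index and name values invented):
# illustrative /data/snmp/<hostname> response body; values are invented
example_snmp_ids_response = [
    {'index': '611', 'name': 'xe-1/2/2.0'},
    {'index': '540', 'name': 'lo0.0'},
]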
@routes.route("/debug-dump/<hostname>", methods=['GET', 'POST'])
@require_accepts_json
def debug_dump_router_info(hostname):
......@@ -123,10 +156,10 @@ def bgp_configs(hostname):
def _interfaces(s):
for ifc in json.loads(s):
yield {
"description": ifc["description"][0],
"description": ifc["description"][0]["data"],
"as": {
"peer": ifc["peer-as"][0],
"local": ifc["local-as"][0]["as-number"][0]
"peer": ifc["peer-as"][0]["data"],
"local": ifc["local-as"][0]["as-number"][0]["data"]
}
}
interfaces = list(_interfaces(bgp_data_string.decode('utf-8')))
......
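The added ["data"] lookups follow the same Juniper JSON convention, where scalar values sit inside {'data': ...} wrappers; an illustrative (invented) neighbor record matching the accessors above:
# illustrative bgp neighbor record; description and AS numbers are invented
neighbor = {
    'description': [{'data': 'sample peer'}],
    'peer-as': [{'data': '64496'}],
    'local-as': [{'as-number': [{'data': '64497'}]}],
}
# _interfaces() would yield:
# {'description': 'sample peer', 'as': {'peer': '64496', 'local': '64497'}}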
......@@ -16,7 +16,7 @@ def _v6address_oid2str(dotted_decimal):
return ":".join(hex_params)
def walk(agent_hostname, community, base_oid):
def walk(agent_hostname, community, base_oid): # pragma: no cover
"""
https://stackoverflow.com/a/45001921
http://snmplabs.com/pysnmp/docs/hlapi/asyncore/sync/manager/cmdgen/nextcmd.html
......@@ -65,7 +65,9 @@ def walk(agent_hostname, community, base_oid):
# .resolveWithMib(mibViewController)
# for x in varBinds]
for oid, val in varBinds:
yield {"oid": "." + str(oid), "value": val.prettyPrint()}
result = {"oid": "." + str(oid), "value": val.prettyPrint()}
snmp_logger.debug(result)
yield result
def get_router_interfaces(hostname, community, config):
......@@ -89,7 +91,8 @@ def get_router_interfaces(hostname, community, config):
yield {
"v4Address": v4Address["value"],
"v4Mask": v4Mask["value"],
"v4InterfaceName": v4IfcNames[v4InterfaceOID["value"]]
"v4InterfaceName": v4IfcNames[v4InterfaceOID["value"]],
"index": v4InterfaceOID["value"]
}
v6IfcNames = {}
......@@ -109,5 +112,6 @@ def get_router_interfaces(hostname, community, config):
yield {
"v6Address": _v6address_oid2str(m.group(2)),
"v6Mask": v6AddressAndMask["value"],
"v6InterfaceName": v6IfcNames[m.group(1)]
"v6InterfaceName": v6IfcNames[m.group(1)],
"index": m.group(1)
}
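Each record yielded by get_router_interfaces now carries the SNMP index alongside the address details; a small sketch (assuming ifc_records is the generator's output) of building the index-to-name lookup that the sensu/prometheus use case needs:
def index_to_name(ifc_records):
    # sketch: map SNMP ifIndex -> interface name from the records above
    mapping = {}
    for ifc in ifc_records:
        name = ifc.get('v4InterfaceName') or ifc.get('v6InterfaceName')
        mapping[ifc['index']] = name
    return mapping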
import json
import logging
from celery import bootsteps, Task
import redis
......@@ -6,11 +7,19 @@ import redis
from inventory_provider.tasks.app import app
from inventory_provider import config
from inventory_provider import juniper, snmp
from inventory_provider import constants
logging.basicConfig(level=logging.WARNING)
logging.getLogger(constants.SNMP_LOGGER_NAME).setLevel(logging.INFO)
logging.getLogger(constants.TASK_LOGGER_NAME).setLevel(logging.INFO)
logging.getLogger(constants.JUNIPER_LOGGER_NAME).setLevel(logging.DEBUG)
logging.getLogger(constants.DATABASE_LOGGER_NAME).setLevel(logging.DEBUG)
class InventoryTask(Task):
config = None
logger = None
def __init__(self):
pass
......@@ -24,6 +33,8 @@ class InventoryTask(Task):
name=hostname,
key=key,
value=json.dumps(data))
InventoryTask.logger.debug(
"saved %s, key %s" % (hostname, key))
return "OK"
......@@ -31,6 +42,7 @@ class WorkerArgs(bootsteps.Step):
def __init__(self, worker, config_filename, **options):
with open(config_filename) as f:
InventoryTask.config = config.load(f)
InventoryTask.logger = logging.getLogger(constants.TASK_LOGGER_NAME)
def worker_args(parser):
......@@ -74,7 +86,7 @@ def juniper_refresh_interfaces(self, hostname):
def snmp_refresh_interfaces(self, hostname, community):
InventoryTask.save_key(
hostname,
"interfaces",
"snmp-interfaces",
list(snmp.get_router_interfaces(
hostname,
community,
......
......@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='inventory-provider',
version="0.4",
version="0.5",
author='GEANT',
author_email='swd@geant.org',
description='Dashboard inventory provider',
......
import json
import os
import tempfile
import pytest
import inventory_provider
from inventory_provider import config
OID_LIST_CONF = """
#
# This file is located in dbupdates/conf and is used by scripts under dbupdates/scripts.
# It holds OID values for retrieving details of a router.
#
## IPv4
v4Address=.1.3.6.1.2.1.4.20.1.1
v4InterfaceOID=.1.3.6.1.2.1.4.20.1.2
v4InterfaceName=.1.3.6.1.2.1.31.1.1.1.1
v4Mask=.1.3.6.1.2.1.4.20.1.3
## IPv6
v6AddressAndMask=.1.3.6.1.2.1.55.1.8.1.2
v6InterfaceName=.1.3.6.1.2.1.55.1.5.1.2
""" # noqa E501
ROUTERS_COMMUNITY_CONF = """
######################################################################################################################################
## ##
## This is a configuration file that stores router names and the SNMP community name in <router>=<community>,<IP address> format. ##
## ##
######################################################################################################################################
mx2.ath.gr.geant.net=0pBiFbD,62.40.114.59
mx1.tal.ee.geant.net=0pBiFbD,62.40.96.1
mx2.tal.ee.geant.net=0pBiFbD,62.40.96.2
mx2.rig.lv.geant.net=0pBiFbD,62.40.96.4
mx1.kau.lt.geant.net=0pBiFbD,62.40.96.6
mx2.kau.lt.geant.net=0pBiFbD,62.40.96.5
mx2.zag.hr.geant.net=0pBiFbD,62.40.96.8
mx2.lju.si.geant.net=0pBiFbD,62.40.96.10
mx1.bud.hu.geant.net=0pBiFbD,62.40.97.1
mx1.pra.cz.geant.net=0pBiFbD,62.40.97.2
mx2.bra.sk.geant.net=0pBiFbD,62.40.97.4
mx1.lon.uk.geant.net=0pBiFbD,62.40.97.5
mx1.vie.at.geant.net=0pBiFbD,62.40.97.7
mx2.bru.be.geant.net=0pBiFbD,62.40.96.20
mx1.poz.pl.geant.net=0pBiFbD,62.40.97.10
mx1.ams.nl.geant.net=0pBiFbD,62.40.97.11
mx1.fra.de.geant.net=0pBiFbD,62.40.97.12
mx1.par.fr.geant.net=0pBiFbD,62.40.97.13
mx1.gen.ch.geant.net=0pBiFbD,62.40.97.14
mx1.mil2.it.geant.net=0pBiFbD,62.40.97.15
mx1.lis.pt.geant.net=0pBiFbD,62.40.96.16
mx2.lis.pt.geant.net=0pBiFbD,62.40.96.17
mx1.mad.es.geant.net=0pBiFbD,62.40.97.16
mx1.sof.bg.geant.net=0pBiFbD,62.40.96.21
mx1.buc.ro.geant.net=0pBiFbD,62.40.96.19
mx1.ham.de.geant.net=0pBiFbD,62.40.96.26
mx1.dub.ie.geant.net=0pBiFbD,62.40.96.3
mx1.dub2.ie.geant.net=0pBiFbD,62.40.96.25
mx1.mar.fr.geant.net=0pBiFbD,62.40.96.12
mx1.lon2.uk.geant.net=0pBiFbD,62.40.96.15
# rt1.clpk.us.geant.net=GEANT_RO,10.200.64.128
# rt1.denv.us.geant.net=GEANT_RO,10.200.67.128
mx1.ath2.gr.geant.net=0pBiFbD,62.40.96.39
# qfx.par.fr.geant.net=0pBiFbD,62.40.117.170
# qfx.fra.de.geant.net=0pBiFbD,62.40.117.162
""" # noqa E501
def data_config_filename(tmp_dir_name):
config = {
"alarms-db": {
"hostname": "xxxxxxx.yyyyy.zzz",
"dbname": "xxxxxx",
"username": "xxxxxx",
"password": "xxxxxxxx"
},
"ops-db": {
"hostname": "xxxxxxx.yyyyy.zzz",
"dbname": "xxxxxx",
"username": "xxxxxx",
"password": "xxxxxxxx"
},
"oid_list.conf": os.path.join(
tmp_dir_name,
"oid_list.conf"),
"routers_community.conf": os.path.join(
tmp_dir_name,
"routers_community.conf"),
"ssh": {
"private-key": "private-key-filename",
"known-hosts": "known-hosts=filename"
},
"redis": {
"hostname": "xxxxxx",
"port": 6379
}
}
with open(config["oid_list.conf"], "w") as f:
f.write(OID_LIST_CONF)
with open(config["routers_community.conf"], "w") as f:
f.write(ROUTERS_COMMUNITY_CONF)
filename = os.path.join(tmp_dir_name, "config.json")
with open(filename, "w") as f:
f.write(json.dumps(config))
return filename
@pytest.fixture
def data_config():
with tempfile.TemporaryDirectory() as tmpdir:
with open(data_config_filename(tmpdir)) as f:
return config.load(f)
@pytest.fixture
def cached_test_data():
filename = os.path.join(
os.path.dirname(__file__),
"router-info.json")
with open(filename) as f:
return json.loads(f.read())
@pytest.fixture
def app_config():
with tempfile.TemporaryDirectory() as tmpdir:
app_config_filename = os.path.join(tmpdir, "app.config")
with open(app_config_filename, "w") as f:
f.write("%s = '%s'\n" % (
"INVENTORY_PROVIDER_CONFIG_FILENAME",
data_config_filename(tmpdir)))
yield app_config_filename
@pytest.fixture
def client(app_config):
os.environ["SETTINGS_FILENAME"] = app_config
with inventory_provider.create_app().test_client() as c:
yield c
import json
import jsonschema
DEFAULT_REQUEST_HEADERS = {
"Content-type": "application/json",
"Accept": ["application/json"]
}
def test_get_interface_status(mocker, client):
mocked_conn = mocker.patch('inventory_provider.routes.alarmsdb'
'.alarmsdb.connection')
mocked_conn.return_value.__enter__.return_value = None
mocked_interface_status = mocker.patch(
'inventory_provider.routes.alarmsdb.'
'alarmsdb.get_last_known_interface_status')
mocked_interface_status.return_value = "up"
rv = client.get(
'/alarmsdb/interface-status?'
'equipment=mx1.lon.uk.geant.net&interface=xe-1/2/2',
headers=DEFAULT_REQUEST_HEADERS)
interfaces_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"status": {
"type": "string",
}
}
}
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, interfaces_list_schema)
assert response == {"status": "up"}
import inventory_provider.alarmsdb as alarmsdb
def test_infinera_interface_status(mocker):
mocked_get_cursor = mocker.patch('inventory_provider.alarmsdb.cursor')
mocked_execute = mocked_get_cursor. \
return_value.__enter__.return_value.execute
mocked_fetchone = mocked_get_cursor.return_value.__enter__. \
return_value.fetchone
mocked_fetchone.return_value = ('Raised',)
assert alarmsdb.get_last_known_infinera_interface_status(
None, 'eq1', 'intfc1'
) == "down"
mocked_fetchone.return_value = ("Clear",)
assert alarmsdb.get_last_known_infinera_interface_status(
None, 'eq1', 'intfc1'
) == "up"
mocked_fetchone.return_value = ()
assert alarmsdb.get_last_known_infinera_interface_status(
None, 'eq1', 'intfc1'
) == "unknown"
mocked_execute.assert_called_with(
"SELECT status FROM infinera_alarms WHERE"
" CONCAT(ne_name, '-', REPLACE(object_name, 'T', '')) = %s"
" ORDER BY ne_init_time DESC, ne_clear_time DESC LIMIT 1",
("eq1-intfc1",))
def test_coriant_interface_status(mocker):
mocked_get_cursor = mocker.patch('inventory_provider.alarmsdb.cursor')
mocked_execute = mocked_get_cursor. \
return_value.__enter__.return_value.execute
mocked_fetchone = mocked_get_cursor.return_value.__enter__. \
return_value.fetchone
mocked_fetchone.return_value = ('Raised',)
assert alarmsdb.get_last_known_coriant_interface_status(
None, 'eq1', 'intfc1'
) == "down"
mocked_fetchone.return_value = ("Clear",)
assert alarmsdb.get_last_known_coriant_interface_status(
None, 'eq1', 'intfc1'
) == "up"
mocked_fetchone.return_value = ()
assert alarmsdb.get_last_known_coriant_interface_status(
None, 'eq1', 'intfc1'
) == "unknown"
mocked_execute.assert_called_with(
"SELECT status FROM coriant_alarms"
" WHERE ne_id_name = %s AND entity_string LIKE %s"
" ORDER BY last_event_time DESC LIMIT 1",
("eq1", "intfc1-%"))
def test_juniper_interface_status(mocker):
mocked_get_cursor = mocker.patch('inventory_provider.alarmsdb.cursor')
mocked_execute = mocked_get_cursor. \
return_value.__enter__.return_value.execute
mocked_fetchone = mocked_get_cursor.return_value.__enter__. \
return_value.fetchone
mocked_fetchone.return_value = (0,)
assert alarmsdb.get_last_known_juniper_link_interface_status(
None, 'eq1', 'intfc1'
) == "down"
mocked_fetchone.return_value = (1,)
assert alarmsdb.get_last_known_juniper_link_interface_status(
None, 'eq1', 'intfc1'
) == "up"
mocked_fetchone.return_value = ()
assert alarmsdb.get_last_known_juniper_link_interface_status(
None, 'eq1', 'intfc1'
) == "unknown"
mocked_execute.assert_called_with(
"SELECT IF(link_admin_status = 'up'"
" AND link_oper_status = 'up', 1, 0)"
" AS up FROM juniper_alarms"
" WHERE equipment_name = %s AND link_interface_name = %s"
" ORDER BY alarm_id DESC LIMIT 1",
("lo0.eq1", "intfc1"))
def test_interface_status(mocker):
mocked_infinera = mocker.patch(
'inventory_provider.alarmsdb.get_last_known_infinera_interface_status')
mocked_infinera.return_value = "unknown"
mocked_coriant = mocker.patch(
'inventory_provider.alarmsdb.get_last_known_coriant_interface_status')
mocked_coriant.return_value = "unknown"
mocked_juniper = mocker.patch(
'inventory_provider.alarmsdb.'
'get_last_known_juniper_link_interface_status')
mocked_juniper.return_value = "unknown"
assert alarmsdb.get_last_known_interface_status(None, '', '') == "unknown"
"""
just checks that the worker methods call the right functions
and some data ends up in the right place ... otherwise not very detailed
"""
import logging
import pytest
from inventory_provider.tasks import worker
class MockedRedis(object):
db = {}
def __init__(self, *args, **kwargs):
pass
def hset(self, name, key, value):
MockedRedis.db.setdefault(name, {})[key] = value
@pytest.fixture
def mocked_worker_module(mocker, data_config):
worker.InventoryTask.config = data_config
worker.InventoryTask.logger = logging.getLogger()
MockedRedis.db = {}
mocker.patch(
'inventory_provider.tasks.worker.redis.StrictRedis',
MockedRedis)
def test_juniper_refresh_bgp(
mocked_worker_module, mocker, cached_test_data):
def _mocked_fetch_bgp_config(hostname, _):
return cached_test_data[hostname]["bgp"]
mocker.patch(
'inventory_provider.tasks.worker.juniper.fetch_bgp_config',
_mocked_fetch_bgp_config)
for hostname in cached_test_data.keys():
assert hostname not in MockedRedis.db
worker.juniper_refresh_bgp(hostname)
assert MockedRedis.db[hostname]['bgp']
def test_juniper_refresh_interfaces(
mocked_worker_module, mocker, cached_test_data):
def _mocked_fetch_interfaces(hostname, _):
return cached_test_data[hostname]["interfaces"]
mocker.patch(
'inventory_provider.tasks.worker.juniper.fetch_interfaces',
_mocked_fetch_interfaces)
for hostname in cached_test_data.keys():
assert hostname not in MockedRedis.db
worker.juniper_refresh_interfaces(hostname)
assert MockedRedis.db[hostname]['interfaces']
def test_juniper_refresh_vrr(
mocked_worker_module, mocker, cached_test_data):
def _mocked_fetch_vrr_config(hostname, _):
return cached_test_data[hostname]["vrr"]
mocker.patch(
'inventory_provider.tasks.worker.juniper.fetch_vrr_config',
_mocked_fetch_vrr_config)
for hostname in cached_test_data.keys():
assert hostname not in MockedRedis.db
worker.juniper_refresh_vrr(hostname)
assert MockedRedis.db[hostname]['vrr']
def test_snmp_refresh_interfaces(
mocked_worker_module, mocker, cached_test_data):
def _mocked_snmp_interfaces(hostname, community, _):
return cached_test_data[hostname]["snmp-interfaces"]
mocker.patch(
'inventory_provider.tasks.worker.snmp.get_router_interfaces',
_mocked_snmp_interfaces)
for hostname in cached_test_data.keys():
assert hostname not in MockedRedis.db
worker.snmp_refresh_interfaces(hostname, 'fake-community')
assert MockedRedis.db[hostname]['snmp-interfaces']
import json
# import logging
import os
import tempfile
import pytest
import jsonschema
import inventory_provider
# logging.basicConfig(level=logging.DEBUG)
DEFAULT_REQUEST_HEADERS = {
"Content-type": "application/json",
"Accept": ["application/json"]
......@@ -20,130 +15,6 @@ MODULE_DIR = os.path.realpath(os.path.join(
"..",
"inventory_provider"))
OID_LIST_CONF = """
#
# This file is located in dbupdates/conf and is used by scripts under dbupdates/scripts.
# It holds OID values for retrieving details of a router.
#
## IPv4
v4Address=.1.3.6.1.2.1.4.20.1.1
v4InterfaceOID=.1.3.6.1.2.1.4.20.1.2
v4InterfaceName=.1.3.6.1.2.1.31.1.1.1.1
v4Mask=.1.3.6.1.2.1.4.20.1.3
## IPv6
v6AddressAndMask=.1.3.6.1.2.1.55.1.8.1.2
v6InterfaceName=.1.3.6.1.2.1.55.1.5.1.2
""" # noqa E501
ROUTERS_COMMUNITY_CONF = """
######################################################################################################################################
## ##
## This is a configuration file that stores router names and the SNMP community name in <router>=<community>,<IP address> format. ##
## ##
######################################################################################################################################
mx2.ath.gr.geant.net=0pBiFbD,62.40.114.59
mx1.tal.ee.geant.net=0pBiFbD,62.40.96.1
mx2.tal.ee.geant.net=0pBiFbD,62.40.96.2
mx2.rig.lv.geant.net=0pBiFbD,62.40.96.4
mx1.kau.lt.geant.net=0pBiFbD,62.40.96.6
mx2.kau.lt.geant.net=0pBiFbD,62.40.96.5
mx2.zag.hr.geant.net=0pBiFbD,62.40.96.8
mx2.lju.si.geant.net=0pBiFbD,62.40.96.10
mx1.bud.hu.geant.net=0pBiFbD,62.40.97.1
mx1.pra.cz.geant.net=0pBiFbD,62.40.97.2
mx2.bra.sk.geant.net=0pBiFbD,62.40.97.4
mx1.lon.uk.geant.net=0pBiFbD,62.40.97.5
mx1.vie.at.geant.net=0pBiFbD,62.40.97.7
mx2.bru.be.geant.net=0pBiFbD,62.40.96.20
mx1.poz.pl.geant.net=0pBiFbD,62.40.97.10
mx1.ams.nl.geant.net=0pBiFbD,62.40.97.11
mx1.fra.de.geant.net=0pBiFbD,62.40.97.12
mx1.par.fr.geant.net=0pBiFbD,62.40.97.13
mx1.gen.ch.geant.net=0pBiFbD,62.40.97.14
mx1.mil2.it.geant.net=0pBiFbD,62.40.97.15
mx1.lis.pt.geant.net=0pBiFbD,62.40.96.16
mx2.lis.pt.geant.net=0pBiFbD,62.40.96.17
mx1.mad.es.geant.net=0pBiFbD,62.40.97.16
mx1.sof.bg.geant.net=0pBiFbD,62.40.96.21
mx1.buc.ro.geant.net=0pBiFbD,62.40.96.19
mx1.ham.de.geant.net=0pBiFbD,62.40.96.26
mx1.dub.ie.geant.net=0pBiFbD,62.40.96.3
mx1.dub2.ie.geant.net=0pBiFbD,62.40.96.25
mx1.mar.fr.geant.net=0pBiFbD,62.40.96.12
mx1.lon2.uk.geant.net=0pBiFbD,62.40.96.15
# rt1.clpk.us.geant.net=GEANT_RO,10.200.64.128
# rt1.denv.us.geant.net=GEANT_RO,10.200.67.128
mx1.ath2.gr.geant.net=0pBiFbD,62.40.96.39
# qfx.par.fr.geant.net=0pBiFbD,62.40.117.170
# qfx.fra.de.geant.net=0pBiFbD,62.40.117.162
""" # noqa E501
def data_config_filename(tmp_dir_name):
config = {
"alarms-db": {
"hostname": "xxxxxxx.yyyyy.zzz",
"dbname": "xxxxxx",
"username": "xxxxxx",
"password": "xxxxxxxx"
},
"ops-db": {
"hostname": "xxxxxxx.yyyyy.zzz",
"dbname": "xxxxxx",
"username": "xxxxxx",
"password": "xxxxxxxx"
},
"oid_list.conf": os.path.join(
tmp_dir_name,
"oid_list.conf"),
"routers_community.conf": os.path.join(
tmp_dir_name,
"routers_community.conf"),
"ssh": {
"private-key": "private-key-filename",
"known-hosts": "known-hosts=filename"
},
"redis": {
"hostname": "xxxxxx",
"port": 6379
}
}
with open(config["oid_list.conf"], "w") as f:
f.write(OID_LIST_CONF)
with open(config["routers_community.conf"], "w") as f:
f.write(ROUTERS_COMMUNITY_CONF)
filename = os.path.join(tmp_dir_name, "config.json")
with open(filename, "w") as f:
f.write(json.dumps(config))
return filename
@pytest.fixture
def app_config():
with tempfile.TemporaryDirectory() as tmpdir:
app_config_filename = os.path.join(tmpdir, "app.config")
with open(app_config_filename, "w") as f:
f.write("%s = '%s'\n" % (
"INVENTORY_PROVIDER_CONFIG_FILENAME",
data_config_filename(tmpdir)))
yield app_config_filename
@pytest.fixture
def client(app_config):
os.environ["SETTINGS_FILENAME"] = app_config
with inventory_provider.create_app().test_client() as c:
yield c
def test_version_request(client):
version_schema = {
......@@ -172,41 +43,54 @@ def test_version_request(client):
version_schema)
TEST_DATA_FILENAME = os.path.join(
os.path.dirname(__file__),
"router-info.json")
class MockedRedis(object):
db = None
def __init__(self, *args, **kwargs):
if MockedRedis.db is None:
with open(TEST_DATA_FILENAME) as f:
test_data_filename = os.path.join(
os.path.dirname(__file__),
"router-info.json")
with open(test_data_filename) as f:
MockedRedis.db = json.loads(f.read())
def set(self, key, value):
MockedRedis.db[key] = value
def hget(self, key, field):
value = MockedRedis.db[key]
return json.dumps(value[field]).encode('utf-8')
def hgetall(self, key):
result = {}
for k, v in MockedRedis.db[key].items():
result[k.encode('utf-8')] \
= json.dumps(v).encode('utf-8')
return result
def keys(self, *args, **kwargs):
return list([k.encode("utf-8") for k in MockedRedis.db.keys()])
def test_routers_list(mocker, client):
    mocker.patch(
        'inventory_provider.router_details.redis.StrictRedis',
        MockedRedis)
    mocker.patch(
        'inventory_provider.routes.data.redis.StrictRedis',
        MockedRedis)
@pytest.fixture
def client_with_mocked_data(mocker, client):
    mocker.patch(
        'inventory_provider.router_details.redis.StrictRedis',
        MockedRedis)
    mocker.patch(
        'inventory_provider.routes.data.redis.StrictRedis',
        MockedRedis)
    return client
def _routers(client):
    routers_list_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "array",
        "items": {"type": "string"}
    }
    rv = client.post(
        "data/routers",
        headers=DEFAULT_REQUEST_HEADERS)
......@@ -214,4 +98,141 @@
    response = json.loads(rv.data.decode("utf-8"))
    jsonschema.validate(response, routers_list_schema)
    assert response  # shouldn't be empty
    return response
def test_routers_list(client_with_mocked_data):
    assert _routers(client_with_mocked_data)
def test_router_interfaces(client_with_mocked_data):
interfaces_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"description": {"type": "string"}
},
"required": ["name", "description"],
"additionalProperties": False
}
}
for router in _routers(client_with_mocked_data):
rv = client_with_mocked_data.post(
"/data/interfaces/" + router,
headers=DEFAULT_REQUEST_HEADERS)
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, interfaces_list_schema)
assert response # at least shouldn't be empty
def test_snmp_ids(client_with_mocked_data):
snmp_id_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"index": {"type": "string"},
"name": {"type": "string"}
},
"required": ["index", "name"],
"additionalProperties": False
}
}
for hostname in _routers(client_with_mocked_data):
rv = client_with_mocked_data.post(
"/data/snmp/" + hostname,
headers=DEFAULT_REQUEST_HEADERS)
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, snmp_id_list_schema)
assert response # at least shouldn't be empty
def test_router_bgp_route(client_with_mocked_data):
bgp_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"description": {"type": "string"},
"as": {
"type": "object",
"properties": {
"peer": {
"type": "string",
"pattern": r'^\d+$'
},
"local": {
"type": "string",
"pattern": r'^\d+$'
},
},
"required": ["peer", "local"],
"additionalProperties": False
},
},
"required": ["description", "as"],
"additionalProperties": False
}
}
routers_with_bgp_configs = [
"mx1.mil2.it.geant.net",
"mx1.vie.at.geant.net",
"mx1.fra.de.geant.net",
"mx1.ams.nl.geant.net",
"mx1.pra.cz.geant.net",
"mx1.dub.ie.geant.net",
"mx1.mad.es.geant.net",
"mx1.gen.ch.geant.net",
"mx1.mar.fr.geant.net",
"mx1.lon.uk.geant.net"
]
for router in _routers(client_with_mocked_data):
if router not in routers_with_bgp_configs:
continue
rv = client_with_mocked_data.post(
"/data/bgp/" + router,
headers=DEFAULT_REQUEST_HEADERS)
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, bgp_list_schema)
assert response # at least shouldn't be empty
def test_router_debug_data_route(client_with_mocked_data):
debug_data_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"items": {
"type": "object",
"properties": {
"vrr": {"type": "object"},
"bgp": {"type": "array"},
"interfaces": {"type": "array"}
},
"required": ["vrr", "bgp", "interfaces"],
"additionalProperties": False
}
}
for router in _routers(client_with_mocked_data):
rv = client_with_mocked_data.post(
"/data/debug-dump/" + router,
headers=DEFAULT_REQUEST_HEADERS)
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, debug_data_schema)
import json
import os
import re
import jsonschema
import pytest
from inventory_provider import juniper
......@@ -77,14 +75,6 @@ CACHE_SCHEMA = {
}
@pytest.fixture
def cached_test_data():
with open("router-info.json") as f:
cache = json.loads(f.read())
jsonschema.validate(cache, CACHE_SCHEMA)
return cache
def _parsed_old_style_output_data(s):
for l in s.splitlines():
if not l:
......
import json
import os
import jsonschema
import pytest
from inventory_provider import snmp
OID_TEST_CONFIG = """#
# This file is located in dbupdates/conf and is used by scripts under dbupdates/scripts.
# It holds OID values for retrieving details of a router.
#
## IPv4
v4Address=.1.3.6.1.2.1.4.20.1.1
v4InterfaceOID=.1.3.6.1.2.1.4.20.1.2
v4InterfaceName=.1.3.6.1.2.1.31.1.1.1.1
v4Mask=.1.3.6.1.2.1.4.20.1.3
## IPv6
v6AddressAndMask=.1.3.6.1.2.1.55.1.8.1.2
v6InterfaceName=.1.3.6.1.2.1.55.1.5.1.2
""" # noqa E501
@pytest.fixture
def snmp_walk_responses():
test_data_dir = os.path.join(os.path.dirname(__file__), "data")
test_data_filename = os.path.join(test_data_dir, "snmp-walk.json")
with open(test_data_filename) as f:
return json.loads(f.read())
def test_snmp_interfaces(mocker, data_config, snmp_walk_responses):
expected_result_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"v4Address": {"type": "string"},
"v4Mask": {"type": "string"},
"v4InterfaceName": {"type": "string"},
"v6Address": {"type": "string"},
"v6Mask": {"type": "string"},
"v6InterfaceName": {"type": "string"},
"index": {"type": "string"}
},
"required": ["index"],
"additionalProperties": False
}
}
def _mocked_walk(agent_hostname, community, base_oid):
return [e for e in snmp_walk_responses
if e['oid'].startswith(base_oid)]
mocker.patch(
'inventory_provider.snmp.walk',
_mocked_walk)
interfaces = snmp.get_router_interfaces(
'ignored', 'ignored', {'oids': data_config['oids']})
interfaces = list(interfaces)
jsonschema.validate(interfaces, expected_result_schema)
assert interfaces, "interface list isn't empty"
for ifc in interfaces:
if 'v4Address' in ifc \
and 'v4Mask' in ifc \
and 'v4InterfaceName' in ifc:
continue
if 'v6Address' in ifc \
and 'v6Mask' in ifc \
and 'v6InterfaceName' in ifc:
continue
assert False, "address details not found in interface dict"
......@@ -13,6 +13,6 @@ commands =
coverage run --source inventory_provider -m py.test
coverage xml
coverage html
coverage report
coverage report --fail-under 75
flake8