Skip to content
Snippets Groups Projects
Commit 7a30623c authored by Erik Reid's avatar Erik Reid
Browse files

omit interfaces with no snmp index

parent 810afa3e
No related branches found
No related tags found
No related merge requests found
......@@ -54,9 +54,13 @@ def poller_interface_oids(hostname):
if not ifc['description']:
continue
snmp_index = snmp_indexes.get(ifc['name'], None)
if not snmp_index:
continue
ifc_data = {
'name': ifc['name'],
'snmp-index': snmp_indexes.get(ifc['name'], None),
'snmp-index': snmp_index,
'description': ifc['description'],
'circuits': []
}
......
import json
import os
import pytest
import jsonschema
import inventory_provider
TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
inventory_provider.__path__[0],
"..",
"test",
"data"))
DEFAULT_REQUEST_HEADERS = {
"Content-type": "application/json",
"Accept": ["application/json"]
}
class MockedRedis(object):
db = None
def __init__(self, *args, **kwargs):
if MockedRedis.db is None:
test_data_filename = os.path.join(
TEST_DATA_DIRNAME,
"router-info.json")
with open(test_data_filename) as f:
MockedRedis.db = json.loads(f.read())
def set(self, key, value):
MockedRedis.db[key] = value
def hget(self, key, field):
value = MockedRedis.db[key]
return value[field].encode('utf-8')
# return json.dumps(value[field]).encode('utf-8')
def hgetall(self, key):
result = {}
for k, v in MockedRedis.db[key].items():
result[k.encode('utf-8')] \
= json.dumps(v).encode('utf-8')
return result
def keys(self, *args, **kwargs):
return list([k.encode("utf-8") for k in MockedRedis.db.keys()])
@pytest.fixture
def client_with_mocked_data(mocker, client):
mocker.patch(
'inventory_provider.routes.data.redis.StrictRedis',
MockedRedis)
return client
def test_router_interfaces(router, client_with_mocked_data):
interfaces_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"description": {"type": "string"},
"ipv4": {
"type": "array",
"items": {"type": "string"}
},
"ipv6": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["name", "description", "ipv4", "ipv6"],
"additionalProperties": False
}
}
rv = client_with_mocked_data.post(
"/data/interfaces/" + router,
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, interfaces_list_schema)
assert response # at least shouldn't be empty
def test_snmp_ids(router, client_with_mocked_data):
snmp_id_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"index": {"type": "string"},
"name": {"type": "string"}
},
"required": ["index", "name"],
"additionalProperties": False
}
}
rv = client_with_mocked_data.post(
"/data/snmp/" + router,
headers=DEFAULT_REQUEST_HEADERS)
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, snmp_id_list_schema)
assert response # at least shouldn't be empty
def test_router_bgp_routes(router, client_with_mocked_data):
ROUTERS_WITH_BGP_CONFIG = [
"mx1.bud.hu.geant.net",
"mx1.pra.cz.geant.net",
"mx1.lon.uk.geant.net",
"mx1.vie.at.geant.net",
"mx1.ams.nl.geant.net",
"mx1.fra.de.geant.net",
"mx1.gen.ch.geant.net",
"mx1.mil2.it.geant.net",
"mx1.mad.es.geant.net",
"mx1.dub.ie.geant.net",
"mx1.mar.fr.geant.net"
]
if router not in ROUTERS_WITH_BGP_CONFIG:
pytest.skip('%s is not expected to have bgp peers' % router)
return
bgp_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"description": {"type": "string"},
"as": {
"type": "object",
"properties": {
"peer": {"type": "integer"},
"local": {"type": "integer"}
},
"required": ["peer", "local"],
"additionalProperties": False
},
},
"required": ["description", "as", "name"],
"additionalProperties": False
}
}
rv = client_with_mocked_data.post(
"/data/bgp/" + router,
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, bgp_list_schema)
assert response # at least shouldn't be empty
def test_router_debug_data_route(router, client_with_mocked_data):
"""
not really a test ... just providing coverage of temporary code used
for debugging (should be removed eventually)
:param router:
:param client_with_mocked_data:
:return:
"""
debug_data_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"items": {"type": "object"}
}
rv = client_with_mocked_data.post(
"/data/debug-dump/" + router,
headers=DEFAULT_REQUEST_HEADERS)
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, debug_data_schema)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment