test_opsdb_queries.py 3.03 KiB
import os
import pytest
import jsonschema
from inventory_provider.db import db
from inventory_provider.db import opsdb
pytestmark = pytest.mark.skipif(
'TEST_OPSDB_HOSTNAME' not in os.environ,
reason='TEST_OPSDB_HOSTNAME environment variable not found')
pytestmark = pytest.mark.skipif(
'TEST_OPSDB_DBNAME' not in os.environ,
reason='TEST_OPSDB_DBNAME environment variable not found')
pytestmark = pytest.mark.skipif(
'TEST_OPSDB_USERNAME' not in os.environ,
reason='TEST_OPSDB_USERNAME environment variable not found')
pytestmark = pytest.mark.skipif(
'TEST_OPSDB_PASSWORD' not in os.environ,
reason='TEST_OPSDB_PASSWORD environment variable not found')
@pytest.fixture
def db_params():
return {
'hostname': os.environ['TEST_OPSDB_HOSTNAME'],
'dbname': os.environ['TEST_OPSDB_DBNAME'],
'username': os.environ['TEST_OPSDB_USERNAME'],
'password': os.environ['TEST_OPSDB_PASSWORD'],
}
@pytest.fixture
def connection(db_params):
with db.connection(db_params) as c:
yield c
CORIANT_PATH_METADATA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"definitions": {
"pop-info": {
"type": "object",
"properties": {
"absid": {"type": "integer"},
"name": {"type": "string"},
"abbreviation": {"type": "string"},
"country": {"type": "string"},
"city": {"type": "string"}
},
"required": ["absid", "name", "abbreviation", "country", "city"],
"additionalProperties": False
},
"endpoint": {
"type": "object",
"properties": {
"name": {"type": "string"},
"card id": {"type": "string"},
"port number": {"type": "string"},
"pop": {"$ref": "#/definitions/pop-info"}
},
"required": ["name", "port number", "pop"],
"additionalProperties": False
}
},
"type": "object",
"properties": {
'absid': {"type": "integer"},
'category': {"type": "string"},
'circuit_type': {"type": "string"},
'service_type': {"type": "string"},
'peering_type': {"type": "string"},
'status': {"type": "string"},
'name': {"type": "string"},
'a': {"$ref": "#/definitions/endpoint"},
'b': {"$ref": "#/definitions/endpoint"}
},
"required": [
"absid",
"category",
"circuit_type",
"service_type",
"peering_type",
"status",
"a",
"b"],
"additionalProperties": False
}
@pytest.mark.parametrize('equipment,card,port', [
('grv3.lon.uk.geant.net', '1-1', '3'),
('grv3.lon.uk.geant.net', '1-1', '5'),
('grv1.ams.nl.geant.net', '1-1', '1'),
('grv3.lon.uk.geant.net', '1-1', '1'),
])
def test_query(connection, equipment, card, port):
circuit = opsdb.get_coriant_path(connection, equipment, card, port)
jsonschema.validate(circuit, CORIANT_PATH_METADATA)