import os import pytest import jsonschema from inventory_provider.db import db from inventory_provider.db import opsdb pytestmark = pytest.mark.skipif( 'TEST_OPSDB_HOSTNAME' not in os.environ, reason='TEST_OPSDB_HOSTNAME environment variable not found') pytestmark = pytest.mark.skipif( 'TEST_OPSDB_DBNAME' not in os.environ, reason='TEST_OPSDB_DBNAME environment variable not found') pytestmark = pytest.mark.skipif( 'TEST_OPSDB_USERNAME' not in os.environ, reason='TEST_OPSDB_USERNAME environment variable not found') pytestmark = pytest.mark.skipif( 'TEST_OPSDB_PASSWORD' not in os.environ, reason='TEST_OPSDB_PASSWORD environment variable not found') @pytest.fixture def db_params(): return { 'hostname': os.environ['TEST_OPSDB_HOSTNAME'], 'dbname': os.environ['TEST_OPSDB_DBNAME'], 'username': os.environ['TEST_OPSDB_USERNAME'], 'password': os.environ['TEST_OPSDB_PASSWORD'], } @pytest.fixture def connection(db_params): with db.connection(db_params) as c: yield c EQUIPMENT_LOCATION_METADATA = { "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { "pop-info": { "type": "object", "properties": { "name": {"type": "string"}, "abbreviation": {"type": "string"}, "country": {"type": "string"}, "city": {"type": "string"}, "longitude": {"type": "number"}, "latitude": {"type": "number"} }, "required": [ "name", "abbreviation", "country", "city", "longitude", "latitude" ], "additionalProperties": False }, "equipment-info": { "type": "object", "properties": { 'equipment-name': {"type": "string"}, 'status': {"type": "string"}, 'pop': {"$ref": "#/definitions/pop-info"} }, "required": [ "equipment-name", "status", "pop" ], "additionalProperties": False } }, "type": "array", "items": {"$ref": "#/definitions/equipment-info"} } CORIANT_PATH_METADATA = { "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { "pop-info": { "type": "object", "properties": { "absid": {"type": "integer"}, "name": {"type": "string"}, "abbreviation": {"type": "string"}, "country": {"type": "string"}, "city": {"type": "string"}, "longitude": {"type": "number"}, "latitude": {"type": "number"} }, "required": [ "absid", "name", "abbreviation", "country", "city", "longitude", "latitude" ], "additionalProperties": False }, "endpoint": { "type": "object", "properties": { "name": {"type": "string"}, "card id": {"type": "string"}, "port number": {"type": "string"}, "pop": {"$ref": "#/definitions/pop-info"} }, "required": ["name", "port number", "pop"], "additionalProperties": False } }, "type": "object", "properties": { 'absid': {"type": "integer"}, 'category': {"type": "string"}, 'circuit_type': {"type": "string"}, 'service_type': {"type": "string"}, 'peering_type': {"type": "string"}, 'status': {"type": "string"}, 'name': {"type": "string"}, 'a': {"$ref": "#/definitions/endpoint"}, 'b': {"$ref": "#/definitions/endpoint"} }, "required": [ "absid", "category", "circuit_type", "service_type", "peering_type", "status", "a", "b"], "additionalProperties": False } @pytest.mark.parametrize('equipment', [ 'mx1.cbg.uk.geant.net', 'grv3.lon.uk.geant.net', 'mx1.gen.ch.geant.net' ]) def test_equipment_location(connection, equipment): circuit = opsdb.lookup_pop_info(connection, equipment) jsonschema.validate(circuit, EQUIPMENT_LOCATION_METADATA) assert len(circuit) == 1 @pytest.mark.parametrize('equipment,card,port', [ ('grv3.lon.uk.geant.net', '1-1', '3'), ('grv3.lon.uk.geant.net', '1-1', '5'), ('grv1.ams.nl.geant.net', '1-1', '1'), ('grv3.lon.uk.geant.net', '1-1', '1'), ]) def test_coriant_path(connection, equipment, card, port): circuit = opsdb.lookup_coriant_path(connection, equipment, card, port) jsonschema.validate(circuit, CORIANT_PATH_METADATA)