Skip to content
Snippets Groups Projects
test_opsdb_queries.py 3.03 KiB
import os
import pytest

import jsonschema

from inventory_provider.db import db
from inventory_provider.db import opsdb


pytestmark = pytest.mark.skipif(
    'TEST_OPSDB_HOSTNAME' not in os.environ,
    reason='TEST_OPSDB_HOSTNAME environment variable not found')
pytestmark = pytest.mark.skipif(
    'TEST_OPSDB_DBNAME' not in os.environ,
    reason='TEST_OPSDB_DBNAME environment variable not found')
pytestmark = pytest.mark.skipif(
    'TEST_OPSDB_USERNAME' not in os.environ,
    reason='TEST_OPSDB_USERNAME environment variable not found')
pytestmark = pytest.mark.skipif(
    'TEST_OPSDB_PASSWORD' not in os.environ,
    reason='TEST_OPSDB_PASSWORD environment variable not found')


@pytest.fixture
def db_params():
    return {
        'hostname': os.environ['TEST_OPSDB_HOSTNAME'],
        'dbname': os.environ['TEST_OPSDB_DBNAME'],
        'username': os.environ['TEST_OPSDB_USERNAME'],
        'password': os.environ['TEST_OPSDB_PASSWORD'],
    }


@pytest.fixture
def connection(db_params):
    with db.connection(db_params) as c:
        yield c


CORIANT_PATH_METADATA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",

    "definitions": {
        "pop-info": {
            "type": "object",
            "properties": {
                "absid": {"type": "integer"},
                "name": {"type": "string"},
                "abbreviation": {"type": "string"},
                "country": {"type": "string"},
                "city": {"type": "string"}
            },
            "required": ["absid", "name", "abbreviation", "country", "city"],
            "additionalProperties": False
        },
        "endpoint": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "card id": {"type": "string"},
                "port number": {"type": "string"},
                "pop": {"$ref": "#/definitions/pop-info"}
            },
            "required": ["name", "port number", "pop"],
            "additionalProperties": False
        }
    },

    "type": "object",
    "properties": {
        'absid': {"type": "integer"},
        'category': {"type": "string"},
        'circuit_type': {"type": "string"},
        'service_type': {"type": "string"},
        'peering_type': {"type": "string"},
        'status': {"type": "string"},
        'name': {"type": "string"},
        'a': {"$ref": "#/definitions/endpoint"},
        'b': {"$ref": "#/definitions/endpoint"}
    },
    "required": [
        "absid",
        "category",
        "circuit_type",
        "service_type",
        "peering_type",
        "status",
        "a",
        "b"],
    "additionalProperties": False
}


@pytest.mark.parametrize('equipment,card,port', [
    ('grv3.lon.uk.geant.net', '1-1', '3'),
    ('grv3.lon.uk.geant.net', '1-1', '5'),
    ('grv1.ams.nl.geant.net', '1-1', '1'),
    ('grv3.lon.uk.geant.net', '1-1', '1'),
])
def test_query(connection, equipment, card, port):
    circuit = opsdb.get_coriant_path(connection, equipment, card, port)
    jsonschema.validate(circuit, CORIANT_PATH_METADATA)