Skip to content
Snippets Groups Projects
Select Git revision
  • e4b3ad0bdd58a119537d4aae4fbff65079f23962
  • python3 default protected
  • feature/exabgp_support2
  • feature/exabgp_support2.bgpextcommunity
  • feature/exabgp_support2.django4.2
  • fix/existingcheck_honor_fragtype
  • feature/python3-authz_netmask
  • feature/authz_netmask
  • fix/wrong_ratelimit_stats
  • feature/requirements_version_update2024-01
  • feature/split_celery
  • feature/improved-warning-mails
  • fix/reenable_expireset_via_restapi
  • feature/admin_user_delete_with_owned_rule_reassigning1
  • feature/admin_user_delete_with_owned_rule_reassigning
  • feature/branded_doc
  • fix/forked_snmp_polling_worker_exit_issue
  • fix/false_user_activation_error
  • feature/exabgp_with_docker-compose
  • fix/prefix_overlap_handling
  • fix/js_security_issues-a
  • save1
  • rpm-1.5-7
  • working1
  • myv1.6
  • t12b1
  • v1.5_newnew2
  • merged_final
  • v1.5_newnew
  • startstop_old
  • myadd2
  • tomas3
  • merge_jra2t6_and_RESTAPI
  • mytomas2
  • mynew1
  • new_jra2t6
  • v1.5_final
  • fod16_ruleroutes-merged_old
  • merged_new
  • v1.6_new_old
  • v1.5_new_old_follower
41 results

tasks.py

Blame
  • test_opsdb_queries.py 4.73 KiB
    import os
    import pytest
    
    import jsonschema
    
    from inventory_provider.db import db
    from inventory_provider.db import opsdb
    
    
    pytestmark = pytest.mark.skipif(
        'TEST_OPSDB_HOSTNAME' not in os.environ,
        reason='TEST_OPSDB_HOSTNAME environment variable not found')
    pytestmark = pytest.mark.skipif(
        'TEST_OPSDB_DBNAME' not in os.environ,
        reason='TEST_OPSDB_DBNAME environment variable not found')
    pytestmark = pytest.mark.skipif(
        'TEST_OPSDB_USERNAME' not in os.environ,
        reason='TEST_OPSDB_USERNAME environment variable not found')
    pytestmark = pytest.mark.skipif(
        'TEST_OPSDB_PASSWORD' not in os.environ,
        reason='TEST_OPSDB_PASSWORD environment variable not found')
    
    
    @pytest.fixture
    def db_params():
        return {
            'hostname': os.environ['TEST_OPSDB_HOSTNAME'],
            'dbname': os.environ['TEST_OPSDB_DBNAME'],
            'username': os.environ['TEST_OPSDB_USERNAME'],
            'password': os.environ['TEST_OPSDB_PASSWORD'],
        }
    
    
    @pytest.fixture
    def connection(db_params):
        with db.connection(db_params) as c:
            yield c
    
    
    EQUIPMENT_LOCATION_METADATA = {
        "$schema": "http://json-schema.org/draft-07/schema#",
    
        "definitions": {
            "pop-info": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "abbreviation": {"type": "string"},
                    "country": {"type": "string"},
                    "city": {"type": "string"},
                    "longitude": {"type": "number"},
                    "latitude": {"type": "number"}
                },
                "required": [
                    "name",
                    "abbreviation",
                    "country",
                    "city",
                    "longitude",
                    "latitude"
                ],
                "additionalProperties": False
            },
            "equipment-info": {
                "type": "object",
                "properties": {
                    'equipment-name': {"type": "string"},
                    'status': {"type": "string"},
                    'pop': {"$ref": "#/definitions/pop-info"}
                },
                "required": [
                    "equipment-name",
                    "status",
                    "pop"
                ],
                "additionalProperties": False
    
            }
        },
    
        "type": "array",
        "items": {"$ref": "#/definitions/equipment-info"}
    }
    
    CORIANT_PATH_METADATA = {
        "$schema": "http://json-schema.org/draft-07/schema#",
    
        "definitions": {
            "pop-info": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "abbreviation": {"type": "string"},
                    "country": {"type": "string"},
                    "city": {"type": "string"},
                    "longitude": {"type": "number"},
                    "latitude": {"type": "number"}
                },
                "required": [
                    "name",
                    "abbreviation",
                    "country",
                    "city",
                    "longitude",
                    "latitude"
                ],
                "additionalProperties": False
            },
            "endpoint": {
                "type": "object",
                "properties": {
                    "equipment name": {"type": "string"},
                    "card id": {"type": "string"},
                    "port number": {"type": "string"},
                    "pop": {"$ref": "#/definitions/pop-info"}
                },
                "required": ["equipment name", "card id", "port number", "pop"],
                "additionalProperties": False
            }
        },
    
        "type": "object",
        "properties": {
            'category': {"type": "string"},
            'circuit_type': {"type": "string"},
            'service_type': {"type": "string"},
            'peering_type': {"type": "string"},
            'status': {"type": "string"},
            'name': {"type": "string"},
            'a': {"$ref": "#/definitions/endpoint"},
            'b': {"$ref": "#/definitions/endpoint"}
        },
        "required": [
            "category",
            "circuit_type",
            "service_type",
            "peering_type",
            "status",
            "a",
            "b"],
        "additionalProperties": False
    }
    
    
    @pytest.mark.parametrize('equipment', [
        'mx1.cbg.uk.geant.net',
        'grv3.lon.uk.geant.net',
        'mx1.gen.ch.geant.net'
    ])
    def test_equipment_location(connection, equipment):
        circuit = opsdb.lookup_pop_info(connection, equipment)
        jsonschema.validate(circuit, EQUIPMENT_LOCATION_METADATA)
        assert len(circuit) == 1
    
    
    @pytest.mark.parametrize('equipment,card,port', [
        ('grv3.lon.uk.geant.net', '1-1', '3'),
        ('grv3.lon.uk.geant.net', '1-1', '5'),
        ('grv1.ams.nl.geant.net', '1-1', '1'),
        ('grv3.lon.uk.geant.net', '1-1', '1'),
    ])
    def test_coriant_path(connection, equipment, card, port):
        circuit = opsdb.lookup_coriant_path(connection, equipment, card, port)
        jsonschema.validate(circuit, CORIANT_PATH_METADATA)