Skip to content
Snippets Groups Projects
Select Git revision
  • d2ea56799c0a79620aa917f520f8af72cb364885
  • develop default
  • master protected
  • feature/DBOARD3-1156-move-whois/nren-naming-to-inventory-update
  • inventoryProvider-functional
  • inventoryProvider-morework2
  • circuit-service-details-fix
  • lookup-SPECTRUM-SCHF-ports
  • inventoryProvider-1267-cleanup
  • inventoryProvider-moreWork
  • feature/DBOARD3-958
  • release/0.110
  • fix-uuid-validation-error
  • docker-poc
  • 0.154
  • 0.153
  • 0.152
  • 0.151
  • 0.150
  • 0.149
  • 0.148
  • 0.147
  • 0.146
  • 0.145
  • 0.144
  • 0.143
  • 0.142
  • 0.141
  • 0.140
  • 0.139
  • 0.138
  • 0.137
  • 0.136
  • 0.135
34 results

test_opsdb_queries.py

Blame
  • test_worker_utils.py 1.46 KiB
    """
    tests of a few worker utilities
    """
    import json
    import re
    
    import jsonschema
    
    from inventory_provider.tasks import worker
    from inventory_provider.tasks import common
    
    
    def backend_db():
        """
        builds a redis config whose hostname and port are both None,
        hands it to common._get_redis and exposes the result's db

        :return: the ``db`` attribute of the object from common._get_redis
        """
        redis_params = {'hostname': None, 'port': None}
        config = {
            'redis': redis_params,
            'redis-databases': [0, 7]
        }
        connection = common._get_redis(config)
        return connection.db
    
    
    def test_build_subnet_db(mocked_worker_module):
        """
        runs worker._build_subnet_db and then inspects every backend db
        entry whose key starts with 'reverse_interface_addresses:'

        each such entry must hold a json document that satisfies the
        address schema, and its 'name' must equal the part of the key
        captured after the prefix; the test fails if no entry is found

        :param mocked_worker_module: fixture
        :return:
        """
        key_prefix = 'reverse_interface_addresses:'
        # compiled once here rather than on every loop iteration
        key_pattern = re.compile('^reverse_interface_addresses:(.+)')

        # every property is a mandatory string, so the properties mapping
        # is derived from the list of required names
        required_fields = [
            'name',
            'interface address',
            'interface name',
            'router'
        ]
        address_schema = {
            '$schema': 'http://json-schema.org/draft-07/schema#',

            'type': 'object',
            'properties': {
                field: {'type': 'string'} for field in required_fields
            },
            'required': required_fields,
            'additionalProperties': False
        }

        db = backend_db()  # also forces initialization
        worker._build_subnet_db()

        reverse_entries = [
            (key, raw) for key, raw in db.items()
            if key.startswith(key_prefix)
        ]

        for key, raw in reverse_entries:
            matched = key_pattern.match(key)
            assert matched
            expected_name = matched.group(1)

            record = json.loads(raw)
            jsonschema.validate(record, address_schema)
            assert record['name'] == expected_name

        # at least one reverse-address record must have been checked
        assert reverse_entries