Skip to content
Snippets Groups Projects
Select Git revision
  • 7baef30a867d40e6dbc9bdbfe5c8e674199e88fb
  • develop default
  • master protected
  • feature/frontend-tests
  • 0.107
  • 0.106
  • 0.105
  • 0.104
  • 0.103
  • 0.102
  • 0.101
  • 0.100
  • 0.99
  • 0.98
  • 0.97
  • 0.96
  • 0.95
  • 0.94
  • 0.93
  • 0.92
  • 0.91
  • 0.90
  • 0.89
  • 0.88
24 results

test_survey.py

Blame
  • conftest.py 8.29 KiB
    import ast
    import contextlib
    import json
    import netifaces
    import os
    import re
    import tempfile
    import threading
    
    from lxml import etree
    import pytest
    
    import inventory_provider
    from inventory_provider.tasks import worker
    from inventory_provider import config
    
    TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
        inventory_provider.__path__[0],
        "..",
        "test",
        "data"))
    
    _bootstrap_semaphore = threading.Semaphore()
    
    
    @pytest.fixture
    def data_config_filename():
    
        with tempfile.NamedTemporaryFile() as f:
            config = {
                "ssh": {
                    "username": "uSeR-NaMe",
                    "private-key": "private-key-filename",
                    "known-hosts": "known-hosts=filename"
                },
                "redis": {
                    "hostname": "xxxxxx",
                    "port": 6379,
                    "socket_timeout": 2.8
                },
                "redis-databases": [0, 7],
                "ims": {
                    "api": "ims_api",
                    "username": "ims_username",
                    "password": "ims_password"
                  },
                "managed-routers": "bogus url",
                "unmanaged-interfaces": [
                    {
                        "address": "99.99.99.99",
                        "network": "99.99.99.0/24",
                        "interface": "ABC/0/0/0",
                        "router": "bogus.host.name"
                    },
                    {
                        "address": "999.999.999.99",
                        "network": "999.999.999.0/24",
                        "interface": "ZZZ/9/a/x:0.123",
                        "router": "another.bogus.host.name"
                    }
                ],
                'gws-direct': {},
                'nren-asn-map': [
                    {
                        "nren": "FOO",
                        "asn": 1930
                    },
                    {
                        "nren": "BAR",
                        "asn": 680
                    },
                    {
                        "nren": "BAT",
                        "asn": 2200
                    },
                    {
                        "nren": "BAZ",
                        "asn": 1853
                    }
                ]
            }
    
            with open(os.path.join(TEST_DATA_DIRNAME, 'gws-direct.json')) as gws:
                config['gws-direct'] = json.loads(gws.read())
    
            f.write(json.dumps(config).encode('utf-8'))
            f.flush()
            yield f.name
    
    
    @pytest.fixture
    def data_config(data_config_filename):
        with open(data_config_filename) as f:
            return config.load(f)
    
    
    class MockedRedis(object):
    
        db = None
    
        def __init__(self, *args, **kwargs):
            _bootstrap_semaphore.acquire()
            try:
                if MockedRedis.db is None:
                    MockedRedis.prep()
            finally:
                _bootstrap_semaphore.release()
    
        # allows us to create other mocks using a different data source file
        @staticmethod
        def prep(data_source_file="router-info.json"):
            test_data_filename = os.path.join(
                TEST_DATA_DIRNAME,
                data_source_file)
            with open(test_data_filename) as f:
                MockedRedis.db = json.loads(f.read())
                MockedRedis.db['db:latch'] = json.dumps({
                    'current': 0,
                    'next': 0,
                    'this': 0,
                    'pending': False,
                    'failure': False
                })
    
                # remove any cached data from the captured snapshot
                def _is_cache(s):
                    if s.startswith('classifier-cache'):
                        return True
                    if s.startswith('joblog'):
                        return True
                    return False
                keys_to_delete = filter(_is_cache, MockedRedis.db.keys())
                for k in list(keys_to_delete):
                    del MockedRedis.db[k]
    
        def set(self, name, value):
            MockedRedis.db[name] = value
    
        def get(self, name):
            value = MockedRedis.db.get(name, None)
            if value is None:
                return None
            return value.encode('utf-8')
    
        def exists(self, name):
            return name in MockedRedis.db
    
        def delete(self, key):
            if isinstance(key, bytes):
                key = key.decode('utf-8')
            # redis ignores delete for keys that don't exist
            # ... but in our test environment we don't expect this
            del MockedRedis.db[key]
    
        def scan_iter(self, glob=None, count='unused'):
            if not glob:
                for k in list(MockedRedis.db.keys()):
                    yield k.encode('utf-8')
    
            m = re.match(r'^([^*]+)\*$', glob)
            assert m  # all expected globs are like this
            for k in list(MockedRedis.db.keys()):
                if k.startswith(m.group(1)):
                    yield k.encode('utf-8')
    
        def keys(self, glob=None):
            return list(self.scan_iter(glob))
    
        def flushdb(self):
            # only called from testing routes (hopefully)
            pass
    
        def execute(self):
            pass
    
        def pipeline(self, *args, **kwargs):
            return self
    
    
    @pytest.fixture
    def cached_test_data():
        filename = os.path.join(TEST_DATA_DIRNAME, "router-info.json")
        with open(filename) as f:
            return json.loads(f.read())
    
    
    @pytest.fixture
    def flask_config_filename():
    
        with tempfile.NamedTemporaryFile() as f:
            f.write('ENABLE_TESTING_ROUTES = True\n'.encode('utf-8'))
            f.flush()
            yield f.name
    
    
    @pytest.fixture
    def mocked_redis(mocker):
        MockedRedis.db = None  # force data to be reloaded
        mocker.patch(
            'inventory_provider.tasks.common.redis.StrictRedis',
            MockedRedis)
    
    
    @pytest.fixture
    def client(flask_config_filename, data_config_filename, mocked_redis):
        os.environ['FLASK_SETTINGS_FILENAME'] = flask_config_filename
        os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename
        with inventory_provider.create_app().test_client() as c:
            yield c
    
    
    NETIFACES_TEST_DATA_STRING = """{
        'lo0':  {{AF_INET}: [{'addr': '127.0.0.1', 'netmask': '255.0.0.0', 'peer': '127.0.0.1'}],
                 {AF_INET6}: [{'addr': '::1', 'netmask': 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128', 'peer': '::1', 'flags': 0},
                     {'addr': 'fe80::1%lo0', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 0}]},
        'eth0': {{AF_LINK}: [{'addr': '78:4f:43:76:73:ba'}],
                 {AF_INET}: [{'addr': '83.97.92.239', 'netmask': '255.255.252.0', 'broadcast': '83.97.95.255'}],
                 {AF_INET6}: [{'addr': 'fe80::250:56ff:fea1:8340', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1024},
                     {'addr': '2001:798:3::104', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1088}]}
    }"""  # noqa E501
    
    
    @pytest.fixture
    def mocked_netifaces(mocker):
        s = NETIFACES_TEST_DATA_STRING
        for k, v in {
                    'AF_INET': netifaces.AF_INET,
                    'AF_INET6': netifaces.AF_INET6,
                    'AF_LINK': netifaces.AF_LINK
                }.items():
            s = s.replace('{%s}' % k, str(v))
        data = ast.literal_eval(s)
        mocker.patch('netifaces.interfaces', lambda: data.keys())
        mocker.patch('netifaces.ifaddresses', lambda n: data[n])
    
    
    @contextlib.contextmanager
    def _mocked_db_connection(ignored):
        yield None
    
    
    @pytest.fixture
    def mocked_worker_module(
            mocker, mocked_redis, data_config_filename,
            cached_test_data, mocked_netifaces):
    
        os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename
    
        with open(data_config_filename) as f:
            worker.InventoryTask.config = config.load(f)
    
        def _mocked_send_event(*kargs, **kwargs):
            pass
        mocker.patch(
            'inventory_provider.tasks.worker.InventoryTask.send_event',
            _mocked_send_event)
    
        def _mocked_snmp_interfaces(hostname, community, logical_systems):
            return json.loads(cached_test_data['snmp-interfaces:' + hostname])
        mocker.patch(
            'inventory_provider.snmp.get_router_snmp_indexes',
            _mocked_snmp_interfaces)
    
        def _mocked_snmp_peerings(hostname, community, logical_systems):
            def _wanted(s):
                return re.match(r'^snmp-peerings:\d.*', s)
    
            keys = filter(_wanted, cached_test_data.keys())
            return [json.loads(cached_test_data[k]) for k in keys]
        mocker.patch(
            'inventory_provider.snmp.get_peer_state_info',
            _mocked_snmp_peerings)
    
        def _mocked_load_juniper_netconf_config(hostname, _):
            return etree.XML(cached_test_data['netconf:' + hostname])
        mocker.patch(
            'inventory_provider.juniper.load_config',
            _mocked_load_juniper_netconf_config)