# conftest.py: pytest fixtures and mocks shared by the test suite
import ast
import contextlib
import json
import netifaces
import os
import re
import tempfile
import threading
from lxml import etree
import pytest
import inventory_provider
from inventory_provider.tasks import worker
from inventory_provider import config
# Absolute path of the directory holding the test data files, resolved
# relative to the location of the inventory_provider package.
TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
    inventory_provider.__path__[0],
    "..",
    "test",
    "data"))

# Guards the lazy, one-time load of MockedRedis.db in MockedRedis.__init__.
_bootstrap_semaphore = threading.Semaphore()

@pytest.fixture
def data_config_filename():
    """Yield the path of a temporary JSON config file for the tests.

    The file contains placeholder connection settings plus the contents of
    ``gws-direct.json`` from the test data directory.  It is backed by a
    ``NamedTemporaryFile``, so it only exists while the fixture is active.
    """
    with tempfile.NamedTemporaryFile() as f:
        # named ``settings`` so the imported ``config`` module isn't shadowed
        settings = {
            "ops-db": {
                "hostname": "xxxxxxx.yyyyy.zzz",
                "dbname": "xxxxxx",
                "username": "xxxxxx",
                "password": "xxxxxxxx"
            },
            "ssh": {
                "username": "uSeR-NaMe",
                "private-key": "private-key-filename",
                "known-hosts": "known-hosts=filename"
            },
            "redis": {
                "hostname": "xxxxxx",
                "port": 6379,
                "socket_timeout": 2.8
            },
            "redis-databases": [0, 7],
            "otrs-export": {
                "username": "otrs_username",
                "private-key": "otrs_ky_loc",
                "destination": "otrs_dest",
                "known-hosts": "otrs_known_hosts"
            },
            # NOTE(review): the name of this section was not visible in the
            # reviewed copy; "ims" is assumed from the values below - confirm
            # against the config schema.
            "ims": {
                "api": "ims_api",
                "username": "ims_username",
                "password": "ims_password"
            },
            "managed-routers": "bogus url",
            "unmanaged-interfaces": [
                {
                    "address": "99.99.99.99",
                    "network": "99.99.99.0/24",
                    "interface": "ABC/0/0/0",
                    "router": "bogus.host.name"
                },
                {
                    "address": "999.999.999.99",
                    "network": "999.999.999.0/24",
                    "interface": "ZZZ/9/a/x:0.123",
                    "router": "another.bogus.host.name"
                }
            ],
            # placeholder, replaced just below with the captured data
            'gws-direct': {}
        }

        with open(os.path.join(TEST_DATA_DIRNAME, 'gws-direct.json')) as gws:
            settings['gws-direct'] = json.load(gws)

        f.write(json.dumps(settings).encode('utf-8'))
        # flush so that readers opening the file by name see the content
        f.flush()
        yield f.name
@pytest.fixture
def data_config(data_config_filename):
    """Return the result of ``config.load`` on the temporary config file."""
    with open(data_config_filename) as config_file:
        loaded = config.load(config_file)
    return loaded
class MockedRedis(object):
    """In-memory replacement for ``redis.StrictRedis`` used by the tests.

    Every instance shares the class-level ``db`` dict, which is loaded
    lazily from a JSON file in the test data directory.  Keys and values
    are stored as ``str``; ``get`` and the key iterators return ``bytes``.
    """

    # shared key/value store; None means "not loaded yet"
    db = None

    def __init__(self, *args, **kwargs):
        """Load the shared store on first use; arguments are ignored."""
        # guard the lazy load so concurrent constructors only load once
        with _bootstrap_semaphore:
            if MockedRedis.db is None:
                MockedRedis.prep()

    # allows us to create other mocks using a different data source file
    @staticmethod
    def prep(data_source_file="router-info.json"):
        """(Re)load the shared store from a file in the test data directory.

        :param data_source_file: name of a JSON file under TEST_DATA_DIRNAME
        """
        test_data_filename = os.path.join(
            TEST_DATA_DIRNAME,
            data_source_file)
        with open(test_data_filename) as f:
            MockedRedis.db = json.loads(f.read())

        MockedRedis.db['db:latch'] = json.dumps({
            'current': 0,
            'next': 0,
            'this': 0,
            'pending': False,
            'failure': False
        })

        # remove any cached data from the captured snapshot
        cache_prefixes = ('classifier-cache', 'joblog')
        cached_keys = [
            k for k in MockedRedis.db if k.startswith(cache_prefixes)]
        for k in cached_keys:
            del MockedRedis.db[k]

    def set(self, name, value):
        """Store ``value`` under ``name`` in the shared store."""
        MockedRedis.db[name] = value

    def get(self, name):
        """Return the utf-8 encoded value for ``name``, or None if absent."""
        value = MockedRedis.db.get(name, None)
        if value is None:
            return None
        return value.encode('utf-8')

    def exists(self, name):
        """Return True if ``name`` is a key of the shared store."""
        return name in MockedRedis.db

    def delete(self, key):
        """Remove ``key`` (``str`` or ``bytes``) from the shared store.

        :raises KeyError: if the key is not present
        """
        if isinstance(key, bytes):
            key = key.decode('utf-8')
        # redis ignores delete for keys that don't exist
        # ... but in our test environment we don't expect this
        del MockedRedis.db[key]

    def scan_iter(self, glob=None, count='unused'):
        """Yield the utf-8 encoded keys matching ``glob``.

        :param glob: None/empty for every key, otherwise a pattern of the
            form ``prefix*`` (the only form supported)
        :param count: accepted and ignored
        """
        if not glob:
            for k in list(MockedRedis.db.keys()):
                yield k.encode('utf-8')
            # without this return the code below would run with glob=None
            # and re.match would raise TypeError
            return

        m = re.match(r'^([^*]+)\*$', glob)
        assert m  # all expected globs are like this
        prefix = m.group(1)
        # iterate over a copy so callers may modify the store while scanning
        for k in list(MockedRedis.db.keys()):
            if k.startswith(prefix):
                yield k.encode('utf-8')

    def keys(self, glob=None):
        """Return the keys matching ``glob`` as a list of ``bytes``."""
        return list(self.scan_iter(glob))

    def flushdb(self):
        """Do nothing (the shared store is left untouched)."""
        # only called from testing routes (hopefully)
        pass

    def execute(self):
        """Do nothing; commands have already been applied immediately."""
        pass

    def pipeline(self, *args, **kwargs):
        """Return this object, so pipelined calls are applied directly."""
        return self

@pytest.fixture
def cached_test_data():
    """Return the parsed contents of ``router-info.json`` from the test data."""
    snapshot_path = os.path.join(TEST_DATA_DIRNAME, "router-info.json")
    with open(snapshot_path) as snapshot:
        return json.load(snapshot)


@pytest.fixture
def flask_config_filename():
    """Yield the path of a temporary Flask settings file.

    The file contains a single line that sets ENABLE_TESTING_ROUTES to True.
    """
    settings_text = 'ENABLE_TESTING_ROUTES = True\n'
    with tempfile.NamedTemporaryFile() as settings_file:
        settings_file.write(settings_text.encode('utf-8'))
        # flush so the content is visible when the file is opened by name
        settings_file.flush()
        yield settings_file.name


@pytest.fixture
def mocked_redis(mocker):
    """Patch ``StrictRedis`` in the tasks package with ``MockedRedis``."""
    # dropping the shared store forces the next instance to reload the data
    MockedRedis.db = None
    target = 'inventory_provider.tasks.common.redis.StrictRedis'
    mocker.patch(target, MockedRedis)


@pytest.fixture
def client(flask_config_filename, data_config_filename, mocked_redis):
    """Yield a Flask test client for an app configured from the temp files.

    The two settings file names are exported through environment variables
    before the application is created.
    """
    os.environ.update({
        'FLASK_SETTINGS_FILENAME': flask_config_filename,
        'INVENTORY_PROVIDER_CONFIG_FILENAME': data_config_filename,
    })
    app = inventory_provider.create_app()
    with app.test_client() as test_client:
        yield test_client


# Template for the fake interface table used by the mocked_netifaces
# fixture.  The {AF_INET}, {AF_INET6} and {AF_LINK} placeholders are
# replaced with the numeric netifaces constants, and the resulting text is
# parsed with ast.literal_eval into a dict keyed by interface name.
NETIFACES_TEST_DATA_STRING = """{
    'lo0':  {{AF_INET}: [{'addr': '127.0.0.1', 'netmask': '255.0.0.0', 'peer': '127.0.0.1'}],
             {AF_INET6}: [{'addr': '::1', 'netmask': 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128', 'peer': '::1', 'flags': 0},
                 {'addr': 'fe80::1%lo0', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 0}]},
    'eth0': {{AF_LINK}: [{'addr': '78:4f:43:76:73:ba'}],
             {AF_INET}: [{'addr': '83.97.92.239', 'netmask': '255.255.252.0', 'broadcast': '83.97.95.255'}],
             {AF_INET6}: [{'addr': 'fe80::250:56ff:fea1:8340', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1024},
                 {'addr': '2001:798:3::104', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1088}]}
}"""  # noqa E501


@pytest.fixture
def mocked_netifaces(mocker):
    """Patch ``netifaces`` so it reports the interfaces of the test table.

    The address-family placeholders in NETIFACES_TEST_DATA_STRING are
    replaced with the numeric constants of the installed netifaces module
    before the text is evaluated as a Python literal.
    """
    families = {
        'AF_INET': netifaces.AF_INET,
        'AF_INET6': netifaces.AF_INET6,
        'AF_LINK': netifaces.AF_LINK
    }
    text = NETIFACES_TEST_DATA_STRING
    for name, number in families.items():
        text = text.replace('{%s}' % name, str(number))
    interfaces = ast.literal_eval(text)

    def _interfaces():
        return interfaces.keys()

    def _ifaddresses(n):
        return interfaces[n]

    mocker.patch('netifaces.interfaces', _interfaces)
    mocker.patch('netifaces.ifaddresses', _ifaddresses)

@contextlib.contextmanager
def _mocked_db_connection(ignored):
    yield None


@pytest.fixture
def mocked_worker_module(
        mocker, mocked_redis, data_config_filename,
        cached_test_data, mocked_netifaces):
    """Prepare the worker module to run against the captured test data.

    Exports the temporary config file name through the environment, loads
    that config into ``worker.InventoryTask.config`` and patches event
    sending, the SNMP queries and the Juniper netconf loader so that they
    answer from ``cached_test_data`` instead of contacting real devices.
    """

    os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename

    with open(data_config_filename) as f:
        worker.InventoryTask.config = config.load(f)

    # task events are silently dropped
    def _mocked_send_event(*kargs, **kwargs):
        pass
    mocker.patch(
        'inventory_provider.tasks.worker.InventoryTask.send_event',
        _mocked_send_event)

    # interface indexes come from the 'snmp-interfaces:<hostname>' entry
    def _mocked_snmp_interfaces(hostname, community, logical_systems):
        return json.loads(cached_test_data['snmp-interfaces:' + hostname])
    mocker.patch(
        'inventory_provider.snmp.get_router_snmp_indexes',
        _mocked_snmp_interfaces)

    # the arguments are ignored: every cached entry whose key starts with
    # 'snmp-peerings:' followed by a digit is returned
    def _mocked_snmp_peerings(hostname, community, logical_systems):
        def _wanted(s):
            return re.match(r'^snmp-peerings:\d.*', s)

        return [
            json.loads(cached_test_data[k])
            for k in cached_test_data if _wanted(k)]
    mocker.patch(
        'inventory_provider.snmp.get_peer_state_info',
        _mocked_snmp_peerings)

    # netconf documents are parsed from the 'netconf:<hostname>' entry
    def _mocked_load_juniper_netconf_config(hostname, _):
        return etree.XML(cached_test_data['netconf:' + hostname])
    mocker.patch(
        'inventory_provider.juniper.load_config',
        _mocked_load_juniper_netconf_config)