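"""Shared pytest fixtures for the inventory provider test suite.

Provides a temporary test configuration file, an in-memory replacement
for redis, mocked netifaces data, a Flask test client and a worker
module whose external dependencies (SNMP, netconf, event sending) are
patched to use captured test data.
"""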
import ast
import contextlib
import json
import netifaces
import os
import re
import tempfile
import threading

from lxml import etree
import pytest

import inventory_provider
from inventory_provider.tasks import worker
from inventory_provider import config

TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
    inventory_provider.__path__[0],
    "..",
    "test",
    "data"))


@pytest.fixture
def data_config_filename():
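    """Yield the name of a temporary config file with dummy test values."""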

    with tempfile.NamedTemporaryFile() as f:
        config = {
            "ops-db": {
                "hostname": "xxxxxxx.yyyyy.zzz",
                "dbname": "xxxxxx",
                "username": "xxxxxx",
                "password": "xxxxxxxx"
            },
            "ssh": {
                "username": "uSeR-NaMe",
                "private-key": "private-key-filename",
                "known-hosts": "known-hosts=filename"
            },
            "redis": {
                "hostname": "xxxxxx",
                "port": 6379,
                "socket_timeout": 2.8
            },
            "redis-databases": [0, 7],
            "otrs-export": {
                "username": "otrs_username",
                "private-key": "otrs_ky_loc",
                "destination": "otrs_dest",
                "known-hosts": "otrs_known_hosts"
              },
            "ims": {
                "api": "ims_api",
                "username": "ims_username",
                "password": "ims_password"
              },
            "managed-routers": "bogus url",
            "unmanaged-interfaces": [
                {
                    "address": "99.99.99.99",
                    "network": "99.99.99.0/24",
                    "interface": "ABC/0/0/0",
                    "router": "bogus.host.name"
                },
                {
                    "address": "999.999.999.99",
                    "network": "999.999.999.0/24",
                    "interface": "ZZZ/9/a/x:0.123",
                    "router": "another.bogus.host.name"
                }
            ],
            'gws-direct': {}
        }

        with open(os.path.join(TEST_DATA_DIRNAME, 'gws-direct.json')) as gws:
            config['gws-direct'] = json.loads(gws.read())

        f.write(json.dumps(config).encode('utf-8'))
        f.flush()
        yield f.name


@pytest.fixture
def data_config(data_config_filename):
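    """Return the config loaded from the temporary test config file."""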
    with open(data_config_filename) as f:
        return config.load(f)


_bootstrap_semaphore = threading.Semaphore()



class MockedRedis(object):
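    """Minimal in-memory replacement for redis.StrictRedis.

    Data is kept in a class-level dict loaded once from a captured
    test data snapshot; only the methods used by the code under test
    are implemented.
    """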

    db = None

    def __init__(self, *args, **kwargs):
        _bootstrap_semaphore.acquire()
        try:
            if MockedRedis.db is None:
                MockedRedis.prep()
        finally:
            _bootstrap_semaphore.release()

    # allows us to create other mocks using a different data source file
    @staticmethod
    def prep(data_source_file="router-info.json"):
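        """Load the snapshot file into MockedRedis.db.

        Resets the db:latch value and removes cached classifier-cache
        and joblog keys from the captured snapshot.
        """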
        test_data_filename = os.path.join(
            TEST_DATA_DIRNAME,
            data_source_file)
        with open(test_data_filename) as f:
            MockedRedis.db = json.loads(f.read())
            MockedRedis.db['db:latch'] = json.dumps({
                'current': 0,
                'next': 0,
                'this': 0,
                'pending': False,
                'failure': False
            })

            # remove any cached data from the captured snapshot
            def _is_cache(s):
                if s.startswith('classifier-cache'):
                    return True
                if s.startswith('joblog'):
                    return True
                return False
            keys_to_delete = filter(_is_cache, MockedRedis.db.keys())
            for k in list(keys_to_delete):
                del MockedRedis.db[k]

    def set(self, name, value):
        MockedRedis.db[name] = value

    def get(self, name):
        value = MockedRedis.db.get(name, None)
        if value is None:
            return None
        return value.encode('utf-8')

    def exists(self, name):
        return name in MockedRedis.db

    def delete(self, key):
        if isinstance(key, bytes):
            key = key.decode('utf-8')
        # redis ignores delete for keys that don't exist
        # ... but in our test environment we don't expect this
        del MockedRedis.db[key]

    def scan_iter(self, glob=None, count='unused'):
        if not glob:
            for k in list(MockedRedis.db.keys()):
                yield k.encode('utf-8')
            return

        m = re.match(r'^([^*]+)\*$', glob)
        assert m  # all expected globs are like this
        for k in list(MockedRedis.db.keys()):
            if k.startswith(m.group(1)):
                yield k.encode('utf-8')

    def keys(self, glob=None):
        return list(self.scan_iter(glob))

    def flushdb(self):
        # only called from testing routes (hopefully)
        pass

    def execute(self):
        pass

    def pipeline(self, *args, **kwargs):
        return self


@pytest.fixture
def cached_test_data():
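    """Return the parsed router-info.json test data snapshot."""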
    filename = os.path.join(TEST_DATA_DIRNAME, "router-info.json")
    with open(filename) as f:
        return json.loads(f.read())


@pytest.fixture
def flask_config_filename():
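    """Yield a temporary Flask settings file that enables testing routes."""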

    with tempfile.NamedTemporaryFile() as f:
        f.write('ENABLE_TESTING_ROUTES = True\n'.encode('utf-8'))
        f.flush()
        yield f.name


@pytest.fixture
def mocked_redis(mocker):
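    """Patch redis.StrictRedis (as used by tasks.common) with MockedRedis."""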
    MockedRedis.db = None  # force data to be reloaded
    mocker.patch(
        'inventory_provider.tasks.common.redis.StrictRedis',
        MockedRedis)


@pytest.fixture
def client(flask_config_filename, data_config_filename, mocked_redis):
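    """Yield a Flask test client using the test config and mocked redis."""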
    os.environ['FLASK_SETTINGS_FILENAME'] = flask_config_filename
    os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename
    with inventory_provider.create_app().test_client() as c:
        yield c


NETIFACES_TEST_DATA_STRING = """{
    'lo0':  {{AF_INET}: [{'addr': '127.0.0.1', 'netmask': '255.0.0.0', 'peer': '127.0.0.1'}],
             {AF_INET6}: [{'addr': '::1', 'netmask': 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128', 'peer': '::1', 'flags': 0},
                 {'addr': 'fe80::1%lo0', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 0}]},
    'eth0': {{AF_LINK}: [{'addr': '78:4f:43:76:73:ba'}],
             {AF_INET}: [{'addr': '83.97.92.239', 'netmask': '255.255.252.0', 'broadcast': '83.97.95.255'}],
             {AF_INET6}: [{'addr': 'fe80::250:56ff:fea1:8340', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1024},
                 {'addr': '2001:798:3::104', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1088}]}
}"""  # noqa E501


@pytest.fixture
def mocked_netifaces(mocker):
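    """Patch netifaces to return interfaces and addresses from test data.

    The {AF_*} placeholders in NETIFACES_TEST_DATA_STRING are replaced
    with the numeric address family constants before the string is
    evaluated as a dict.
    """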
    s = NETIFACES_TEST_DATA_STRING
    for k, v in {
                'AF_INET': netifaces.AF_INET,
                'AF_INET6': netifaces.AF_INET6,
                'AF_LINK': netifaces.AF_LINK
            }.items():
        s = s.replace('{%s}' % k, str(v))
    data = ast.literal_eval(s)
    mocker.patch('netifaces.interfaces', lambda: data.keys())
    mocker.patch('netifaces.ifaddresses', lambda n: data[n])


@contextlib.contextmanager
def _mocked_db_connection(ignored):
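    """Stand-in for a database connection context manager; yields None."""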
    yield None


@pytest.fixture
def mocked_worker_module(
        mocker, mocked_redis, data_config_filename,
        cached_test_data, mocked_netifaces):
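    """Configure worker.InventoryTask and patch its external dependencies.

    SNMP index and peering lookups and juniper netconf loading return
    captured test data, and InventoryTask.send_event is a no-op.
    """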

    os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename

    with open(data_config_filename) as f:
        worker.InventoryTask.config = config.load(f)

    def _mocked_send_event(*args, **kwargs):
        pass
    mocker.patch(
        'inventory_provider.tasks.worker.InventoryTask.send_event',
        _mocked_send_event)

    def _mocked_snmp_interfaces(hostname, community, logical_systems):
        return json.loads(cached_test_data['snmp-interfaces:' + hostname])
    mocker.patch(
        'inventory_provider.snmp.get_router_snmp_indexes',
        _mocked_snmp_interfaces)

    def _mocked_snmp_peerings(hostname, community, logical_systems):
        def _wanted(s):
            return re.match(r'^snmp-peerings:\d.*', s)

        keys = filter(_wanted, cached_test_data.keys())
        return [json.loads(cached_test_data[k]) for k in keys]
    mocker.patch(
        'inventory_provider.snmp.get_peer_state_info',
        _mocked_snmp_peerings)

    def _mocked_load_juniper_netconf_config(hostname, _):
        return etree.XML(cached_test_data['netconf:' + hostname])
    mocker.patch(
        'inventory_provider.juniper.load_config',
        _mocked_load_juniper_netconf_config)