"""Shared pytest fixtures and mocks for the inventory provider tests."""
import ast
import contextlib
import json
import netifaces
import os
import re
import tempfile
import threading

from lxml import etree
import pytest

import inventory_provider
from inventory_provider.tasks import worker
from inventory_provider import config

TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
    inventory_provider.__path__[0],
    "..",
    "test",
    "data"))


@pytest.fixture
def data_config_filename():
    """Yield the name of a temporary file holding a test configuration.

    The configuration is written as utf-8 encoded json.  The 'gws-direct'
    element is loaded from gws-direct.json in the test data directory.
    The file only exists while the fixture is active.
    """
    with tempfile.NamedTemporaryFile() as f:
        # named test_config so the imported `config` module isn't shadowed
        test_config = {
            "ops-db": {
                "hostname": "xxxxxxx.yyyyy.zzz",
                "dbname": "xxxxxx",
                "username": "xxxxxx",
                "password": "xxxxxxxx"
            },
            "ssh": {
                "username": "uSeR-NaMe",
                "private-key": "private-key-filename",
                "known-hosts": "known-hosts=filename"
            },
            "redis": {
                "hostname": "xxxxxx",
                "port": 6379,
                "socket_timeout": 2.8
            },
            "redis-databases": [0, 7],
            "otrs-export": {
                "username": "otrs_username",
                "private-key": "otrs_ky_loc",
                "destination": "otrs_dest",
                "known-hosts": "otrs_known_hosts"
            },
            "ims": {
                "api": "ims_api",
                "username": "ims_username",
                "password": "ims_password"
            },
            "managed-routers": "bogus url",
            "unmanaged-interfaces": [
                {
                    "address": "99.99.99.99",
                    "network": "99.99.99.0/24",
                    "interface": "ABC/0/0/0",
                    "router": "bogus.host.name"
                },
                {
                    "address": "999.999.999.99",
                    "network": "999.999.999.0/24",
                    "interface": "ZZZ/9/a/x:0.123",
                    "router": "another.bogus.host.name"
                }
            ],
            'gws-direct': {}
        }

        with open(os.path.join(TEST_DATA_DIRNAME, 'gws-direct.json')) as gws:
            test_config['gws-direct'] = json.loads(gws.read())

        f.write(json.dumps(test_config).encode('utf-8'))
        f.flush()  # make sure the content is on disk before it's re-opened
        yield f.name


@pytest.fixture
def data_config(data_config_filename):
    """Return the result of config.load on the temporary config file."""
    with open(data_config_filename) as f:
        return config.load(f)


_bootstrap_semaphore = threading.Semaphore()


class MockedRedis(object):
    """In-memory stand-in for the redis client class used by the tests.

    All instances share the class-level `db` dict, which is loaded from
    a json file in the test data directory the first time an instance
    is created while `db` is None.
    """

    # shared key/value store; None means "not loaded yet"
    db = None

    def __init__(self, *args, **kwargs):
        """Load the shared test data if it hasn't been loaded yet.

        All arguments are accepted and ignored.
        """
        # serialize the check-and-load so the data is only prepared once
        with _bootstrap_semaphore:
            if MockedRedis.db is None:
                MockedRedis.prep()

    # allows us to create other mocks using a different data source file
    @staticmethod
    def prep(data_source_file="router-info.json"):
        """(Re)initialize the shared db from a file in the test data dir.

        :param data_source_file: name of a json file in TEST_DATA_DIRNAME
        """
        test_data_filename = os.path.join(
            TEST_DATA_DIRNAME,
            data_source_file)
        with open(test_data_filename) as f:
            MockedRedis.db = json.loads(f.read())
            MockedRedis.db['db:latch'] = json.dumps({
                'current': 0,
                'next': 0,
                'this': 0,
                'pending': False,
                'failure': False
            })

        # remove any cached data from the captured snapshot
        cache_prefixes = ('classifier-cache', 'joblog')
        keys_to_delete = [
            k for k in MockedRedis.db.keys()
            if k.startswith(cache_prefixes)]
        for k in keys_to_delete:
            del MockedRedis.db[k]

    def set(self, name, value):
        """Store `value` under `name` in the shared db."""
        MockedRedis.db[name] = value

    def get(self, name):
        """Return the utf-8 encoded value for `name`, or None if absent."""
        value = MockedRedis.db.get(name, None)
        if value is None:
            return None
        return value.encode('utf-8')

    def exists(self, name):
        """Return True if `name` is a key in the shared db."""
        return name in MockedRedis.db

    def delete(self, key):
        """Remove `key` (str or utf-8 bytes) from the shared db.

        :raises KeyError: if the key isn't present
        """
        if isinstance(key, bytes):
            key = key.decode('utf-8')
        # redis ignores delete for keys that don't exist
        # ... but in our test environment we don't expect this
        del MockedRedis.db[key]

    def scan_iter(self, glob=None, count='unused'):
        """Yield the utf-8 encoded db keys matching `glob`.

        :param glob: None or empty for all keys, otherwise a pattern
            consisting of a literal prefix followed by a single '*'
        :param count: ignored
        """
        if not glob:
            for k in list(MockedRedis.db.keys()):
                yield k.encode('utf-8')
            # nothing to filter on: stop here rather than falling
            # through to the prefix matching, which requires a pattern
            return

        m = re.match(r'^([^*]+)\*$', glob)
        assert m  # all expected globs are like this
        prefix = m.group(1)
        for k in list(MockedRedis.db.keys()):
            if k.startswith(prefix):
                yield k.encode('utf-8')

    def keys(self, glob=None):
        """Return a list of the utf-8 encoded db keys matching `glob`."""
        return list(self.scan_iter(glob))

    def flushdb(self):
        """Do nothing: the shared test data is left in place."""
        # only called from testing routes (hopefully)
        pass

    def execute(self):
        """Do nothing: writes are applied to the shared db immediately."""
        pass

    def pipeline(self, *args, **kwargs):
        """Return this instance, which then also acts as the pipeline."""
        return self


@pytest.fixture
def cached_test_data():
    """Return the parsed contents of router-info.json."""
    filename = os.path.join(TEST_DATA_DIRNAME, "router-info.json")
    with open(filename) as f:
        return json.loads(f.read())


@pytest.fixture
def flask_config_filename():
    """Yield the name of a temporary flask settings file.

    The settings enable the testing routes.  The file only exists while
    the fixture is active.
    """
    with tempfile.NamedTemporaryFile() as f:
        f.write('ENABLE_TESTING_ROUTES = True\n'.encode('utf-8'))
        f.flush()
        yield f.name


@pytest.fixture
def mocked_redis(mocker):
    """Patch StrictRedis in tasks.common with MockedRedis."""
    MockedRedis.db = None  # force data to be reloaded
    mocker.patch(
        'inventory_provider.tasks.common.redis.StrictRedis',
        MockedRedis)


@pytest.fixture
def client(flask_config_filename, data_config_filename, mocked_redis):
    """Yield a flask test client configured with the temporary files."""
    os.environ['FLASK_SETTINGS_FILENAME'] = flask_config_filename
    os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename
    with inventory_provider.create_app().test_client() as c:
        yield c


# the {AF_*} placeholders are replaced with the netifaces address
# family constants by the mocked_netifaces fixture
NETIFACES_TEST_DATA_STRING = """{
    'lo0':  {{AF_INET}: [{'addr': '127.0.0.1', 'netmask': '255.0.0.0', 'peer': '127.0.0.1'}],
             {AF_INET6}: [{'addr': '::1', 'netmask': 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128', 'peer': '::1', 'flags': 0},
                          {'addr': 'fe80::1%lo0', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 0}]},
    'eth0': {{AF_LINK}: [{'addr': '78:4f:43:76:73:ba'}],
             {AF_INET}: [{'addr': '83.97.92.239', 'netmask': '255.255.252.0', 'broadcast': '83.97.95.255'}],
             {AF_INET6}: [{'addr': 'fe80::250:56ff:fea1:8340', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1024},
                          {'addr': '2001:798:3::104', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1088}]}
}"""  # noqa E501


@pytest.fixture
def mocked_netifaces(mocker):
    """Patch netifaces.interfaces and netifaces.ifaddresses.

    The patched functions serve the data described by
    NETIFACES_TEST_DATA_STRING.
    """
    s = NETIFACES_TEST_DATA_STRING
    for k, v in {
        'AF_INET': netifaces.AF_INET,
        'AF_INET6': netifaces.AF_INET6,
        'AF_LINK': netifaces.AF_LINK
    }.items():
        s = s.replace('{%s}' % k, str(v))
    data = ast.literal_eval(s)
    mocker.patch('netifaces.interfaces', lambda: data.keys())
    mocker.patch('netifaces.ifaddresses', lambda n: data[n])


@contextlib.contextmanager
def _mocked_db_connection(ignored):
    """Context manager that ignores its argument and yields None."""
    yield None


@pytest.fixture
def mocked_worker_module(
        mocker,
        mocked_redis,
        data_config_filename,
        cached_test_data,
        mocked_netifaces):
    """Configure the worker module and patch its external data sources.

    The snmp and netconf loaders are replaced with functions that
    return data from the cached test data, and InventoryTask.send_event
    is replaced with a no-op.
    """
    os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename

    with open(data_config_filename) as f:
        worker.InventoryTask.config = config.load(f)

    def _mocked_send_event(*kargs, **kwargs):
        pass

    mocker.patch(
        'inventory_provider.tasks.worker.InventoryTask.send_event',
        _mocked_send_event)

    def _mocked_snmp_interfaces(hostname, community, logical_systems):
        return json.loads(cached_test_data['snmp-interfaces:' + hostname])

    mocker.patch(
        'inventory_provider.snmp.get_router_snmp_indexes',
        _mocked_snmp_interfaces)

    def _mocked_snmp_peerings(hostname, community, logical_systems):
        # the cached keys are selected by pattern only: hostname isn't used
        def _wanted(s):
            return re.match(r'^snmp-peerings:\d.*', s)

        keys = filter(_wanted, cached_test_data.keys())
        return [json.loads(cached_test_data[k]) for k in keys]

    mocker.patch(
        'inventory_provider.snmp.get_peer_state_info',
        _mocked_snmp_peerings)

    def _mocked_load_juniper_netconf_config(hostname, _):
        return etree.XML(cached_test_data['netconf:' + hostname])

    mocker.patch(
        'inventory_provider.juniper.load_config',
        _mocked_load_juniper_netconf_config)