# conftest.py 8.39 KiB
import ast
import contextlib
import json
import netifaces
import os
import re
import tempfile
import threading
from lxml import etree
import pytest
import inventory_provider
from inventory_provider.tasks import worker
from inventory_provider import config
# Canonical path of the "test/data" directory that sits next to the
# inventory_provider package directory.
TEST_DATA_DIRNAME = os.path.realpath(
    os.path.join(inventory_provider.__path__[0], "..", "test", "data"))
@pytest.fixture
def data_config_filename():
    """Yield the name of a temporary file containing a JSON test configuration.

    The configuration is made of placeholder values, except for the
    'gws-direct' section which is read from 'gws-direct.json' in
    TEST_DATA_DIRNAME.  The temporary file only exists while the fixture
    is active.
    """
    with tempfile.NamedTemporaryFile() as tmp:
        settings = {
            "ops-db": {
                "hostname": "xxxxxxx.yyyyy.zzz",
                "dbname": "xxxxxx",
                "username": "xxxxxx",
                "password": "xxxxxxxx"
            },
            "ssh": {
                "username": "uSeR-NaMe",
                "private-key": "private-key-filename",
                "known-hosts": "known-hosts=filename"
            },
            "redis": {
                "hostname": "xxxxxx",
                "port": 6379,
                "socket_timeout": 2.8
            },
            "redis-databases": [0, 7],
            "otrs-export": {
                "username": "otrs_username",
                "private-key": "otrs_ky_loc",
                "destination": "otrs_dest",
                "known-hosts": "otrs_known_hosts"
            },
            "ims": {
                "api": "ims_api",
                "username": "ims_username",
                "password": "ims_password"
            },
            "managed-routers": "bogus url",
            "unmanaged-interfaces": [
                {
                    "address": "99.99.99.99",
                    "network": "99.99.99.0/24",
                    "interface": "ABC/0/0/0",
                    "router": "bogus.host.name"
                },
                {
                    "address": "999.999.999.99",
                    "network": "999.999.999.0/24",
                    "interface": "ZZZ/9/a/x:0.123",
                    "router": "another.bogus.host.name"
                }
            ]
        }

        # the gws-direct section comes from a captured data file and is
        # appended last, so the serialized key order is unchanged
        gws_filename = os.path.join(TEST_DATA_DIRNAME, 'gws-direct.json')
        with open(gws_filename) as gws:
            settings['gws-direct'] = json.load(gws)

        serialized = json.dumps(settings)
        tmp.write(serialized.encode('utf-8'))
        # flush so that readers opening the file by name see the content
        tmp.flush()
        yield tmp.name
@pytest.fixture
def data_config(data_config_filename):
    """Return the result of ``config.load`` on the temporary config file."""
    with open(data_config_filename) as stream:
        loaded = config.load(stream)
    return loaded
TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
inventory_provider.__path__[0],
"..",
"test",
"data"))
_bootstrap_semaphore = threading.Semaphore()
class MockedRedis(object):
    """In-memory replacement for a redis client, used by the test fixtures.

    Every instance shares the class-level ``db`` dict, which is loaded
    lazily from a JSON file in TEST_DATA_DIRNAME.  Keys are ``str``;
    ``get``, ``scan_iter`` and ``keys`` return utf-8 encoded ``bytes``.
    """

    # shared key/value store; None means the data file must be (re)loaded
    db = None

    def __init__(self, *args, **kwargs):
        """Load the shared data on first use; all arguments are ignored."""
        # hold the semaphore so concurrent constructors don't both call prep
        with _bootstrap_semaphore:
            if MockedRedis.db is None:
                MockedRedis.prep()

    # allows us to create other mocks using a different data source file
    @staticmethod
    def prep(data_source_file="router-info.json"):
        """(Re)load ``MockedRedis.db`` from a JSON file in TEST_DATA_DIRNAME.

        :param data_source_file: file name, relative to TEST_DATA_DIRNAME
        """
        test_data_filename = os.path.join(
            TEST_DATA_DIRNAME,
            data_source_file)
        with open(test_data_filename) as f:
            MockedRedis.db = json.loads(f.read())

        MockedRedis.db['db:latch'] = json.dumps({
            'current': 0,
            'next': 0,
            'this': 0,
            'pending': False,
            'failure': False
        })

        # remove any cached data from the captured snapshot
        def _is_cache(s):
            return s.startswith(('classifier-cache', 'joblog'))

        # materialize the key list first: the dict is modified in the loop
        for k in [k for k in MockedRedis.db if _is_cache(k)]:
            del MockedRedis.db[k]

    def set(self, name, value):
        """Store ``value`` under ``name``."""
        MockedRedis.db[name] = value

    def get(self, name):
        """Return the utf-8 encoded value for ``name``, or None if absent.

        NOTE(review): assumes stored values are ``str`` - confirm that no
        caller stores ``bytes`` through ``set``.
        """
        value = MockedRedis.db.get(name, None)
        if value is None:
            return None
        return value.encode('utf-8')

    def exists(self, name):
        """Return True if ``name`` is a key of the shared store."""
        return name in MockedRedis.db

    def delete(self, key):
        """Remove ``key`` (``str`` or utf-8 ``bytes``) from the shared store.

        :raises KeyError: if the key is not present
        """
        if isinstance(key, bytes):
            key = key.decode('utf-8')
        # redis ignores delete for keys that don't exist
        # ... but in our test environment we don't expect this
        del MockedRedis.db[key]

    def scan_iter(self, glob=None, count='unused'):
        """Yield the utf-8 encoded keys matching ``glob``.

        :param glob: falsy to yield every key, otherwise a pattern made of
            a literal prefix followed by a single trailing ``*``
        :param count: ignored
        """
        if not glob:
            for k in list(MockedRedis.db.keys()):
                yield k.encode('utf-8')
            # stop here: without this the generator fell through to
            # re.match(..., None) below and raised TypeError
            return

        m = re.match(r'^([^*]+)\*$', glob)
        assert m  # all expected globs are like this
        prefix = m.group(1)
        for k in list(MockedRedis.db.keys()):
            if k.startswith(prefix):
                yield k.encode('utf-8')

    def keys(self, glob=None):
        """Return the output of ``scan_iter(glob)`` as a list."""
        return list(self.scan_iter(glob))

    def flushdb(self):
        """Do nothing."""
        # only called from testing routes (hopefully)
        pass

    def execute(self):
        """Do nothing."""
        pass

    def pipeline(self, *args, **kwargs):
        """Return this same object; all arguments are ignored."""
        return self
@pytest.fixture
def cached_test_data():
    """Return the parsed content of 'router-info.json' in TEST_DATA_DIRNAME."""
    snapshot_path = os.path.join(TEST_DATA_DIRNAME, "router-info.json")
    with open(snapshot_path) as snapshot:
        return json.load(snapshot)
@pytest.fixture
def flask_config_filename():
    """Yield the name of a temporary settings file enabling testing routes.

    The file only exists while the fixture is active.
    """
    settings_text = 'ENABLE_TESTING_ROUTES = True\n'
    with tempfile.NamedTemporaryFile() as tmp:
        tmp.write(settings_text.encode('utf-8'))
        # flush so that readers opening the file by name see the content
        tmp.flush()
        yield tmp.name
@pytest.fixture
def mocked_redis(mocker):
    """Patch ``StrictRedis`` in inventory_provider.tasks.common with MockedRedis."""
    # force data to be reloaded
    MockedRedis.db = None
    patch_target = 'inventory_provider.tasks.common.redis.StrictRedis'
    mocker.patch(patch_target, MockedRedis)
@pytest.fixture
def client(flask_config_filename, data_config_filename, mocked_redis):
    """Yield a test client for the app built by inventory_provider.create_app.

    Both temporary configuration file names are exported through the
    environment before the app is created.
    """
    os.environ.update({
        'FLASK_SETTINGS_FILENAME': flask_config_filename,
        'INVENTORY_PROVIDER_CONFIG_FILENAME': data_config_filename,
    })
    app = inventory_provider.create_app()
    with app.test_client() as c:
        yield c
# Template, in Python literal syntax, for fake netifaces data: a dict mapping
# interface name to a dict keyed by address family.  The {AF_INET}, {AF_INET6}
# and {AF_LINK} placeholders are replaced with the numeric netifaces constants
# by the mocked_netifaces fixture before the text is parsed with
# ast.literal_eval.
NETIFACES_TEST_DATA_STRING = """{
'lo0': {{AF_INET}: [{'addr': '127.0.0.1', 'netmask': '255.0.0.0', 'peer': '127.0.0.1'}],
{AF_INET6}: [{'addr': '::1', 'netmask': 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128', 'peer': '::1', 'flags': 0},
{'addr': 'fe80::1%lo0', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 0}]},
'eth0': {{AF_LINK}: [{'addr': '78:4f:43:76:73:ba'}],
{AF_INET}: [{'addr': '83.97.92.239', 'netmask': '255.255.252.0', 'broadcast': '83.97.95.255'}],
{AF_INET6}: [{'addr': 'fe80::250:56ff:fea1:8340', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1024},
{'addr': '2001:798:3::104', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1088}]}
}""" # noqa E501
@pytest.fixture
def mocked_netifaces(mocker):
    """Patch netifaces.interfaces and netifaces.ifaddresses with canned data.

    The canned data is NETIFACES_TEST_DATA_STRING with its address family
    placeholders replaced by the numeric netifaces constants.
    """
    address_families = (
        ('AF_INET', netifaces.AF_INET),
        ('AF_INET6', netifaces.AF_INET6),
        ('AF_LINK', netifaces.AF_LINK),
    )

    literal = NETIFACES_TEST_DATA_STRING
    for family_name, family_number in address_families:
        placeholder = '{%s}' % family_name
        literal = literal.replace(placeholder, str(family_number))
    data = ast.literal_eval(literal)

    def _interfaces():
        return data.keys()

    def _ifaddresses(n):
        return data[n]

    mocker.patch('netifaces.interfaces', _interfaces)
    mocker.patch('netifaces.ifaddresses', _ifaddresses)
@contextlib.contextmanager
def _mocked_db_connection(ignored):
yield None
@pytest.fixture
def mocked_worker_module(
        mocker, mocked_redis, data_config_filename,
        cached_test_data, mocked_netifaces):
    """Prepare the worker module to run against cached test data.

    Exports the config file name through the environment, loads the
    configuration into ``worker.InventoryTask.config`` and patches event
    sending, the snmp queries and the juniper netconf loader with
    functions that answer from ``cached_test_data``.
    """
    os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename
    with open(data_config_filename) as config_file:
        worker.InventoryTask.config = config.load(config_file)

    def _mocked_send_event(*kargs, **kwargs):
        # events are silently discarded
        return None

    def _mocked_snmp_interfaces(hostname, community, logical_systems):
        cached = cached_test_data['snmp-interfaces:' + hostname]
        return json.loads(cached)

    def _mocked_snmp_peerings(hostname, community, logical_systems):
        # the answer does not depend on the arguments: every cached
        # peering entry matching the key pattern is returned
        return [
            json.loads(cached)
            for key, cached in cached_test_data.items()
            if re.match(r'^snmp-peerings:\d.*', key)
        ]

    def _mocked_load_juniper_netconf_config(hostname, _):
        return etree.XML(cached_test_data['netconf:' + hostname])

    replacements = (
        ('inventory_provider.tasks.worker.InventoryTask.send_event',
         _mocked_send_event),
        ('inventory_provider.snmp.get_router_snmp_indexes',
         _mocked_snmp_interfaces),
        ('inventory_provider.snmp.get_peer_state_info',
         _mocked_snmp_peerings),
        ('inventory_provider.juniper.load_config',
         _mocked_load_juniper_netconf_config),
    )
    for target, replacement in replacements:
        mocker.patch(target, replacement)