# conftest.py (8.29 KiB)
# Authored by Sam Roberts
import ast
import contextlib
import json
import netifaces
import os
import re
import tempfile
import threading
from lxml import etree
import pytest
import inventory_provider
from inventory_provider.tasks import worker
from inventory_provider import config
# Absolute path of the directory holding the test data files, resolved as
# <inventory_provider package directory>/../test/data.
TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
    inventory_provider.__path__[0],
    "..",
    "test",
    "data"))

# Guards the lazy, one-time loading of MockedRedis.db in
# MockedRedis.__init__.
_bootstrap_semaphore = threading.Semaphore()
@pytest.fixture
def data_config_filename():
    """Yield the name of a temporary file holding a JSON test configuration.

    The configuration is a fixed dictionary of bogus connection settings
    whose 'gws-direct' entry is replaced by the content of
    ``gws-direct.json`` from the test data directory.  The temporary file
    is removed when the fixture is torn down.
    """
    gws_direct_filename = os.path.join(TEST_DATA_DIRNAME, 'gws-direct.json')

    with tempfile.NamedTemporaryFile() as tmp:
        # named 'settings' rather than 'config' so the imported
        # inventory_provider.config module is not shadowed
        settings = {
            "ssh": {
                "username": "uSeR-NaMe",
                "private-key": "private-key-filename",
                "known-hosts": "known-hosts=filename"
            },
            "redis": {
                "hostname": "xxxxxx",
                "port": 6379,
                "socket_timeout": 2.8
            },
            "redis-databases": [0, 7],
            "ims": {
                "api": "ims_api",
                "username": "ims_username",
                "password": "ims_password"
            },
            "managed-routers": "bogus url",
            "unmanaged-interfaces": [
                {
                    "address": "99.99.99.99",
                    "network": "99.99.99.0/24",
                    "interface": "ABC/0/0/0",
                    "router": "bogus.host.name"
                },
                {
                    "address": "999.999.999.99",
                    "network": "999.999.999.0/24",
                    "interface": "ZZZ/9/a/x:0.123",
                    "router": "another.bogus.host.name"
                }
            ],
            # placeholder, filled in from gws-direct.json below
            'gws-direct': {},
            'nren-asn-map': [
                {
                    "nren": "FOO",
                    "asn": 1930
                },
                {
                    "nren": "BAR",
                    "asn": 680
                },
                {
                    "nren": "BAT",
                    "asn": 2200
                },
                {
                    "nren": "BAZ",
                    "asn": 1853
                }
            ]
        }

        with open(gws_direct_filename) as gws:
            settings['gws-direct'] = json.load(gws)

        serialized = json.dumps(settings)
        tmp.write(serialized.encode('utf-8'))
        # make sure the content is on disk before anyone opens the file
        tmp.flush()
        yield tmp.name
@pytest.fixture
def data_config(data_config_filename):
    """Return the result of ``config.load`` on the temporary test config."""
    with open(data_config_filename) as config_file:
        loaded = config.load(config_file)
    return loaded
class MockedRedis(object):
    """Minimal in-memory replacement for a redis client, for use in tests.

    All instances share one class-level dictionary (``MockedRedis.db``)
    that is lazily loaded from a JSON file in the test data directory the
    first time an instance is created while ``db`` is ``None``.

    Keys and stored values are ``str``; ``get``, ``scan_iter`` and ``keys``
    return utf-8 encoded ``bytes``.
    """

    # shared backing store; None means "not loaded yet"
    db = None

    def __init__(self, *args, **kwargs):
        """Load the shared data on first use; all arguments are ignored."""
        # the semaphore keeps concurrent constructors from loading twice
        with _bootstrap_semaphore:
            if MockedRedis.db is None:
                MockedRedis.prep()

    # allows us to create other mocks using a different data source file
    @staticmethod
    def prep(data_source_file="router-info.json"):
        """(Re)load the shared store from a file in the test data directory.

        :param data_source_file: name of a JSON file, relative to
            ``TEST_DATA_DIRNAME``, containing a flat key/value mapping
        """
        test_data_filename = os.path.join(
            TEST_DATA_DIRNAME,
            data_source_file)
        with open(test_data_filename) as f:
            MockedRedis.db = json.loads(f.read())

        MockedRedis.db['db:latch'] = json.dumps({
            'current': 0,
            'next': 0,
            'this': 0,
            'pending': False,
            'failure': False
        })

        # remove any cached data from the captured snapshot
        cache_prefixes = ('classifier-cache', 'joblog')
        cached_keys = [
            k for k in MockedRedis.db if k.startswith(cache_prefixes)]
        for k in cached_keys:
            del MockedRedis.db[k]

    def set(self, name, value):
        """Store ``value`` under ``name``."""
        MockedRedis.db[name] = value

    def get(self, name):
        """Return the utf-8 encoded value for ``name``, or None if absent."""
        value = MockedRedis.db.get(name, None)
        if value is None:
            return None
        return value.encode('utf-8')

    def exists(self, name):
        """Return True if ``name`` is a key in the store."""
        return name in MockedRedis.db

    def delete(self, key):
        """Remove ``key`` (``str`` or utf-8 ``bytes``) from the store.

        :raises KeyError: if the key is not present
        """
        if isinstance(key, bytes):
            key = key.decode('utf-8')
        # redis ignores delete for keys that don't exist
        # ... but in our test environment we don't expect this
        del MockedRedis.db[key]

    def scan_iter(self, glob=None, count='unused'):
        """Yield the utf-8 encoded keys that match ``glob``.

        :param glob: None/empty to yield every key, otherwise a pattern
            of the form ``<prefix>*`` (the only form supported)
        :param count: accepted but not used
        :raises AssertionError: if ``glob`` is not of the supported form
        """
        if not glob:
            # iterate over a snapshot so that callers may modify the
            # store while consuming the generator
            for k in list(MockedRedis.db.keys()):
                yield k.encode('utf-8')
            # nothing to match against: stop here instead of falling
            # through to re.match with glob=None
            return

        m = re.match(r'^([^*]+)\*$', glob)
        assert m  # all expected globs are like this
        prefix = m.group(1)
        for k in list(MockedRedis.db.keys()):
            if k.startswith(prefix):
                yield k.encode('utf-8')

    def keys(self, glob=None):
        """Return the result of ``scan_iter(glob)`` as a list."""
        return list(self.scan_iter(glob))

    def flushdb(self):
        """Do nothing."""
        # only called from testing routes (hopefully)
        pass

    def execute(self):
        """Do nothing; commands issued on a 'pipeline' are applied at once."""
        pass

    def pipeline(self, *args, **kwargs):
        """Return this same object; all arguments are ignored."""
        return self
@pytest.fixture
def cached_test_data():
    """Return the parsed content of ``router-info.json`` from the test data."""
    snapshot_path = os.path.join(TEST_DATA_DIRNAME, "router-info.json")
    with open(snapshot_path) as snapshot:
        return json.load(snapshot)
@pytest.fixture
def flask_config_filename():
    """Yield the name of a temporary settings file enabling testing routes.

    The file is removed when the fixture is torn down.
    """
    content = 'ENABLE_TESTING_ROUTES = True\n'
    with tempfile.NamedTemporaryFile() as settings_file:
        settings_file.write(content.encode('utf-8'))
        # make sure the content is on disk before anyone opens the file
        settings_file.flush()
        yield settings_file.name
@pytest.fixture
def mocked_redis(mocker):
    """Patch the StrictRedis used by the tasks code with ``MockedRedis``."""
    # force data to be reloaded
    MockedRedis.db = None

    patch_target = 'inventory_provider.tasks.common.redis.StrictRedis'
    mocker.patch(patch_target, MockedRedis)
@pytest.fixture
def client(flask_config_filename, data_config_filename, mocked_redis):
    """Yield a test client for an app created with the test settings.

    The settings and configuration filenames are passed to the app
    through environment variables.
    """
    os.environ.update({
        'FLASK_SETTINGS_FILENAME': flask_config_filename,
        'INVENTORY_PROVIDER_CONFIG_FILENAME': data_config_filename,
    })
    app = inventory_provider.create_app()
    with app.test_client() as test_client:
        yield test_client
# Text of a Python dict literal mapping interface names to per-address-family
# address lists.  The {AF_INET}, {AF_INET6} and {AF_LINK} placeholders are
# replaced with the corresponding netifaces constants by the mocked_netifaces
# fixture before the text is parsed with ast.literal_eval.
NETIFACES_TEST_DATA_STRING = """{
'lo0': {{AF_INET}: [{'addr': '127.0.0.1', 'netmask': '255.0.0.0', 'peer': '127.0.0.1'}],
{AF_INET6}: [{'addr': '::1', 'netmask': 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128', 'peer': '::1', 'flags': 0},
{'addr': 'fe80::1%lo0', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 0}]},
'eth0': {{AF_LINK}: [{'addr': '78:4f:43:76:73:ba'}],
{AF_INET}: [{'addr': '83.97.92.239', 'netmask': '255.255.252.0', 'broadcast': '83.97.95.255'}],
{AF_INET6}: [{'addr': 'fe80::250:56ff:fea1:8340', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1024},
{'addr': '2001:798:3::104', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1088}]}
}""" # noqa E501
@pytest.fixture
def mocked_netifaces(mocker):
    """Patch ``netifaces.interfaces``/``ifaddresses`` to serve canned data.

    The canned data is parsed from NETIFACES_TEST_DATA_STRING after its
    address-family placeholders have been replaced by the numeric
    constants exported by netifaces.
    """
    families = {
        'AF_INET': netifaces.AF_INET,
        'AF_INET6': netifaces.AF_INET6,
        'AF_LINK': netifaces.AF_LINK
    }

    literal = NETIFACES_TEST_DATA_STRING
    for name, number in families.items():
        placeholder = '{%s}' % name
        literal = literal.replace(placeholder, str(number))
    interfaces = ast.literal_eval(literal)

    def _interfaces():
        return interfaces.keys()

    def _ifaddresses(n):
        return interfaces[n]

    mocker.patch('netifaces.interfaces', _interfaces)
    mocker.patch('netifaces.ifaddresses', _ifaddresses)
@contextlib.contextmanager
def _mocked_db_connection(ignored):
    """Context manager that yields None, whatever argument it is given."""
    connection = None
    yield connection
@pytest.fixture
def mocked_worker_module(
        mocker, mocked_redis, data_config_filename,
        cached_test_data, mocked_netifaces):
    """Configure the worker module and patch its external data sources.

    The test configuration is loaded into ``worker.InventoryTask.config``
    and the event-sending, snmp and netconf entry points are replaced by
    functions that do nothing or answer from ``cached_test_data``.
    """
    os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename

    with open(data_config_filename) as config_file:
        worker.InventoryTask.config = config.load(config_file)

    def _noop_send_event(*args, **kwargs):
        return None

    def _snmp_interfaces_from_cache(hostname, community, logical_systems):
        cache_key = 'snmp-interfaces:' + hostname
        return json.loads(cached_test_data[cache_key])

    def _snmp_peerings_from_cache(hostname, community, logical_systems):
        # every cached peering entry is returned, whatever the hostname
        peering_key = re.compile(r'^snmp-peerings:\d.*')
        return [
            json.loads(blob)
            for key, blob in cached_test_data.items()
            if peering_key.match(key)
        ]

    def _netconf_config_from_cache(hostname, _):
        cache_key = 'netconf:' + hostname
        return etree.XML(cached_test_data[cache_key])

    replacements = (
        ('inventory_provider.tasks.worker.InventoryTask.send_event',
         _noop_send_event),
        ('inventory_provider.snmp.get_router_snmp_indexes',
         _snmp_interfaces_from_cache),
        ('inventory_provider.snmp.get_peer_state_info',
         _snmp_peerings_from_cache),
        ('inventory_provider.juniper.load_config',
         _netconf_config_from_cache),
    )
    for target, replacement in replacements:
        mocker.patch(target, replacement)