Newer
Older
from inventory_provider import config
# Canonical path of the test data directory: <package dir>/../test/data.
TEST_DATA_DIRNAME = os.path.realpath(
    os.path.join(inventory_provider.__path__[0], "..", "test", "data"))
@pytest.fixture
def data_config_filename():
# Serializes a placeholder configuration dict as UTF-8 encoded JSON into a
# named temporary file, flushes it, and yields the file's name.
# The local name `config` assigned below shadows the imported `config`
# module, but only inside this function.
# NOTE(review): as shown here the dict literal is missing several closing
# braces/commas (e.g. after the "redis" and "junosspace" entries and at the
# end of the literal), so the intended nesting cannot be confirmed from this
# view - verify against the committed file.
with tempfile.NamedTemporaryFile() as f:
config = {
"ops-db": {
"hostname": "xxxxxxx.yyyyy.zzz",
"dbname": "xxxxxx",
"username": "xxxxxx",
"password": "xxxxxxxx"
},
"ssh": {
"username": "uSeR-NaMe",
"private-key": "private-key-filename",
"known-hosts": "known-hosts=filename"
},
"redis": {
"hostname": "xxxxxx",
"junosspace": {
"api": "bogus-url",
"username": "bogus-username",
"password": "bogus-password"
"otrs-export": {
"username": "otrs_username",
"private-key": "otrs_ky_loc",
"destination": "otrs_dest"
},
"api": "ims_api",
"username": "ims_username",
"password": "ims_password"
# The file object is opened in binary mode by default, hence the explicit encode.
f.write(json.dumps(config).encode('utf-8'))
# Flush so the content is on disk before the name is handed out.
f.flush()
yield f.name
@pytest.fixture
def data_config(data_config_filename):
    """Return the result of ``config.load`` for the test configuration file.

    :param data_config_filename: path of the configuration file to read
    :return: whatever ``config.load`` produces for the opened file
    """
    with open(data_config_filename) as config_file:
        loaded = config.load(config_file)
    return loaded
# Canonical path of <package dir>/../test/data.
# NOTE(review): the same value is also assigned earlier in this file; one of
# the two assignments looks redundant.
TEST_DATA_DIRNAME = os.path.realpath(
    os.path.join(
        inventory_provider.__path__[0], "..", "test", "data"))
class MockedRedis(object):
# Dict-backed stand-in for a Redis client, used by the tests.
# All state lives in the class attribute `db`, so it is shared by every
# instance; resetting `db` to None makes the next constructor call reload it.
db = None
def __init__(self, *args, **kwargs):
# Populates the shared `db` from router-info.json when it has not been
# loaded yet.  The constructor arguments are not used in the visible code.
if MockedRedis.db is None:
test_data_filename = os.path.join(
TEST_DATA_DIRNAME,
"router-info.json")
with open(test_data_filename) as f:
MockedRedis.db = json.loads(f.read())
# Seed the 'db:latch' entry with a JSON-encoded default state.
MockedRedis.db['db:latch'] = json.dumps({
'current': 0,
'next': 0,
'this': 0,
'pending': False,
'failure': False
# NOTE(review): the closing `})` of the json.dumps call above is not visible
# in this view - confirm against the committed file.
def set(self, name, value):
# Store `value` under `name` in the shared dict.
MockedRedis.db[name] = value
def get(self, name):
# Return the stored value encoded as UTF-8 bytes, or None when the key is
# absent (or maps to None).
value = MockedRedis.db.get(name, None)
if value is None:
return None
return value.encode('utf-8')
def delete(self, key):
# Bytes keys are decoded to str before the lookup; a missing key raises
# KeyError.
if isinstance(key, bytes):
key = key.decode('utf-8')
del MockedRedis.db[key]
def scan_iter(self, glob=None):
# Generator yielding stored keys as UTF-8 bytes.  A non-empty `glob` must have
# the form "<prefix>*" (enforced by the assert) and selects keys starting
# with that prefix.
# NOTE(review): `k` is used below without a visible binding - the `for`
# lines that iterate over the keys appear to be missing from this view.
if not glob:
yield k.encode('utf-8')
m = re.match(r'^([^*]+)\*$', glob)
assert m # all expected globs are like this
if k.startswith(m.group(1)):
yield k.encode('utf-8')
def keys(self, glob=None):
# Same selection as scan_iter(), materialized into a list.
return list(self.scan_iter(glob))
def flushdb(self):
# No-op in this mock.
# only called from testing routes (hopefully)
pass
def execute(self):
# No-op in this mock.
pass
def pipeline(self, *args, **kwargs):
# Returns this same object, so calls made on the "pipeline" act directly on
# the mock.
return self
@pytest.fixture
def cached_test_data():
    """Return the parsed JSON content of ``router-info.json`` from the test data directory."""
    data_path = os.path.join(TEST_DATA_DIRNAME, "router-info.json")
    with open(data_path) as data_file:
        raw = data_file.read()
    return json.loads(raw)
@pytest.fixture
# NOTE(review): the `def` line of this fixture is not visible in this view;
# confirm its name and signature against the committed file.
# Writes the single setting line 'ENABLE_TESTING_ROUTES = True' (UTF-8
# encoded) to a named temporary file, flushes it, and yields the file name.
with tempfile.NamedTemporaryFile() as f:
f.write('ENABLE_TESTING_ROUTES = True\n'.encode('utf-8'))
f.flush()
yield f.name
# NOTE(review): fragment - the enclosing definition and the call that the
# string below is an argument of are not visible in this view.
# Resetting `db` to None makes the next MockedRedis() construction reload
# its data from file.
MockedRedis.db = None # force data to be reloaded
'inventory_provider.tasks.common.redis.StrictRedis',
def client(flask_config_filename, data_config_filename, mocked_redis):
    """Yield a test client for an application created against the test settings.

    Both file names are exported through environment variables before the
    application is created.  ``mocked_redis`` is not referenced in the body;
    presumably it is requested only so that fixture is set up first - confirm.

    :param flask_config_filename: value for ``FLASK_SETTINGS_FILENAME``
    :param data_config_filename: value for ``INVENTORY_PROVIDER_CONFIG_FILENAME``
    :param mocked_redis: unused in the body
    """
    os.environ.update({
        'FLASK_SETTINGS_FILENAME': flask_config_filename,
        'INVENTORY_PROVIDER_CONFIG_FILENAME': data_config_filename,
    })
    app = inventory_provider.create_app()
    with app.test_client() as test_client:
        yield test_client
#
# @pytest.fixture
# def client(client, mocked_redis):
# return client
# Text of a Python dict literal describing two interfaces ('lo0' and 'eth0').
# The {AF_INET} / {AF_INET6} / {AF_LINK} placeholders are replaced with the
# corresponding netifaces constants by mocked_netifaces, which then parses
# the text with ast.literal_eval.
NETIFACES_TEST_DATA_STRING = """{
'lo0': {{AF_INET}: [{'addr': '127.0.0.1', 'netmask': '255.0.0.0', 'peer': '127.0.0.1'}],
{AF_INET6}: [{'addr': '::1', 'netmask': 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128', 'peer': '::1', 'flags': 0},
{'addr': 'fe80::1%lo0', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 0}]},
'eth0': {{AF_LINK}: [{'addr': '78:4f:43:76:73:ba'}],
{AF_INET}: [{'addr': '83.97.92.239', 'netmask': '255.255.252.0', 'broadcast': '83.97.95.255'}],
{AF_INET6}: [{'addr': 'fe80::250:56ff:fea1:8340', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1024},
{'addr': '2001:798:3::104', 'netmask': 'ffff:ffff:ffff:ffff::/64', 'flags': 1088}]}
}""" # noqa E501
@pytest.fixture
def mocked_netifaces(mocker):
    """Patch ``netifaces`` so it reports the canned interface data.

    The address-family placeholders in ``NETIFACES_TEST_DATA_STRING`` are
    replaced with the real ``netifaces`` constants, the resulting text is
    parsed as a Python literal, and ``netifaces.interfaces`` and
    ``netifaces.ifaddresses`` are patched to serve that data.

    :param mocker: object providing ``patch`` (the pytest-mock fixture)
    """
    address_families = {
        'AF_INET': netifaces.AF_INET,
        'AF_INET6': netifaces.AF_INET6,
        'AF_LINK': netifaces.AF_LINK
    }
    literal_text = NETIFACES_TEST_DATA_STRING
    for family_name, family_value in address_families.items():
        placeholder = '{%s}' % family_name
        literal_text = literal_text.replace(placeholder, str(family_value))

    interface_data = ast.literal_eval(literal_text)

    # interfaces() reports the keys of the canned data; ifaddresses(name)
    # looks the interface up in it.
    mocker.patch('netifaces.interfaces', lambda: interface_data.keys())
    mocker.patch('netifaces.ifaddresses', lambda n: interface_data[n])
@contextlib.contextmanager
def _mocked_db_connection(ignored):
yield None
# NOTE(review): fragment - the `def` line these parameters belong to, the
# definition of the snmp stub (which should bind `hostname`), and the target
# string of the patch call below are not visible in this view; confirm
# against the committed file.
mocker, mocked_redis, data_config_filename,
# Export the test configuration file name and store the loaded configuration
# on worker.InventoryTask.config.
os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename
with open(data_config_filename) as f:
worker.InventoryTask.config = config.load(f)
# Returns the JSON-decoded cached entry stored under 'snmp-interfaces:<hostname>'.
return json.loads(cached_test_data['snmp-interfaces:' + hostname])
mocker.patch(
_mocked_snmp_interfaces)
def _mocked_load_juniper_netconf_config(hostname, _):
    """Return the cached ``netconf:<hostname>`` entry parsed with ``etree.XML``.

    The second positional argument is accepted and ignored.
    """
    cache_key = 'netconf:' + hostname
    netconf_text = cached_test_data[cache_key]
    return etree.XML(netconf_text)
# Replace inventory_provider.juniper.load_config with the cached-data stub.
load_config_target = 'inventory_provider.juniper.load_config'
mocker.patch(load_config_target, _mocked_load_juniper_netconf_config)
def _mocked_get_service_users(cx, service_ids):
for id in service_ids:
yield {'service_id': id, 'user': 'AAAAA'}
yield {'service_id': id, 'user': 'BBBB'}
# Replace the database connection and the service-user lookup with the stubs,
# in this order.
for patch_target, replacement in (
        ('inventory_provider.db.db.connection', _mocked_db_connection),
        ('inventory_provider.db.opsdb.get_service_users',
         _mocked_get_service_users)):
    mocker.patch(patch_target, replacement)