Skip to content
Snippets Groups Projects
Commit a5464b7d authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature more-worker-tests.

parents b58af6fe ef664df2
No related branches found
No related tags found
No related merge requests found
......@@ -144,14 +144,8 @@ def update_equipment_locations():
for k in r.scan_iter('opsdb:location:*'):
r.delete(k)
hostnames = []
for k in r.keys('netconf:*'):
m = re.match('^netconf:(.+)$', k.decode('utf-8'))
assert m
hostnames.append(m.group(1))
with db.connection(InventoryTask.config["ops-db"]) as cx:
for h in hostnames:
for h in _derive_router_hostnames(InventoryTask.config):
for ld in opsdb.lookup_pop_info(cx, h):
r.set('opsdb:location:%s' % h, json.dumps(ld))
......
......@@ -6,9 +6,12 @@ import re
import shutil
import tempfile
from lxml import etree
import pytest
import inventory_provider
from inventory_provider import config
from inventory_provider.tasks import worker
TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
inventory_provider.__path__[0],
......@@ -106,14 +109,24 @@ class MockedRedis(object):
return None
return value.encode('utf-8')
def keys(self, glob=None):
def delete(self, key):
if isinstance(key, bytes):
key = key.decode('utf-8')
del MockedRedis.db[key]
def scan_iter(self, glob=None):
if not glob:
return list([k.encode("utf-8") for k in MockedRedis.db.keys()])
for k in MockedRedis.db.keys():
yield k.encode('utf-8')
m = re.match(r'^([^*]+)\*$', glob)
assert m # all expected global are like this
return list([
k.encode("utf-8") for k in MockedRedis.db.keys()
if k.startswith(m.group(1))])
assert m # all expected globs are like this
for k in MockedRedis.db.keys():
if k.startswith(m.group(1)):
yield k.encode('utf-8')
def keys(self, glob=None):
return list(self.scan_iter(glob))
def flushdb(self):
# only called from testing routes (hopefully)
......@@ -197,3 +210,23 @@ def mocked_netifaces(mocker):
data = ast.literal_eval(s)
mocker.patch('netifaces.interfaces', lambda: data.keys())
mocker.patch('netifaces.ifaddresses', lambda n: data[n])
@pytest.fixture
def mocked_worker_module(
mocker, mocked_redis, data_config,
cached_test_data, mocked_netifaces):
worker.InventoryTask.config = data_config
def _mocked_snmp_interfaces(hostname, community, _):
return json.loads(cached_test_data['snmp-interfaces:' + hostname])
mocker.patch(
'inventory_provider.snmp.get_router_interfaces',
_mocked_snmp_interfaces)
def _mocked_load_juniper_netconf_config(hostname, _):
return etree.XML(cached_test_data['netconf:' + hostname])
mocker.patch(
'inventory_provider.juniper.load_config',
_mocked_load_juniper_netconf_config)
......@@ -15,42 +15,6 @@ TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
"data"))
class MockedRedis(object):
db = None
def __init__(self, *args, **kwargs):
if MockedRedis.db is None:
test_data_filename = os.path.join(
TEST_DATA_DIRNAME,
"router-info.json")
with open(test_data_filename) as f:
MockedRedis.db = json.loads(f.read())
def set(self, name, value):
MockedRedis.db[name] = value
def get(self, name):
value = MockedRedis.db.get(name, None)
if value is None:
return None
return value.encode('utf-8')
def keys(self, glob=None):
if not glob:
return list([k.encode("utf-8") for k in MockedRedis.db.keys()])
m = re.match(r'^([^*]+)\*$', glob)
assert m # all expected global are like this
return list([
k.encode("utf-8") for k in MockedRedis.db.keys()
if k.startswith(m.group(1))])
def delete(self, key):
if isinstance(key, bytes):
key = key.decode('utf-8')
del MockedRedis.db[key]
@pytest.fixture
def classifier_cache_test_entries():
filename = os.path.join(
......@@ -59,14 +23,6 @@ def classifier_cache_test_entries():
return json.loads(f.read())
@pytest.fixture
def mocked_redis(mocker):
mocker.patch(
'inventory_provider.tasks.common.redis.StrictRedis',
MockedRedis)
return MockedRedis()
def pytest_generate_tests(metafunc):
def _junosspace_hosts():
......
......@@ -2,10 +2,6 @@
just checks that the worker methods call the right functions
and some data ends up in the right place ... otherwise not very detailed
"""
import json
from lxml import etree
import pytest
from inventory_provider.tasks import worker
from inventory_provider.tasks.common import get_redis
......@@ -16,26 +12,6 @@ def backend_db():
}).db
@pytest.fixture
def mocked_worker_module(
mocker, mocked_redis, data_config,
cached_test_data, mocked_netifaces):
worker.InventoryTask.config = data_config
def _mocked_snmp_interfaces(hostname, community, _):
return json.loads(cached_test_data['snmp-interfaces:' + hostname])
mocker.patch(
'inventory_provider.snmp.get_router_interfaces',
_mocked_snmp_interfaces)
def _mocked_load_juniper_netconf_config(hostname, _):
return etree.XML(cached_test_data['netconf:' + hostname])
mocker.patch(
'inventory_provider.juniper.load_config',
_mocked_load_juniper_netconf_config)
def test_netconf_refresh_config(mocked_worker_module, router):
del backend_db()['netconf:' + router]
worker.netconf_refresh_config(router)
......
"""
just checks that the worker methods call the right functions
and some data ends up in the right place ... otherwise not very detailed
"""
import contextlib
from inventory_provider.tasks import worker
from inventory_provider.tasks.common import get_redis
def backend_db():
return get_redis({
'redis': {'hostname': None, 'port': None}
}).db
@contextlib.contextmanager
def _mocked_connection(x):
yield x
def test_update_locations(mocker, mocked_worker_module):
mocker.patch(
'inventory_provider.db.opsdb.lookup_pop_info',
lambda c, h: [{'C': c, 'H': h}])
mocker.patch(
'inventory_provider.db.db.connection',
_mocked_connection)
def _cached_locations():
db = backend_db()
for k in db.keys():
if k.startswith('opsdb:location:'):
yield k
db = backend_db()
for k in list(_cached_locations()):
del db[k]
assert len(list(_cached_locations())) == 0 # sanity
worker.update_equipment_locations()
assert len(list(_cached_locations())) > 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment