""" just checks that the worker methods call the right functions and some data ends up in the right place ... otherwise not very detailed """ import os from inventory_provider.tasks import worker from inventory_provider.tasks import common from inventory_provider.tasks.common import _get_redis def backend_db(): return _get_redis({ 'redis': { 'hostname': None, 'port': None }, 'redis-databases': [0, 7] }).db def test_update_locations(mocker, mocked_worker_module, mocked_redis): mocker.patch( 'inventory_provider.db.opsdb.lookup_pop_info', lambda c, h: [{'C': c, 'H': h}]) def _cached_locations(): db = backend_db() for k in db.keys(): if k.startswith('opsdb:location:'): yield k db = backend_db() for k in list(_cached_locations()): del db[k] assert len(list(_cached_locations())) == 0 # sanity worker.update_equipment_locations() assert len(list(_cached_locations())) > 0 def test_access_services(mocker, mocked_worker_module, mocked_redis): opsdb_get_access_services = mocker.patch( 'inventory_provider.db.opsdb.get_access_services') opsdb_get_access_services.return_value = [ {'name': 'some service name', 'a': 1, 'b': 2}, {'name': 'another service name', 'c': 2, 'd': 3} ] def _cached_locations(): db = backend_db() for k in db.keys(): if k.startswith('opsdb:access_services:'): yield k db = backend_db() for k in list(_cached_locations()): del db[k] assert len(list(_cached_locations())) == 0 # sanity worker.update_access_services() assert len(list(_cached_locations())) > 0 def test_InventoryTask_obj(data_config_filename): os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename task = worker.InventoryTask() assert task.config def test_next_redis(data_config, mocked_redis): """ not a very meaningful test ... basically only for sanity & coverage :param data_config: :param mocked_redis: :return: """ common.set_latch(data_config, 10, 20) r = common.get_next_redis(data_config) assert r # there's only one ... latch = common.get_latch(r) assert latch['current'] == 10 assert latch['next'] == 20 def test_next_redis_with_none(data_config, mocked_redis): """ not a very meaningful test ... basically only for sanity & coverage :param data_config: :param mocked_redis: :return: """ r = common._get_redis(data_config) assert r del r.db['db:latch'] # cf. conftest:MockedRedis r = common.get_next_redis(data_config) assert r