Skip to content
Snippets Groups Projects
Commit ffd1e45d authored by Erik Reid's avatar Erik Reid
Browse files

first celery worker test

parent fecbb4b4
No related branches found
No related tags found
No related merge requests found
import logging
import pytest
from inventory_provider.tasks import worker
class MockedRedis(object):
db = {}
def __init__(self, *args, **kwargs):
pass
# def set(self, key, value):
# MockedRedis.db[key] = value
def hset(self, name, key, value):
MockedRedis.db.setdefault(name, {})[key] = value
# def hget(self, key, field):
# value = MockedRedis.db[key]
# return json.dumps(value[field]).encode('utf-8')
#
# def hgetall(self, key):
# result = {}
# for k, v in MockedRedis.db[key].items():
# result[k.encode('utf-8')] \
# = json.dumps(v).encode('utf-8')
# return result
#
# def keys(self, *args, **kwargs):
# return list([k.encode("utf-8") for k in MockedRedis.db.keys()])
@pytest.fixture
def mocked_worker_module(mocker, data_config):
worker.InventoryTask.config = data_config
worker.InventoryTask.logger = logging.getLogger()
mocker.patch(
'inventory_provider.tasks.worker.redis.StrictRedis',
MockedRedis)
def test_juniper_refresh_bgp(mocked_worker_module, mocker, cached_test_data):
def _mocked_fetch_bpg_config(hostname, _):
return cached_test_data[hostname]["bgp"]
mocker.patch(
'inventory_provider.tasks.worker.juniper.fetch_bgp_config',
_mocked_fetch_bpg_config)
for hostname in cached_test_data.keys():
worker.juniper_refresh_bgp(hostname)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment