# test_scheduling.py (3.53 KiB)
# Authored by Mohammad Torkashvand
from unittest.mock import MagicMock, patch
import pytest
from orchestrator.targets import Target
from gso.schedules.scheduling import scheduler
@pytest.fixture(scope="module")
def validate_subscriptions():
    """Provide the ``validate_subscriptions`` callable from ``gso.schedules.validate_subscriptions``.

    The import is performed inside the fixture body, so the module is only
    loaded when a test first requests this fixture; the result is then shared
    by every test in this module (``scope="module"``).
    """
    from gso.schedules.validate_subscriptions import validate_subscriptions as task_under_test

    return task_under_test
@pytest.fixture
def mock_get_insync_subscriptions():
    """Patch ``get_insync_subscriptions`` in the validation module and yield the mock.

    Tests assign ``return_value`` on the yielded mock to control which
    subscriptions the code under test sees. The patch is undone on teardown.
    """
    patcher = patch("gso.schedules.validate_subscriptions.get_insync_subscriptions")
    with patcher as patched_getter:
        yield patched_getter
@pytest.fixture
def mock_get_execution_context():
    """Patch ``get_execution_context`` so it returns a dict with a mocked ``"validate"`` entry.

    Every call to the yielded mock returns the same dict, so a test can call it
    again to retrieve the ``"validate"`` mock and inspect how it was used.
    """
    execution_context = {"validate": MagicMock()}
    with patch(
        "gso.schedules.validate_subscriptions.get_execution_context",
        return_value=execution_context,
    ) as patched_context:
        yield patched_context
@pytest.fixture
def mock_logger():
    """Replace the ``logger`` object of the validation module with a mock and yield it.

    The patch is undone when the fixture is torn down.
    """
    patcher = patch("gso.schedules.validate_subscriptions.logger")
    with patcher as patched_logger:
        yield patched_logger
@pytest.fixture
def mock_celery():
    """Replace ``current_app`` in ``gso.schedules.scheduling`` with a mock and yield it.

    The patch is undone when the fixture is torn down.
    """
    patcher = patch("gso.schedules.scheduling.current_app")
    with patcher as patched_app:
        yield patched_app
def test_scheduler_updates_beat_schedule(mock_celery):
    """Decorating a function with ``scheduler`` adds an entry to the app's ``beat_schedule``.

    The entry is looked up under the decorated function's name and is expected
    to carry the display name, the dotted task path and the schedule fields.
    """
    mock_celery.conf.beat_schedule = {}
    cron_fields = {
        "minute": "0",
        "hour": "0",
        "day_of_week": "*",
        "day_of_month": "*",
        "month_of_year": "*",
    }

    @scheduler(name="A cool task", **cron_fields)
    def mock_task():
        return "task result"

    beat_schedule = mock_celery.conf.beat_schedule
    assert "mock_task" in beat_schedule

    entry = beat_schedule["mock_task"]
    schedule = entry["schedule"]
    assert schedule.minute == {0}
    assert schedule.hour == {0}
    assert entry["task"] == "test.schedules.test_scheduling.mock_task"
    assert entry["name"] == "A cool task"
def test_scheduled_task_still_works():
    """Check that a function wrapped by ``scheduler`` still returns its original result when called."""
    # NOTE(review): this test does not request ``mock_celery``, so the decorator runs
    # against whatever ``current_app`` resolves to in ``gso.schedules.scheduling`` - confirm intended.
    cron_fields = {
        "minute": "0",
        "hour": "0",
        "day_of_week": "*",
        "day_of_month": "*",
        "month_of_year": "*",
    }

    @scheduler(name="A cool task", **cron_fields)
    def mock_task():
        return "task result"

    assert mock_task() == "task result"
def test_no_subscriptions(mock_get_insync_subscriptions, mock_logger, validate_subscriptions):
    """With no in-sync subscriptions, exactly one info message is logged."""
    no_subscriptions = []
    mock_get_insync_subscriptions.return_value = no_subscriptions

    validate_subscriptions()

    mock_logger.info.assert_called_once_with("No subscriptions to validate")
def test_subscriptions_without_system_target_workflow(
    mock_get_insync_subscriptions, mock_logger, validate_subscriptions
):
    """A subscription whose product has no workflows at all triggers exactly one warning."""
    product_without_workflows = MagicMock(workflows=[])
    subscription = MagicMock(product=product_without_workflows)
    mock_get_insync_subscriptions.return_value = [subscription]

    validate_subscriptions()

    mock_logger.warning.assert_called_once()
def test_subscription_status_not_usable(
    mock_get_insync_subscriptions, mock_get_execution_context, validate_subscriptions
):
    """A subscription with a SYSTEM-target workflow but an unusable status is not validated.

    The ``"validate"`` callable from the mocked execution context must never be
    called for such a subscription.
    """
    # ``name=`` passed to the MagicMock constructor only labels the mock for its repr;
    # it does NOT create a ``.name`` attribute. Assign the attribute explicitly so the
    # workflow really carries the string "workflow_name".
    system_workflow = MagicMock(target=Target.SYSTEM)
    system_workflow.name = "workflow_name"

    subscription_mock = MagicMock()
    subscription_mock.product.workflows = [system_workflow]
    subscription_mock.status = "Not Usable Status"
    mock_get_insync_subscriptions.return_value = [subscription_mock]

    validate_subscriptions()

    # The fixture returns the same dict on every call, so this is the mock the code under test saw.
    validate_func = mock_get_execution_context()["validate"]
    validate_func.assert_not_called()
def test_valid_subscriptions_for_validation(
    mock_get_insync_subscriptions, mock_get_execution_context, validate_subscriptions
):
    """An ``"active"`` subscription with a SYSTEM-target workflow is sent for validation.

    The ``"validate"`` callable from the mocked execution context must be called
    exactly once, with the workflow name and the subscription id as a JSON payload.
    """
    workflow_name = "workflow_name"
    # ``name=`` passed to the MagicMock constructor only labels the mock for its repr;
    # it does NOT create a ``.name`` attribute. Assign it explicitly so the assertion
    # below checks a real string instead of an auto-created child mock.
    mocked_workflow = MagicMock(target=Target.SYSTEM)
    mocked_workflow.name = workflow_name

    subscription_mock = MagicMock()
    subscription_mock.product.workflows = [mocked_workflow]
    subscription_mock.status = "active"
    mock_get_insync_subscriptions.return_value = [subscription_mock]

    validate_subscriptions()

    # The fixture returns the same dict on every call, so this is the mock the code under test saw.
    validate_func = mock_get_execution_context()["validate"]
    validate_func.assert_called_once_with(
        workflow_name, json=[{"subscription_id": str(subscription_mock.subscription_id)}]
    )