"""Tests for the ``scheduler`` decorator and the ``validate_subscriptions`` scheduled task."""

from unittest.mock import MagicMock, patch

import pytest
from orchestrator.targets import Target

from gso.schedules.scheduling import CronScheduleConfig, scheduler


def _make_system_workflow(name: str = "workflow_name") -> MagicMock:
    """Build a workflow mock with ``target == Target.SYSTEM`` and a real string ``name``.

    ``MagicMock(name=...)`` only sets the name used in the mock's repr; it does not
    create a ``.name`` attribute holding that string. The attribute is therefore
    assigned after construction so the code under test sees an actual ``str``.
    """
    workflow = MagicMock(target=Target.SYSTEM)
    workflow.name = name
    return workflow


@pytest.fixture(scope="module")
def validate_subscriptions():
    """Return the ``validate_subscriptions`` task, imported lazily inside the fixture.

    NOTE(review): the import is deferred rather than done at module top level,
    presumably so that importing the task module happens during test setup
    instead of at collection time -- confirm.
    """
    from gso.schedules.validate_subscriptions import validate_subscriptions as vs

    return vs


@pytest.fixture()
def mock_get_active_insync_subscriptions():
    """Patch ``get_active_insync_subscriptions`` in the task module and yield the mock."""
    with patch("gso.schedules.validate_subscriptions.get_active_insync_subscriptions") as mock:
        yield mock


@pytest.fixture()
def mock_get_execution_context():
    """Patch ``get_execution_context`` so it returns a dict with a mocked ``validate`` callable."""
    with patch("gso.schedules.validate_subscriptions.get_execution_context") as mock:
        mock.return_value = {"validate": MagicMock()}
        yield mock


@pytest.fixture()
def mock_logger():
    """Patch the task module's ``logger`` and yield the mock."""
    with patch("gso.schedules.validate_subscriptions.logger") as mock:
        yield mock


@pytest.fixture()
def mock_celery():
    """Patch ``current_app`` in ``gso.schedules.scheduling`` and yield the mock."""
    with patch("gso.schedules.scheduling.current_app") as mock_app:
        yield mock_app


def test_scheduler_updates_beat_schedule(mock_celery):
    """Decorating a function with ``scheduler`` adds an entry to ``conf.beat_schedule``."""
    mock_celery.conf.beat_schedule = {}

    @scheduler(
        CronScheduleConfig(
            name="A cool task",
            minute="0",
            hour="0",
            day_of_week="*",
            day_of_month="*",
            month_of_year="*",
        )
    )
    def mock_task():
        return "task result"

    # The entry is keyed by the decorated function's name.
    assert "mock_task" in mock_celery.conf.beat_schedule
    scheduled = mock_celery.conf.beat_schedule["mock_task"]
    assert scheduled["schedule"].minute == {0}
    assert scheduled["schedule"].hour == {0}
    assert scheduled["task"] == "test.schedules.test_scheduling.mock_task"
    assert scheduled["name"] == "A cool task"


def test_scheduled_task_still_works():
    """Ensure that the scheduler decorator does not change the behavior of the function it decorates."""

    @scheduler(
        CronScheduleConfig(
            name="A cool task",
            minute="0",
            hour="0",
            day_of_week="*",
            day_of_month="*",
            month_of_year="*",
        )
    )
    def mock_task():
        return "task result"

    result = mock_task()
    assert result == "task result"


def test_no_subscriptions(mock_get_active_insync_subscriptions, mock_logger, validate_subscriptions):
    """An empty subscription list results in a single informational log message."""
    mock_get_active_insync_subscriptions.return_value = []

    validate_subscriptions()

    mock_logger.info.assert_called_once_with("No subscriptions to validate")


def test_subscriptions_without_system_target_workflow(
    mock_get_active_insync_subscriptions,
    mock_logger,
    validate_subscriptions,
):
    """A subscription whose product has no workflows triggers exactly one warning."""
    mock_get_active_insync_subscriptions.return_value = [MagicMock(product=MagicMock(workflows=[]))]

    validate_subscriptions()

    mock_logger.warning.assert_called_once()


def test_subscription_status_not_usable(
    mock_get_active_insync_subscriptions,
    mock_get_execution_context,
    validate_subscriptions,
):
    """The ``validate`` callable is not invoked for a subscription with an unusable status."""
    subscription_mock = MagicMock()
    subscription_mock.product.workflows = [_make_system_workflow()]
    subscription_mock.status = "Not Usable Status"
    mock_get_active_insync_subscriptions.return_value = [subscription_mock]

    validate_subscriptions()

    # Read the callable from the configured return value instead of calling the
    # patched function again from within the test.
    validate_func = mock_get_execution_context.return_value["validate"]
    validate_func.assert_not_called()


def test_valid_subscriptions_for_validation(
    mock_get_active_insync_subscriptions,
    mock_get_execution_context,
    validate_subscriptions,
):
    """An active subscription with a SYSTEM workflow is validated once with the expected arguments."""
    subscription_mock = MagicMock()
    mocked_workflow = _make_system_workflow("workflow_name")
    subscription_mock.product.workflows = [mocked_workflow]
    subscription_mock.status = "active"
    mock_get_active_insync_subscriptions.return_value = [subscription_mock]

    validate_subscriptions()

    validate_func = mock_get_execution_context.return_value["validate"]
    # The workflow name is a real string here, so this checks the actual value
    # passed rather than comparing an auto-created child mock with itself.
    validate_func.assert_called_once_with(
        "workflow_name",
        json=[{"subscription_id": str(subscription_mock.subscription_id)}],
    )