Skip to content
Snippets Groups Projects
test_scheduling.py 3.53 KiB
from unittest.mock import MagicMock, patch

import pytest
from orchestrator.targets import Target

from gso.schedules.scheduling import scheduler


@pytest.fixture(scope="module")
def validate_subscriptions():
    from gso.schedules.validate_subscriptions import validate_subscriptions as vs

    return vs


@pytest.fixture
def mock_get_insync_subscriptions():
    with patch("gso.schedules.validate_subscriptions.get_insync_subscriptions") as mock:
        yield mock


@pytest.fixture
def mock_get_execution_context():
    with patch("gso.schedules.validate_subscriptions.get_execution_context") as mock:
        mock.return_value = {"validate": MagicMock()}
        yield mock


@pytest.fixture
def mock_logger():
    with patch("gso.schedules.validate_subscriptions.logger") as mock:
        yield mock


@pytest.fixture
def mock_celery():
    with patch("gso.schedules.scheduling.current_app") as mock_app:
        yield mock_app


def test_scheduler_updates_beat_schedule(mock_celery):
    mock_celery.conf.beat_schedule = {}

    @scheduler(name="A cool task", minute="0", hour="0", day_of_week="*", day_of_month="*", month_of_year="*")
    def mock_task():
        return "task result"

    assert "mock_task" in mock_celery.conf.beat_schedule
    scheduled = mock_celery.conf.beat_schedule["mock_task"]
    assert scheduled["schedule"].minute == {0}
    assert scheduled["schedule"].hour == {0}
    assert scheduled["task"] == "test.schedules.test_scheduling.mock_task"
    assert scheduled["name"] == "A cool task"


def test_scheduled_task_still_works():
    """Ensure that the scheduler decorator does not change the behavior of the function it decorates."""

    @scheduler(name="A cool task", minute="0", hour="0", day_of_week="*", day_of_month="*", month_of_year="*")
    def mock_task():
        return "task result"

    result = mock_task()
    assert result == "task result"


def test_no_subscriptions(mock_get_insync_subscriptions, mock_logger, validate_subscriptions):
    mock_get_insync_subscriptions.return_value = []
    validate_subscriptions()
    mock_logger.info.assert_called_once_with("No subscriptions to validate")


def test_subscriptions_without_system_target_workflow(
    mock_get_insync_subscriptions, mock_logger, validate_subscriptions
):
    mock_get_insync_subscriptions.return_value = [MagicMock(product=MagicMock(workflows=[]))]
    validate_subscriptions()
    mock_logger.warning.assert_called_once()


def test_subscription_status_not_usable(
    mock_get_insync_subscriptions, mock_get_execution_context, validate_subscriptions
):
    subscription_mock = MagicMock()
    subscription_mock.product.workflows = [MagicMock(target=Target.SYSTEM, name="workflow_name")]
    subscription_mock.status = "Not Usable Status"

    mock_get_insync_subscriptions.return_value = [subscription_mock]
    validate_subscriptions()

    validate_func = mock_get_execution_context()["validate"]
    validate_func.assert_not_called()


def test_valid_subscriptions_for_validation(
    mock_get_insync_subscriptions, mock_get_execution_context, validate_subscriptions
):
    subscription_mock = MagicMock()
    mocked_workflow = MagicMock(target=Target.SYSTEM, name="workflow_name")
    subscription_mock.product.workflows = [mocked_workflow]
    subscription_mock.status = "active"
    mock_get_insync_subscriptions.return_value = [subscription_mock]
    validate_subscriptions()
    validate_func = mock_get_execution_context()["validate"]
    validate_func.assert_called_once_with(
        mocked_workflow.name, json=[{"subscription_id": str(subscription_mock.subscription_id)}]
    )