diff --git a/gso/schedules/validate_subscriptions.py b/gso/schedules/validate_subscriptions.py index 0ac4e36f655fda58f108967095123ef17f4f90dc..7323b6086b948201da4a0f5b0ee1574030646076 100644 --- a/gso/schedules/validate_subscriptions.py +++ b/gso/schedules/validate_subscriptions.py @@ -10,11 +10,11 @@ From this list, each workflow is selected that meets the following: import structlog from celery import shared_task from orchestrator.services.processes import get_execution_context -from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP +from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_WHILE_OUT_OF_SYNC from orchestrator.targets import Target from gso.schedules.scheduling import CronScheduleConfig, scheduler -from gso.services.subscriptions import get_active_insync_subscriptions +from gso.services.subscriptions import get_active_subscriptions logger = structlog.get_logger(__name__) @@ -22,8 +22,16 @@ logger = structlog.get_logger(__name__) @shared_task @scheduler(CronScheduleConfig(name="Subscriptions Validator", minute="10", hour="3")) def validate_subscriptions() -> None: - """Validate all subscriptions using their corresponding validation workflow.""" - subscriptions = get_active_insync_subscriptions() + """Validate all subscriptions using their corresponding validation workflow. + + Validation workflows only run on subscriptions that are active, even when they could be run on provisioning + subscriptions. E.g. for routers, they can manually be validated when provisioning, but are not included in this + schedule. + + Validation workflows will, in principle, only run on subscriptions that are in sync. Except when a specific + validation workflow is marked as usable while out of sync in the list `WF_USABLE_WHILE_OUT_OF_SYNC`. + """ + subscriptions = get_active_subscriptions() if not subscriptions: logger.info("No subscriptions to validate") return @@ -35,18 +43,18 @@ def validate_subscriptions() -> None: if workflow.target == Target.SYSTEM and workflow.name.startswith("validate_"): validation_workflow = workflow.name - if validation_workflow: - # Validation workflows only run on subscriptions that are active, even when they could be run on - # provisioning subscriptions. E.g. for routers, they can manually be validated when provisioning, but are - # not included in this schedule. - usable_when = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM] + if validation_workflow: + validation_workflow_usable = (subscription.status in TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]) and ( + subscription.insync or (workflow in WF_USABLE_WHILE_OUT_OF_SYNC) + ) + + if validation_workflow_usable: + json = [{"subscription_id": str(subscription.subscription_id)}] - if subscription.status in usable_when: - json = [{"subscription_id": str(subscription.subscription_id)}] + validate_func = get_execution_context()["validate"] + validate_func(validation_workflow, json=json) - validate_func = get_execution_context()["validate"] - validate_func(validation_workflow, json=json) - else: + if not validation_workflow: logger.warning( "SubscriptionTable has no validation workflow", subscription=subscription, diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py index 6ae731410e60b1a60e0a80b3b5cd11508e101aab..93e849d892144299f932467501e8d82b1805ef89 100644 --- a/gso/services/subscriptions.py +++ b/gso/services/subscriptions.py @@ -343,11 +343,11 @@ def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None: return SubscriptionModel.from_subscription(subscription_table.subscription_id) if subscription_table else None -def get_active_insync_subscriptions() -> list[SubscriptionTable]: - """Retrieve all subscriptions that are currently active and in sync.""" +def get_active_subscriptions() -> list[SubscriptionTable]: + """Retrieve all subscriptions that are currently active.""" return ( SubscriptionTable.query.join(ProductTable) - .filter(SubscriptionTable.insync.is_(True), SubscriptionTable.status == SubscriptionLifecycle.ACTIVE.value) + .filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE) .all() ) diff --git a/test/schedules/test_scheduling.py b/test/schedules/test_scheduling.py index 93f1b5d3326719c4faecc3485483402a848a0875..a1eb56b48a4d0023c527635701ba85dbfa5a4bab 100644 --- a/test/schedules/test_scheduling.py +++ b/test/schedules/test_scheduling.py @@ -14,8 +14,8 @@ def validate_subscriptions(): @pytest.fixture() -def mock_get_active_insync_subscriptions(): - with patch("gso.schedules.validate_subscriptions.get_active_insync_subscriptions") as mock: +def mock_get_active_subscriptions(): + with patch("gso.schedules.validate_subscriptions.get_active_subscriptions") as mock: yield mock @@ -82,24 +82,24 @@ def test_scheduled_task_still_works(): assert result == "task result" -def test_no_subscriptions(mock_get_active_insync_subscriptions, mock_logger, validate_subscriptions): - mock_get_active_insync_subscriptions.return_value = [] +def test_no_subscriptions(mock_get_active_subscriptions, mock_logger, validate_subscriptions): + mock_get_active_subscriptions.return_value = [] validate_subscriptions() mock_logger.info.assert_called_once_with("No subscriptions to validate") def test_subscriptions_without_system_target_workflow( - mock_get_active_insync_subscriptions, + mock_get_active_subscriptions, mock_logger, validate_subscriptions, ): - mock_get_active_insync_subscriptions.return_value = [MagicMock(product=MagicMock(workflows=[]))] + mock_get_active_subscriptions.return_value = [MagicMock(product=MagicMock(workflows=[]))] validate_subscriptions() mock_logger.warning.assert_called_once() def test_subscription_status_not_usable( - mock_get_active_insync_subscriptions, + mock_get_active_subscriptions, mock_get_execution_context, validate_subscriptions, ): @@ -107,7 +107,7 @@ def test_subscription_status_not_usable( subscription_mock.product.workflows = [MagicMock(target=Target.SYSTEM, name="workflow_name")] subscription_mock.status = "Not Usable Status" - mock_get_active_insync_subscriptions.return_value = [subscription_mock] + mock_get_active_subscriptions.return_value = [subscription_mock] validate_subscriptions() validate_func = mock_get_execution_context()["validate"] @@ -115,7 +115,7 @@ def test_subscription_status_not_usable( def test_valid_subscriptions_for_validation( - mock_get_active_insync_subscriptions, + mock_get_active_subscriptions, mock_get_execution_context, validate_subscriptions, ): @@ -123,7 +123,7 @@ def test_valid_subscriptions_for_validation( mocked_workflow = MagicMock(target=Target.SYSTEM, name="workflow_name") subscription_mock.product.workflows = [mocked_workflow] subscription_mock.status = "active" - mock_get_active_insync_subscriptions.return_value = [subscription_mock] + mock_get_active_subscriptions.return_value = [subscription_mock] validate_subscriptions() validate_func = mock_get_execution_context()["validate"] validate_func.assert_called_once_with(