Skip to content
Snippets Groups Projects
Verified Commit 3db329d5 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Fix running validation workflows on all active subscriptions

Allow for running a validation workflow on a subscription that is out of sync, if the workflow is marked as able to run on one
Allow for running multiple validation workflows on one subscription
parent 271c4778
No related branches found
No related tags found
1 merge request!418Fix issues with validation workflow schedule
Pipeline #93976 passed
......@@ -10,11 +10,11 @@ From this list, each workflow is selected that meets the following:
import structlog
from celery import shared_task
from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_WHILE_OUT_OF_SYNC
from orchestrator.targets import Target
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.services.subscriptions import get_active_insync_subscriptions
from gso.services.subscriptions import get_active_subscriptions
logger = structlog.get_logger(__name__)
......@@ -22,8 +22,16 @@ logger = structlog.get_logger(__name__)
@shared_task
@scheduler(CronScheduleConfig(name="Subscriptions Validator", minute="10", hour="3"))
def validate_subscriptions() -> None:
"""Validate all subscriptions using their corresponding validation workflow."""
subscriptions = get_active_insync_subscriptions()
"""Validate all subscriptions using their corresponding validation workflow.
Validation workflows only run on subscriptions that are active, even when they could be run on provisioning
subscriptions. E.g. for routers, they can manually be validated when provisioning, but are not included in this
schedule.
Validation workflows will, in principle, only run on subscriptions that are in sync. Except when a specific
validation workflow is marked as usable while out of sync in the list `WF_USABLE_WHILE_OUT_OF_SYNC`.
"""
subscriptions = get_active_subscriptions()
if not subscriptions:
logger.info("No subscriptions to validate")
return
......@@ -35,18 +43,18 @@ def validate_subscriptions() -> None:
if workflow.target == Target.SYSTEM and workflow.name.startswith("validate_"):
validation_workflow = workflow.name
if validation_workflow:
# Validation workflows only run on subscriptions that are active, even when they could be run on
# provisioning subscriptions. E.g. for routers, they can manually be validated when provisioning, but are
# not included in this schedule.
usable_when = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]
if validation_workflow:
validation_workflow_usable = (subscription.status in TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]) and (
subscription.insync or (workflow in WF_USABLE_WHILE_OUT_OF_SYNC)
)
if validation_workflow_usable:
json = [{"subscription_id": str(subscription.subscription_id)}]
if subscription.status in usable_when:
json = [{"subscription_id": str(subscription.subscription_id)}]
validate_func = get_execution_context()["validate"]
validate_func(validation_workflow, json=json)
validate_func = get_execution_context()["validate"]
validate_func(validation_workflow, json=json)
else:
if not validation_workflow:
logger.warning(
"SubscriptionTable has no validation workflow",
subscription=subscription,
......
......@@ -343,11 +343,11 @@ def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None:
return SubscriptionModel.from_subscription(subscription_table.subscription_id) if subscription_table else None
def get_active_insync_subscriptions() -> list[SubscriptionTable]:
"""Retrieve all subscriptions that are currently active and in sync."""
def get_active_subscriptions() -> list[SubscriptionTable]:
"""Retrieve all subscriptions that are currently active."""
return (
SubscriptionTable.query.join(ProductTable)
.filter(SubscriptionTable.insync.is_(True), SubscriptionTable.status == SubscriptionLifecycle.ACTIVE.value)
.filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
.all()
)
......
......@@ -14,8 +14,8 @@ def validate_subscriptions():
@pytest.fixture()
def mock_get_active_insync_subscriptions():
with patch("gso.schedules.validate_subscriptions.get_active_insync_subscriptions") as mock:
def mock_get_active_subscriptions():
with patch("gso.schedules.validate_subscriptions.get_active_subscriptions") as mock:
yield mock
......@@ -82,24 +82,24 @@ def test_scheduled_task_still_works():
assert result == "task result"
def test_no_subscriptions(mock_get_active_insync_subscriptions, mock_logger, validate_subscriptions):
mock_get_active_insync_subscriptions.return_value = []
def test_no_subscriptions(mock_get_active_subscriptions, mock_logger, validate_subscriptions):
mock_get_active_subscriptions.return_value = []
validate_subscriptions()
mock_logger.info.assert_called_once_with("No subscriptions to validate")
def test_subscriptions_without_system_target_workflow(
mock_get_active_insync_subscriptions,
mock_get_active_subscriptions,
mock_logger,
validate_subscriptions,
):
mock_get_active_insync_subscriptions.return_value = [MagicMock(product=MagicMock(workflows=[]))]
mock_get_active_subscriptions.return_value = [MagicMock(product=MagicMock(workflows=[]))]
validate_subscriptions()
mock_logger.warning.assert_called_once()
def test_subscription_status_not_usable(
mock_get_active_insync_subscriptions,
mock_get_active_subscriptions,
mock_get_execution_context,
validate_subscriptions,
):
......@@ -107,7 +107,7 @@ def test_subscription_status_not_usable(
subscription_mock.product.workflows = [MagicMock(target=Target.SYSTEM, name="workflow_name")]
subscription_mock.status = "Not Usable Status"
mock_get_active_insync_subscriptions.return_value = [subscription_mock]
mock_get_active_subscriptions.return_value = [subscription_mock]
validate_subscriptions()
validate_func = mock_get_execution_context()["validate"]
......@@ -115,7 +115,7 @@ def test_subscription_status_not_usable(
def test_valid_subscriptions_for_validation(
mock_get_active_insync_subscriptions,
mock_get_active_subscriptions,
mock_get_execution_context,
validate_subscriptions,
):
......@@ -123,7 +123,7 @@ def test_valid_subscriptions_for_validation(
mocked_workflow = MagicMock(target=Target.SYSTEM, name="workflow_name")
subscription_mock.product.workflows = [mocked_workflow]
subscription_mock.status = "active"
mock_get_active_insync_subscriptions.return_value = [subscription_mock]
mock_get_active_subscriptions.return_value = [subscription_mock]
validate_subscriptions()
validate_func = mock_get_execution_context()["validate"]
validate_func.assert_called_once_with(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment