-
Karel van Klink authored
Fix an issue where validation workflows would run out-of-order, or in duplicate.
Karel van Klink authoredFix an issue where validation workflows would run out-of-order, or in duplicate.
validate_subscriptions.py 3.77 KiB
"""Scheduled task that runs a validation workflow for all active subscriptions.
The list of workflows that should be executed is determined by multiple criteria. First, this task gathers a list of all
active subscriptions. For each subscription, the list of workflows attached to its product is fetched.
From this list, each workflow is selected that meets the following:
* The target of the workflow is `SYSTEM`.
* The name of the workflow follows the pattern `validate_*`.
A selected workflow is only started when the status of the subscription allows `SYSTEM` workflows, and the subscription
is either in sync or the workflow is listed in `WF_USABLE_WHILE_OUT_OF_SYNC`.
"""
import structlog
from celery import shared_task
from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_WHILE_OUT_OF_SYNC
from orchestrator.targets import Target
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.services.subscriptions import get_active_subscriptions
logger = structlog.get_logger(__name__)
@shared_task
@scheduler(CronScheduleConfig(name="Subscriptions Validator", minute="10", hour="3"))
def validate_subscriptions() -> None:
    """Start the validation workflow(s) of every active subscription.

    The subscriptions are taken from ``get_active_subscriptions()``. Only active subscriptions are considered, so
    subscriptions in another lifecycle state (e.g. provisioning) are not part of this schedule, even when a validation
    workflow could be started on them by hand.

    For each subscription, every workflow of its product that targets ``SYSTEM`` and whose name starts with
    ``validate_`` is treated as a validation workflow. Such a workflow is started when the subscription status is
    listed in ``TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]`` and the subscription is either in sync, or the workflow is
    listed in ``WF_USABLE_WHILE_OUT_OF_SYNC``. Otherwise the workflow is skipped and the reason is logged.

    A warning is logged for every subscription whose product has no validation workflow at all.
    """
    active_subscriptions = get_active_subscriptions()
    if not active_subscriptions:
        logger.info("No subscriptions to validate")
        return

    for subscription in active_subscriptions:
        has_validation_workflow = False

        for workflow in subscription.product.workflows:
            # Skip everything that is not a SYSTEM-targeted ``validate_*`` workflow.
            if workflow.target != Target.SYSTEM or not workflow.name.startswith("validate_"):
                continue

            workflow_name = workflow.name
            has_validation_workflow = True

            status_is_usable = subscription.status in TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]
            if not (status_is_usable and (subscription.insync or workflow_name in WF_USABLE_WHILE_OUT_OF_SYNC)):
                logger.info(
                    "Validation workflow is not usable on this subscription instance",
                    product=subscription.product.name,
                    subscription_id=subscription.subscription_id,
                    subscription_description=subscription.description,
                    status=subscription.status,
                    insync=subscription.insync,
                    workflow=workflow_name,
                )
                continue

            logger.info(
                "Found a usable validation workflow, scheduling task.",
                product=subscription.product.name,
                subscription_id=subscription.subscription_id,
                subscription_description=subscription.description,
                workflow=workflow_name,
            )
            # The workflow input is a single-item list holding the subscription ID as a string.
            payload = [{"subscription_id": str(subscription.subscription_id)}]
            start_validation = get_execution_context()["validate"]
            start_validation(workflow_name, json=payload)

        if has_validation_workflow:
            continue

        logger.warning(
            "SubscriptionTable has no validation workflow",
            product=subscription.product.name,
            subscription_id=subscription.subscription_id,
            subscription_description=subscription.description,
        )