# validate_subscriptions.py
"""Scheduled task that runs a validation workflow for all active subscriptions."""

import structlog
from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_MAP
from orchestrator.targets import Target

from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.services.subscriptions import get_insync_subscriptions
from gso.worker import celery

# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)


@celery.task
@scheduler(CronScheduleConfig(name="Subscriptions Validator", minute="10", hour="0"))
def validate_subscriptions() -> None:
    """Start the validation workflow of every subscription returned by ``get_insync_subscriptions``.

    For each subscription, the product's workflows are searched for one whose target is ``Target.SYSTEM``; when
    several match, the last one listed is used. The workflow is started only if the subscription's status is contained
    in the entry looked up in ``WF_USABLE_MAP`` for that workflow (falling back to the ``Target.SYSTEM`` entry of
    ``TARGET_DEFAULT_USABLE_MAP``). Subscriptions without such a workflow are reported with a warning.
    """
    insync_subscriptions = get_insync_subscriptions()
    if not insync_subscriptions:
        logger.info("No subscriptions to validate")
        return

    for subscription in insync_subscriptions:
        system_workflow_names = [
            candidate.name for candidate in subscription.product.workflows if candidate.target == Target.SYSTEM
        ]
        # The last SYSTEM-targeted workflow wins when a product lists more than one.
        workflow_name = system_workflow_names[-1] if system_workflow_names else None

        if not workflow_name:
            logger.warning(
                "SubscriptionTable has no validation workflow",
                subscription=subscription,
                product=subscription.product.name,
            )
            continue

        allowed_statuses = WF_USABLE_MAP.get(workflow_name, TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM])
        if subscription.status not in allowed_statuses:
            # Subscription is not in a state in which this workflow may run; skip it silently.
            continue

        payload = [{"subscription_id": str(subscription.subscription_id)}]
        # The "validate" entry of the execution context is the callable that starts the workflow.
        start_validation = get_execution_context()["validate"]
        start_validation(workflow_name, json=payload)