# validate_subscriptions.py
import structlog
from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_MAP
from orchestrator.targets import Target

from gso.schedules.scheduling import scheduler
from gso.services.subscriptions import get_insync_subscriptions
from gso.worker import celery

# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)


@celery.task
@scheduler(name="Subscriptions Validator", minute="10", hour="0")
def validate_subscriptions() -> None:
    """Start the validation workflow for every eligible subscription.

    Iterates over the subscriptions returned by ``get_insync_subscriptions()``.
    For each one, the workflow of the subscription's product whose target is
    ``Target.SYSTEM`` is used as the validation workflow (if several match,
    the last one listed is used). The workflow is started only when the
    subscription's status is among the statuses allowed for it:
    ``WF_USABLE_MAP`` is consulted first, falling back to the default for
    ``Target.SYSTEM``. Subscriptions without such a workflow are reported
    with a warning and skipped.

    Registered as a Celery task and with the scheduler under the name
    "Subscriptions Validator" (minute ``10``, hour ``0``).
    """
    for subscription in get_insync_subscriptions():
        # Collect the names of all SYSTEM-target workflows; the last one wins.
        system_workflow_names = [
            candidate.name
            for candidate in subscription.product.workflows
            if candidate.target == Target.SYSTEM
        ]
        workflow_name = system_workflow_names[-1] if system_workflow_names else None

        if not workflow_name:
            logger.warning(
                "SubscriptionTable has no validation workflow",
                subscription=subscription,
                product=subscription.product.name,
            )
            continue

        fallback_statuses = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]
        allowed_statuses = WF_USABLE_MAP.get(workflow_name, fallback_statuses)
        if subscription.status not in allowed_statuses:
            # Status does not permit running this workflow; skip silently.
            continue

        payload = [{"subscription_id": str(subscription.subscription_id)}]
        # The "validate" entry of the execution context starts the workflow.
        start_validation = get_execution_context()["validate"]
        start_validation(workflow_name, json=payload)