# validate_subscriptions.py (1.70 KiB)
# Authored by Mohammad Torkashvand
"""Scheduled task that runs a validation workflow for all active subscriptions."""
import structlog
from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_MAP
from orchestrator.targets import Target
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.services.subscriptions import get_insync_subscriptions
from gso.worker import celery
logger = structlog.get_logger(__name__)
@celery.task
@scheduler(CronScheduleConfig(name="Subscriptions Validator", minute="10", hour="0"))
def validate_subscriptions() -> None:
    """Start the validation workflow of every in-sync subscription that has one.

    For each subscription returned by ``get_insync_subscriptions()`` the
    product's workflows are scanned for those whose target is
    ``Target.SYSTEM``; when several match, the last one is used. The
    validation is only started when the subscription's status is among the
    statuses allowed for that workflow (``WF_USABLE_MAP``, falling back to the
    ``Target.SYSTEM`` entry of ``TARGET_DEFAULT_USABLE_MAP``). Subscriptions
    without such a workflow are reported with a warning and skipped.

    The task is registered with the scheduler under the name
    "Subscriptions Validator" with ``minute="10"`` and ``hour="0"``
    (presumably a daily run at 00:10 -- confirm against ``CronScheduleConfig``).
    """
    insync_subscriptions = get_insync_subscriptions()
    if not insync_subscriptions:
        logger.info("No subscriptions to validate")
        return

    for subscription in insync_subscriptions:
        # Collect every SYSTEM-target workflow name; the last match wins.
        system_workflow_names = [
            candidate.name for candidate in subscription.product.workflows if candidate.target == Target.SYSTEM
        ]
        workflow_name = system_workflow_names[-1] if system_workflow_names else None

        if not workflow_name:
            logger.warning(
                "SubscriptionTable has no validation workflow",
                subscription=subscription,
                product=subscription.product.name,
            )
            continue

        fallback_statuses = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]
        allowed_statuses = WF_USABLE_MAP.get(workflow_name, fallback_statuses)
        if subscription.status not in allowed_statuses:
            # The workflow may not be run in the subscription's current status.
            continue

        payload = [{"subscription_id": str(subscription.subscription_id)}]
        start_validation = get_execution_context()["validate"]
        start_validation(workflow_name, json=payload)