-
Mohammad Torkashvand authored
Added celery beat; added workflow and task validation scheduling
Mohammad Torkashvand authored: Added celery beat; added workflow and task validation scheduling
validate_subscriptions.py 1.37 KiB
import structlog
from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_MAP
from orchestrator.targets import Target
from gso.schedules.scheduling import scheduler
from gso.services.subscriptions import get_insync_subscriptions
from gso.worker import celery
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
@celery.task
@scheduler(name="Subscriptions Validator", minute="10", hour="0")
def validate_subscriptions() -> None:
    """Start the SYSTEM-target validation workflow for each subscription.

    Iterates over everything returned by ``get_insync_subscriptions()``.
    For each subscription, the name of the last product workflow whose
    target is ``Target.SYSTEM`` is used as the validation workflow. The
    workflow is started through the ``"validate"`` entry of the execution
    context, but only when the subscription's status is among the statuses
    listed for that workflow in ``WF_USABLE_MAP`` (falling back to the
    ``Target.SYSTEM`` entry of ``TARGET_DEFAULT_USABLE_MAP``).

    Subscriptions without such a workflow are reported with a warning.

    Registered with the scheduler as "Subscriptions Validator" using
    ``minute="10", hour="0"`` (presumably 00:10 every day; confirm
    against the ``scheduler`` decorator).
    """
    for sub in get_insync_subscriptions():
        system_workflow_names = [
            wf.name for wf in sub.product.workflows if wf.target == Target.SYSTEM
        ]
        # When several SYSTEM workflows exist, the last one listed wins.
        workflow_name = system_workflow_names[-1] if system_workflow_names else None

        if not workflow_name:
            logger.warning(
                "SubscriptionTable has no validation workflow",
                subscription=sub,
                product=sub.product.name,
            )
            continue

        fallback_statuses = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]
        allowed_statuses = WF_USABLE_MAP.get(workflow_name, fallback_statuses)
        if sub.status not in allowed_statuses:
            # Subscription is not in a state where this workflow may run.
            continue

        payload = [{"subscription_id": str(sub.subscription_id)}]
        start_validation = get_execution_context()["validate"]
        start_validation(workflow_name, json=payload)