"""Definition of the decorator that schedules tasks in GSO that are to run periodically.""" import inspect from collections.abc import Callable from functools import wraps from typing import Any from celery import current_app from celery.schedules import crontab from pydantic import BaseModel class CronScheduleConfig(BaseModel): """Configuration for scheduling a task using crontab-like timing parameters.""" name: str minute: str = "*" hour: str = "*" day_of_week: str = "*" day_of_month: str = "*" month_of_year: str = "*" def scheduler(cron_scheduler_config: CronScheduleConfig) -> Callable[[Callable], Callable]: """Schedule a Celery task using crontab-like timing. Examples: - `minute='*/15'`: Run every 15 minutes. - `hour='*/3'`: Run every 3 hours. - `day_of_week='mon-fri'`: Run on weekdays only. - `day_of_month='1-7,15-21'`: Run on the first and third weeks of the month. - `month_of_year='*/3'`: Run on the first month of each quarter. All time units can be specified with lists of numbers or crontab pattern strings for advanced scheduling. All specified time parts (minute, hour, day, etc.) must align for a task to run. """ def decorator(task_func: Callable) -> Callable: @wraps(task_func) def scheduled_task(*args: Any, **kwargs: Any) -> Any: return task_func(*args, **kwargs) module = inspect.getmodule(task_func) if module is None: msg = f"Module for the task function {task_func.__name__} could not be found." raise ValueError(msg) task_path = f"{module.__name__}.{task_func.__name__}" current_app.conf.beat_schedule[task_func.__name__] = { "name": cron_scheduler_config.name, "task": task_path, "schedule": crontab( minute=cron_scheduler_config.minute, hour=cron_scheduler_config.hour, day_of_month=cron_scheduler_config.day_of_month, month_of_year=cron_scheduler_config.month_of_year, day_of_week=cron_scheduler_config.day_of_week, ), } return scheduled_task return decorator