# scheduling.py 2.18 KiB
"""Definition of the decorator that schedules tasks in GSO that are to run periodically."""
import inspect
from collections.abc import Callable
from functools import wraps
from typing import Any
from celery import current_app
from celery.schedules import crontab
from pydantic import BaseModel
class CronScheduleConfig(BaseModel):
    """Crontab-style timing settings for a periodically scheduled task.

    Only ``name`` is required; every timing field defaults to ``"*"``.
    The ``scheduler`` decorator in this module forwards the five timing
    fields unchanged to ``celery.schedules.crontab``.
    """

    # Stored under the "name" key of the beat schedule entry.
    name: str

    # Crontab pattern strings, one per time unit.
    minute: str = "*"
    hour: str = "*"
    day_of_week: str = "*"
    day_of_month: str = "*"
    month_of_year: str = "*"
def scheduler(cron_scheduler_config: CronScheduleConfig) -> Callable[[Callable], Callable]:
    """Build a decorator that registers a function in the Celery beat schedule.

    Applying the returned decorator adds an entry to
    ``current_app.conf.beat_schedule`` keyed by the decorated function's
    ``__name__``. The entry points at ``<module name>.<function name>`` and
    carries a ``crontab`` schedule built from ``cron_scheduler_config``.

    Examples of timing values:
        - `minute='*/15'`: Run every 15 minutes.
        - `hour='*/3'`: Run every 3 hours.
        - `day_of_week='mon-fri'`: Run on weekdays only.
        - `day_of_month='1-7,15-21'`: Run on the first and third weeks of the month.
        - `month_of_year='*/3'`: Run on the first month of each quarter.

    All specified time parts (minute, hour, day, etc.) must align for a task to run.

    Args:
        cron_scheduler_config: Name and crontab pattern strings for the task.

    Returns:
        A decorator that registers the function and returns a wrapper which
        forwards every call to it unchanged.

    Raises:
        ValueError: Raised by the decorator when the module that defines the
            decorated function cannot be determined.
    """
    timing = cron_scheduler_config

    def decorator(task_func: Callable) -> Callable:
        # Resolve the defining module first; the dotted task path needs it.
        owner = inspect.getmodule(task_func)
        if owner is None:
            msg = f"Module for the task function {task_func.__name__} could not be found."
            raise ValueError(msg)

        task_path = f"{owner.__name__}.{task_func.__name__}"

        @wraps(task_func)
        def passthrough(*args: Any, **kwargs: Any) -> Any:
            return task_func(*args, **kwargs)

        cron = crontab(
            minute=timing.minute,
            hour=timing.hour,
            day_of_month=timing.day_of_month,
            month_of_year=timing.month_of_year,
            day_of_week=timing.day_of_week,
        )
        entry = {
            "name": timing.name,
            "task": task_path,
            "schedule": cron,
        }
        # NOTE(review): the key is the bare function name, so two decorated
        # functions with the same name in different modules would share one
        # slot — confirm this is intended.
        current_app.conf.beat_schedule[task_func.__name__] = entry

        return passthrough

    return decorator