Newer
Older
"""Module that sets up :term:`GSO` as a Celery worker. This will allow for the scheduling of regular task workflows."""
from celery import Celery
from gso import init_worker_app
from gso.settings import load_oss_params
class OrchestratorCelery(Celery):
    """Celery application subclass used to run :term:`GSO` as a worker.

    The only customisation is the :meth:`on_init` hook, which calls ``init_worker_app``.
    """

    def on_init(self) -> None:  # noqa: PLR6301
        """Set up the worker application by calling ``init_worker_app``.

        ``self`` is not used here; the method only exists to override the Celery hook of the same name.
        """
        init_worker_app()
# Load the OSS parameters once at import time; the Celery section drives everything below.
settings = load_oss_params()

# Module-level Celery application. The modules listed under ``include`` are handed to Celery so it imports them
# for the worker.
celery = OrchestratorCelery(
    "worker",
    broker=settings.CELERY.broker_url,
    backend=settings.CELERY.result_backend,
    include=[
        "gso.schedules.task_vacuum",
        "gso.schedules.validate_products",
        "gso.schedules.validate_subscriptions",
    ],
)

# Apply the remaining configuration in a single update. ``redbeat_redis_url`` is deliberately given the same URL
# as the broker (presumably a Redis instance - confirm against the deployment settings).
celery.conf.update(
    result_expires=settings.CELERY.result_expires,
    redbeat_redis_url=settings.CELERY.broker_url,
)