Skip to content
Snippets Groups Projects
Select Git revision
  • 998db1823ae52237c69c68ed4075bf2522ada37e
  • develop default
  • master protected
  • feature/frontend-tests
  • 0.106
  • 0.105
  • 0.104
  • 0.103
  • 0.102
  • 0.101
  • 0.100
  • 0.99
  • 0.98
  • 0.97
  • 0.96
  • 0.95
  • 0.94
  • 0.93
  • 0.92
  • 0.91
  • 0.90
  • 0.89
  • 0.88
  • 0.87
24 results

conftest.py

Blame
  • worker.py 937 B
    """Module that sets up :term:`GSO` as a Celery worker. This will allow for the scheduling of regular task workflows."""
    
    from celery import Celery
    
    from gso import init_worker_app
    from gso.settings import load_oss_params
    
    
    class OrchestratorCelery(Celery):
        """A :term:`GSO` instance that functions as a Celery worker.

        Subclasses :class:`celery.Celery` and overrides its ``on_init`` hook so that
        :func:`gso.init_worker_app` is called as part of setting up the application.
        """
    
        def on_init(self) -> None:  # noqa: PLR6301
            """Initialise a new Celery worker.

            Overrides the ``on_init`` hook of :class:`celery.Celery` (documented by Celery as an optional callback
            called at init) and delegates all work to :func:`gso.init_worker_app`.
            """
            # ``self`` is unused here (hence the PLR6301 suppression), but the method has to stay an instance method
            # because it overrides a hook on the base class.
            # NOTE(review): what ``init_worker_app`` sets up is not visible in this module; see ``gso/__init__.py``.
            init_worker_app()
    
    
    # Load the OSS parameters once, at import time; every Celery option below is read from ``settings.CELERY``.
    settings = load_oss_params()
    
    # Module-level Celery application. The ``include`` list names the modules that Celery imports when the
    # worker starts, so that the tasks defined in them get registered.
    celery = OrchestratorCelery(
        "worker",
        broker=settings.CELERY.broker_url,
        backend=settings.CELERY.result_backend,
        include=[
            "gso.schedules.task_vacuum",
            "gso.schedules.validate_products",
            "gso.schedules.validate_subscriptions",
            "gso.schedules.send_email_notifications",
        ],
    )
    
    # Apply the remaining configuration in a single update. ``redbeat_redis_url`` deliberately reuses the broker
    # URL, as in the original two-step configuration.
    celery.conf.update(
        result_expires=settings.CELERY.result_expires,
        redbeat_redis_url=settings.CELERY.broker_url,
    )