Skip to content
Snippets Groups Projects

add celery as EXECUTOR

Merged Mohammad Torkashvand requested to merge feature/add-celery-executor into develop
All threads resolved!
4 files
+ 112
8
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 23
0
@@ -4,9 +4,12 @@ import os
import sentry_sdk
import typer
from celery import Celery
from orchestrator import OrchestratorCore, app_settings
from orchestrator.cli.main import app as cli_app
from orchestrator.graphql import SCALAR_OVERRIDES
from orchestrator.services.tasks import initialise_celery
from orchestrator.settings import ExecutorType
# noinspection PyUnresolvedReferences
import gso.products
@@ -20,6 +23,12 @@ from gso.settings import load_oss_params
SCALAR_OVERRIDES.update(GSO_SCALAR_OVERRIDES)
def gso_initialise_celery(celery: Celery) -> None:
"""Initialise the :term:`Celery` app."""
initialise_celery(celery)
celery.conf.task_routes = {}
def init_gso_app() -> OrchestratorCore:
"""Initialise the :term:`GSO` app."""
app = OrchestratorCore(base_settings=app_settings)
@@ -28,6 +37,20 @@ def init_gso_app() -> OrchestratorCore:
app.register_graphql_authorization(graphql_opa_instance)
app.register_graphql()
app.include_router(api_router, prefix="/api")
if app_settings.EXECUTOR == ExecutorType.WORKER:
config = load_oss_params()
celery = Celery(
"geant-service-orchestrator",
broker=config.CELERY.broker_url,
backend=config.CELERY.result_backend,
include=["orchestrator.services.tasks"],
)
celery.conf.update(
result_expires=config.CELERY.result_expires,
)
gso_initialise_celery(celery)
return app
Loading