Skip to content
Snippets Groups Projects
Commit 410b8de5 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

add celery as EXECUTOR

parent 19aaac4f
No related branches found
No related tags found
1 merge request!280add celery as EXECUTOR
Pipeline #89482 passed
......@@ -4,9 +4,12 @@ import os
import sentry_sdk
import typer
from celery import Celery
from orchestrator import OrchestratorCore, app_settings
from orchestrator.cli.main import app as cli_app
from orchestrator.graphql import SCALAR_OVERRIDES
from orchestrator.services.tasks import initialise_celery
from orchestrator.settings import ExecutorType
# noinspection PyUnresolvedReferences
import gso.products
......@@ -28,6 +31,20 @@ def init_gso_app() -> OrchestratorCore:
app.register_graphql_authorization(graphql_opa_instance)
app.register_graphql()
app.include_router(api_router, prefix="/api")
if app_settings.EXECUTOR == ExecutorType.WORKER:
config = load_oss_params()
celery = Celery(
"geant-service-orchestrator",
broker=config.CELERY.broker_url,
backend=config.CELERY.result_backend,
include=["orchestrator.services.tasks"],
)
celery.conf.update(
result_expires=config.CELERY.result_expires,
)
initialise_celery(celery)
return app
......
"""Module that sets up :term:`GSO` as a Celery worker. This will allow for the scheduling of regular task workflows."""
from typing import Any
from uuid import UUID
from celery import Celery
from celery.signals import setup_logging, worker_shutting_down
from nwastdlib.logging import initialise_logging
from orchestrator import app_settings
from orchestrator.db import init_database
from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY
from orchestrator.log_config import LOGGER_OVERRIDES, logger_config
from orchestrator.services.tasks import initialise_celery
from orchestrator.types import BroadcastFunc
from orchestrator.websocket import broadcast_process_update_to_websocket, init_websocket_manager
from orchestrator.websocket.websocket_manager import WebSocketManager
from orchestrator.workflows import ALL_WORKFLOWS
from structlog import get_logger
from gso import init_worker_app
from gso.settings import load_oss_params
logger = get_logger(__name__)

# Extend the orchestrator's standard logger overrides with the Celery-stack
# loggers so worker log output matches the rest of the application.
LOGGER_OVERRIDES_CELERY = LOGGER_OVERRIDES | dict(
    map(logger_config, ("celery", "kombu")),
)
@setup_logging.connect  # type: ignore[misc]
def on_setup_logging(**kwargs: Any) -> None:  # noqa: ARG001
    """Install the application's logging configuration in the Celery worker.

    Hooking Celery's ``setup_logging`` signal prevents Celery from replacing
    the orchestrator's logging setup with its own default handlers.
    """
    initialise_logging(additional_loggers=LOGGER_OVERRIDES_CELERY)
def process_broadcast_fn(process_id: UUID) -> None:
    """Broadcast a process update to the WebSocket.

    :param process_id: ID of the process whose update is pushed to subscribers.
    """
    # Catch all exceptions: broadcasting failure is noncritical to workflow
    # completion, so a failed push must never abort the running workflow.
    try:
        broadcast_process_update_to_websocket(process_id)
    except Exception as e:
        logger.exception(e)  # noqa: TRY401
class OrchestratorWorker(Celery):
    """A :term:`GSO` instance that functions as a Celery worker."""

    # Manager used for broadcasting process updates over WebSockets.
    websocket_manager: WebSocketManager
    # Callback invoked by the orchestrator when a process state changes.
    process_broadcast_fn: BroadcastFunc

    def on_init(self) -> None:
        """Initialise a new Celery worker.

        Sets up the database connection, the WebSocket manager, and loads all
        products and workflows before the worker starts consuming tasks.
        """
        init_database(app_settings)

        # Prepare the wrapped_websocket_manager
        # Note: cannot prepare the redis connections here as broadcasting is async
        self.websocket_manager = init_websocket_manager(app_settings)
        self.process_broadcast_fn = process_broadcast_fn

        # Load the products and load the workflows
        import gso.products  # noqa: PLC0415
        import gso.workflows  # noqa: PLC0415,F401

        logger.info(
            "Loaded the workflows and products",
            workflows=len(ALL_WORKFLOWS.values()),
            products=len(SUBSCRIPTION_MODEL_REGISTRY.values()),
        )

        init_worker_app()

    def close(self) -> None:
        """Close Celery worker cleanly."""
        super().close()
settings = load_oss_params()
celery = OrchestratorCelery(
"worker",
celery = OrchestratorWorker(
"geant-service-orchestrator-worker",
broker=settings.CELERY.broker_url,
backend=settings.CELERY.result_backend,
include=[
......@@ -26,8 +87,27 @@ celery = OrchestratorCelery(
"gso.schedules.validate_subscriptions",
"gso.schedules.send_email_notifications",
"gso.schedules.clean_old_tasks",
"orchestrator.services.tasks",
],
)
# In tests, keep task results so assertions can inspect task outcomes; in
# production, ignore them to avoid filling the result backend needlessly.
if app_settings.TESTING:
    celery.conf.update(backend=settings.CELERY.result_backend, task_ignore_result=False)
else:
    celery.conf.update(task_ignore_result=True)

celery.conf.update(
    result_expires=settings.CELERY.result_expires,
    # Fetch one task at a time so a long-running workflow does not hold
    # additional tasks hostage in this worker's prefetch buffer.
    worker_prefetch_multiplier=1,
    worker_send_task_event=True,
    task_send_sent_event=True,
    # RedBeat (the beat scheduler) stores its schedule in the same Redis broker.
    redbeat_redis_url=settings.CELERY.broker_url,
)

initialise_celery(celery)
@worker_shutting_down.connect  # type: ignore[misc]
def worker_shutting_down_handler(sig, how, exitcode, **kwargs) -> None:  # type: ignore[no-untyped-def] # noqa: ARG001
    """Handle the Celery worker shutdown event.

    Connected to Celery's ``worker_shutting_down`` signal; closes the worker's
    resources (database, websocket manager) cleanly via ``celery.close()``.
    """
    celery.close()
......@@ -4,4 +4,4 @@ set -o errexit
set -o nounset
cd /app
# Start the GSO Celery worker. Solo pool with concurrency 1: tasks run in the
# worker process itself, one at a time. The worker consumes both task and
# workflow queues so scheduled tasks and workflow steps are all executed here.
python -m celery -A gso.worker worker --loglevel=info --concurrency=1 --pool=solo --queues=new_tasks,resume_tasks,new_workflows,resume_workflows
......@@ -10,6 +10,8 @@ passenv = DATABASE_URI_TEST,SKIP_ALL_TESTS,ENVIRONMENT_IGNORE_MUTATION_DISABLED
setenv =
OAUTH2_ACTIVE = False
TRANSLATIONS_DIR = ./gso/translations
TESTING=true
EXECUTOR=threadpool
deps =
coverage
-r requirements.txt
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment