From 410b8de54b793dd55fd4ce07c9d3195739ffc3c3 Mon Sep 17 00:00:00 2001
From: Mohammad Torkashvand <mohammad.torkashvand@geant.org>
Date: Wed, 25 Sep 2024 18:17:28 +0200
Subject: [PATCH] add celery as EXECUTOR

---
 gso/__init__.py | 17 +++++++++
 gso/worker.py   | 92 +++++++++++++++++++++++++++++++++++++++++++++----
 start-worker.sh |  2 +-
 tox.ini         |  2 ++
 4 files changed, 106 insertions(+), 7 deletions(-)

diff --git a/gso/__init__.py b/gso/__init__.py
index f1d1debd..2a4c56d7 100644
--- a/gso/__init__.py
+++ b/gso/__init__.py
@@ -4,9 +4,12 @@ import os
 
 import sentry_sdk
 import typer
+from celery import Celery
 from orchestrator import OrchestratorCore, app_settings
 from orchestrator.cli.main import app as cli_app
 from orchestrator.graphql import SCALAR_OVERRIDES
+from orchestrator.services.tasks import initialise_celery
+from orchestrator.settings import ExecutorType
 
 # noinspection PyUnresolvedReferences
 import gso.products
@@ -28,6 +31,20 @@ def init_gso_app() -> OrchestratorCore:
     app.register_graphql_authorization(graphql_opa_instance)
     app.register_graphql()
     app.include_router(api_router, prefix="/api")
+
+    if app_settings.EXECUTOR == ExecutorType.WORKER:
+        config = load_oss_params()
+        celery = Celery(
+            "geant-service-orchestrator",
+            broker=config.CELERY.broker_url,
+            backend=config.CELERY.result_backend,
+            include=["orchestrator.services.tasks"],
+        )
+        celery.conf.update(
+            result_expires=config.CELERY.result_expires,
+        )
+        initialise_celery(celery)
+
     return app
 
 
diff --git a/gso/worker.py b/gso/worker.py
index 300eb590..09c17a3e 100644
--- a/gso/worker.py
+++ b/gso/worker.py
@@ -1,23 +1,84 @@
 """Module that sets up :term:`GSO` as a Celery worker. This will allow for the scheduling of regular task workflows."""
 
+from typing import Any
+from uuid import UUID
+
 from celery import Celery
+from celery.signals import setup_logging, worker_shutting_down
+from nwastdlib.logging import initialise_logging
+from orchestrator import app_settings
+from orchestrator.db import init_database
+from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY
+from orchestrator.log_config import LOGGER_OVERRIDES, logger_config
+from orchestrator.services.tasks import initialise_celery
+from orchestrator.types import BroadcastFunc
+from orchestrator.websocket import broadcast_process_update_to_websocket, init_websocket_manager
+from orchestrator.websocket.websocket_manager import WebSocketManager
+from orchestrator.workflows import ALL_WORKFLOWS
+from structlog import get_logger
 
 from gso import init_worker_app
 from gso.settings import load_oss_params
 
+logger = get_logger(__name__)
+
+LOGGER_OVERRIDES_CELERY = LOGGER_OVERRIDES | dict([
+    logger_config("celery"),
+    logger_config("kombu"),
+])
+
+
+@setup_logging.connect  # type: ignore[misc]
+def on_setup_logging(**kwargs: Any) -> None:  # noqa: ARG001
+    """Set up logging for the Celery worker."""
+    initialise_logging(additional_loggers=LOGGER_OVERRIDES_CELERY)
+
 
-class OrchestratorCelery(Celery):
+def process_broadcast_fn(process_id: UUID) -> None:
+    """Broadcast process update to WebSocket."""
+    # Catch all exceptions as broadcasting failure is noncritical to workflow completion
+    try:
+        broadcast_process_update_to_websocket(process_id)
+    except Exception as e:
+        logger.exception(e)  # noqa: TRY401
+
+
+class OrchestratorWorker(Celery):
     """A :term:`GSO` instance that functions as a Celery worker."""
 
-    def on_init(self) -> None:  # noqa: PLR6301
+    websocket_manager: WebSocketManager
+    process_broadcast_fn: BroadcastFunc
+
+    def on_init(self) -> None:
         """Initialise a new Celery worker."""
+        init_database(app_settings)
+
+        # Prepare the wrapped_websocket_manager
+        # Note: cannot prepare the redis connections here as broadcasting is async
+        self.websocket_manager = init_websocket_manager(app_settings)
+        self.process_broadcast_fn = process_broadcast_fn
+
+        # Load the products and load the workflows
+        import gso.products  # noqa: PLC0415
+        import gso.workflows  # noqa: PLC0415,F401
+
+        logger.info(
+            "Loaded the workflows and products",
+            workflows=len(ALL_WORKFLOWS.values()),
+            products=len(SUBSCRIPTION_MODEL_REGISTRY.values()),
+        )
+
         init_worker_app()
 
+    def close(self) -> None:
+        """Close Celery worker cleanly."""
+        super().close()
+
 
 settings = load_oss_params()
 
-celery = OrchestratorCelery(
-    "worker",
+celery = OrchestratorWorker(
+    "geant-service-orchestrator-worker",
     broker=settings.CELERY.broker_url,
     backend=settings.CELERY.result_backend,
     include=[
@@ -26,8 +87,27 @@ celery = OrchestratorCelery(
         "gso.schedules.validate_subscriptions",
         "gso.schedules.send_email_notifications",
         "gso.schedules.clean_old_tasks",
+        "orchestrator.services.tasks",
     ],
 )
 
-celery.conf.update(result_expires=settings.CELERY.result_expires)
-celery.conf.update(redbeat_redis_url=settings.CELERY.broker_url)
+if app_settings.TESTING:
+celery.conf.update(result_backend=settings.CELERY.result_backend, task_ignore_result=False)
+else:
+    celery.conf.update(task_ignore_result=True)
+
+celery.conf.update(
+    result_expires=settings.CELERY.result_expires,
+    worker_prefetch_multiplier=1,
+    worker_send_task_events=True,
+    task_send_sent_event=True,
+    redbeat_redis_url=settings.CELERY.broker_url,
+)
+
+initialise_celery(celery)
+
+
+@worker_shutting_down.connect  # type: ignore[misc]
+def worker_shutting_down_handler(sig, how, exitcode, **kwargs) -> None:  # type: ignore[no-untyped-def]  # noqa: ARG001
+    """Handle the Celery worker shutdown event."""
+    celery.close()
diff --git a/start-worker.sh b/start-worker.sh
index 92cd6304..10a53ca8 100755
--- a/start-worker.sh
+++ b/start-worker.sh
@@ -4,4 +4,4 @@ set -o errexit
 set -o nounset
 
 cd /app
-python -m celery -A gso.worker worker --loglevel=info --concurrency=1 --pool=solo
+python -m celery -A gso.worker worker --loglevel=info --concurrency=1 --pool=solo --queues=new_tasks,resume_tasks,new_workflows,resume_workflows
diff --git a/tox.ini b/tox.ini
index c773a943..a8eab7b2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -10,6 +10,8 @@ passenv = DATABASE_URI_TEST,SKIP_ALL_TESTS,ENVIRONMENT_IGNORE_MUTATION_DISABLED
 setenv =
     OAUTH2_ACTIVE = False
     TRANSLATIONS_DIR = ./gso/translations
+    TESTING = true
+    EXECUTOR = threadpool
 deps =
     coverage
     -r requirements.txt
-- 
GitLab