From cf812c9f08230b88c5f10dc2940036541818759b Mon Sep 17 00:00:00 2001
From: Mohammad Torkashvand <mohammad.torkashvand@geant.org>
Date: Wed, 11 Jun 2025 14:23:31 +0200
Subject: [PATCH] parallelize massive redeploy of base config with a Celery chord
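
Previously all routers were redeployed sequentially inside a single
task. Split the work into one Celery subtask per router and join them
with a chord, so redeploys run in parallel and a single finalize task
aggregates the results and posts the callback.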
---
gso/tasks/massive_redeploy_base_config.py | 121 ++++++++++++++--------
1 file changed, 79 insertions(+), 42 deletions(-)
diff --git a/gso/tasks/massive_redeploy_base_config.py b/gso/tasks/massive_redeploy_base_config.py
index d7d2fd924..cb072d557 100644
--- a/gso/tasks/massive_redeploy_base_config.py
+++ b/gso/tasks/massive_redeploy_base_config.py
@@ -3,7 +3,7 @@
import logging
import requests
-from celery import shared_task
+from celery import chord, shared_task
from orchestrator.services.processes import start_process
from orchestrator.workflow import ProcessStatus
from pydantic_forms.exceptions import FormValidationError
@@ -17,50 +17,61 @@ from gso.utils.types.tt_number import TTNumber
logger = logging.getLogger(__name__)
-@shared_task
-def massive_redeploy_base_config_task(
- selected_routers: list[UUIDstr], tt_number: TTNumber, callback_route: str
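+# ignore_result=False so the chord can collect each subtask's return value.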
+@shared_task(ignore_result=False)
+def process_one_router(router_id: UUIDstr, tt_number: TTNumber) -> tuple[str, bool, str]:
+ """Celery subtask to start & wait for a single router redeploy.
+
+ Returns (router_fqdn, succeeded:bool, message:str).
+ """
+    succeeded = False
+    message = ""
+    # Fall back to the raw ID so a failed subscription lookup still yields a
+    # result for the chord instead of failing the subtask outright.
+    router_fqdn = str(router_id)
+    try:
+        router_fqdn = Router.from_subscription(router_id).router.router_fqdn
+ pid = start_process(
+ "redeploy_base_config",
+ user_inputs=[
+ {"subscription_id": router_id},
+ {"tt_number": tt_number, "is_massive_redeploy": True},
+ ],
+ )
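+        # Poll every 5 seconds, up to 60 times (~5 minutes), before giving up.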
+ proc = wait_for_workflow_to_stop(pid, check_interval=5, max_retries=60)
+ if proc is None:
+ message = "Timed out waiting for workflow to complete"
+ elif proc.last_step == "Done" and proc.last_status == ProcessStatus.COMPLETED:
+ succeeded = True
+ message = "Done"
+ elif proc.last_status == ProcessStatus.ABORTED:
+ message = "Workflow was aborted"
+ elif proc.last_status == ProcessStatus.FAILED:
+ message = proc.failed_reason or "Workflow failed without a reason"
+ else:
+ message = f"Workflow status: {proc.last_status}, last step: {proc.last_step}"
+
+ except FormValidationError as e:
+ message = f"Validation error: {e}"
+ except Exception as e: # noqa: BLE001
+ message = f"Unexpected error: {e}"
+
+ return router_fqdn, succeeded, message
+
+
+@shared_task(ignore_result=False)
+def finalize_massive_redeploy(
+ results: list[tuple[str, bool, str]], callback_route: str, selected_routers: list[UUIDstr]
) -> None:
- """Massive redeploy base config task for routers."""
- failed_wfs: dict[str, str] = {}
- successful_wfs: dict[str, str] = {}
-
- for router_id in selected_routers:
- fqdn = Router.from_subscription(router_id).router.router_fqdn
- try:
- process_id = start_process(
- "redeploy_base_config",
- user_inputs=[{"subscription_id": router_id}, {"tt_number": tt_number, "is_massive_redeploy": True}],
- )
- process = wait_for_workflow_to_stop(process_id, check_interval=5, max_retries=60)
-
- if not process:
- failed_wfs[fqdn] = "Timed out waiting for workflow to complete"
- continue
-
- if process.last_step == "Done" and process.last_status == ProcessStatus.COMPLETED:
- successful_wfs[fqdn] = "Done"
- elif process.last_status == ProcessStatus.ABORTED:
- failed_wfs[fqdn] = "Workflow was aborted"
- elif process.last_status == ProcessStatus.FAILED:
- failed_wfs[fqdn] = process.failed_reason or "Workflow failed without a reason"
- else:
- failed_wfs[fqdn] = f"Workflow status: {process.last_status}, last step: {process.last_step}"
-
- except FormValidationError as e:
- failed_wfs[fqdn] = f"Validation error: {e}"
- except Exception as e: # noqa: BLE001
- failed_wfs[fqdn] = f"Unexpected error: {e}"
+ """Called once after all process_one_router tasks.`results` is a list of (FQDN, succeeded, message) tuples."""
+ successful_wfs = {}
+ failed_wfs = {}
+ for router_fqdn, ok, msg in results:
+ if ok:
+ successful_wfs[router_fqdn] = msg
+ else:
+ failed_wfs[router_fqdn] = msg
+    # Fire the callback with the aggregated results
oss = settings.load_oss_params()
callback_url = f"{oss.GENERAL.internal_hostname}{callback_route}"
- msg = f"[provisioning proxy] Callback URL set to {callback_url}"
- logger.debug(msg)
-
- payload = {
- "failed_wfs": failed_wfs,
- "successful_wfs": successful_wfs,
- }
+ payload = {"failed_wfs": failed_wfs, "successful_wfs": successful_wfs}
try:
response = requests.post(callback_url, json=payload, timeout=30)
@@ -77,4 +88,30 @@ def massive_redeploy_base_config_task(
)
except Exception as e:
msg = f"Failed to post callback: {e}"
- logger.exception(msg)
+ logger.exception(
+ msg,
+ extra={
+ "callback_url": callback_url,
+ "failed_wfs": failed_wfs,
+ "selected_routers": selected_routers,
+ },
+ )
+
+
+@shared_task(ignore_result=False)
+def massive_redeploy_base_config_task(
+ selected_routers: list[UUIDstr],
+ tt_number: TTNumber,
+ callback_route: str,
+) -> None:
+ """Kicks off one Celery subtask per router, then runs the final callback."""
+ # 1. De dupe and fetch FQDNs (so subtasks don't hit the DB twice)
+ unique_ids = list(dict.fromkeys(selected_routers))
+ router_map = {}
+ for rid in unique_ids:
+ sub = Router.from_subscription(rid)
+ router_map[rid] = sub.router.router_fqdn
+
+ # 2. Build a chord: all process_one_router subtasks → finalize_massive_redeploy
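+    #    Celery runs the header tasks in parallel and invokes the body once, with all results, after they finish.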
+ header = [process_one_router.s(rid, tt_number) for rid in unique_ids] # type: ignore[attr-defined]
+ chord(header)(finalize_massive_redeploy.s(callback_route, unique_ids)) # type: ignore[attr-defined]
--
GitLab