diff --git a/gso/tasks/massive_redeploy_base_config.py b/gso/tasks/massive_redeploy_base_config.py
index d7d2fd924af271ffbaf964877781577dc0596315..cb072d557dbe346c2e81fad8948bae10152deb2e 100644
--- a/gso/tasks/massive_redeploy_base_config.py
+++ b/gso/tasks/massive_redeploy_base_config.py
@@ -3,7 +3,7 @@
 import logging
 
 import requests
-from celery import shared_task
+from celery import chord, shared_task
 from orchestrator.services.processes import start_process
 from orchestrator.workflow import ProcessStatus
 from pydantic_forms.exceptions import FormValidationError
@@ -17,50 +17,61 @@ from gso.utils.types.tt_number import TTNumber
 logger = logging.getLogger(__name__)
 
 
-@shared_task
-def massive_redeploy_base_config_task(
-    selected_routers: list[UUIDstr], tt_number: TTNumber, callback_route: str
+@shared_task(ignore_result=False)
+def process_one_router(router_id: UUIDstr, tt_number: TTNumber) -> tuple[str, bool, str]:
+    """Celery subtask to start & wait for a single router redeploy.
+
+    Returns a (router_fqdn, succeeded, message) tuple for aggregation by the chord callback.
+    """
+    router_fqdn = Router.from_subscription(router_id).router.router_fqdn
+    succeeded = False
+    message = ""
+    try:
+        pid = start_process(
+            "redeploy_base_config",
+            user_inputs=[
+                {"subscription_id": router_id},
+                {"tt_number": tt_number, "is_massive_redeploy": True},
+            ],
+        )
+        proc = wait_for_workflow_to_stop(pid, check_interval=5, max_retries=60)
+        if proc is None:
+            message = "Timed out waiting for workflow to complete"
+        elif proc.last_step == "Done" and proc.last_status == ProcessStatus.COMPLETED:
+            succeeded = True
+            message = "Done"
+        elif proc.last_status == ProcessStatus.ABORTED:
+            message = "Workflow was aborted"
+        elif proc.last_status == ProcessStatus.FAILED:
+            message = proc.failed_reason or "Workflow failed without a reason"
+        else:
+            message = f"Workflow status: {proc.last_status}, last step: {proc.last_step}"
+
+    except FormValidationError as e:
+        message = f"Validation error: {e}"
+    except Exception as e:  # noqa: BLE001
+        message = f"Unexpected error: {e}"
+
+    return router_fqdn, succeeded, message
+
+
+@shared_task(ignore_result=False)
+def finalize_massive_redeploy(
+    results: list[tuple[str, bool, str]], callback_route: str, selected_routers: list[UUIDstr]
 ) -> None:
-    """Massive redeploy base config task for routers."""
-    failed_wfs: dict[str, str] = {}
-    successful_wfs: dict[str, str] = {}
-
-    for router_id in selected_routers:
-        fqdn = Router.from_subscription(router_id).router.router_fqdn
-        try:
-            process_id = start_process(
-                "redeploy_base_config",
-                user_inputs=[{"subscription_id": router_id}, {"tt_number": tt_number, "is_massive_redeploy": True}],
-            )
-            process = wait_for_workflow_to_stop(process_id, check_interval=5, max_retries=60)
-
-            if not process:
-                failed_wfs[fqdn] = "Timed out waiting for workflow to complete"
-                continue
-
-            if process.last_step == "Done" and process.last_status == ProcessStatus.COMPLETED:
-                successful_wfs[fqdn] = "Done"
-            elif process.last_status == ProcessStatus.ABORTED:
-                failed_wfs[fqdn] = "Workflow was aborted"
-            elif process.last_status == ProcessStatus.FAILED:
-                failed_wfs[fqdn] = process.failed_reason or "Workflow failed without a reason"
-            else:
-                failed_wfs[fqdn] = f"Workflow status: {process.last_status}, last step: {process.last_step}"
-
-        except FormValidationError as e:
-            failed_wfs[fqdn] = f"Validation error: {e}"
-        except Exception as e:  # noqa: BLE001
-            failed_wfs[fqdn] = f"Unexpected error: {e}"
+    """Called once after all process_one_router tasks. `results` is a list of (FQDN, succeeded, message) tuples."""
+    successful_wfs = {}
+    failed_wfs = {}
+    for router_fqdn, ok, msg in results:
+        if ok:
+            successful_wfs[router_fqdn] = msg
+        else:
+            failed_wfs[router_fqdn] = msg
 
+    # Post the aggregated per-router results back to the orchestrator callback endpoint.
     oss = settings.load_oss_params()
     callback_url = f"{oss.GENERAL.internal_hostname}{callback_route}"
-    msg = f"[provisioning proxy] Callback URL set to {callback_url}"
-    logger.debug(msg)
-
-    payload = {
-        "failed_wfs": failed_wfs,
-        "successful_wfs": successful_wfs,
-    }
+    payload = {"failed_wfs": failed_wfs, "successful_wfs": successful_wfs}
 
     try:
         response = requests.post(callback_url, json=payload, timeout=30)
@@ -77,4 +88,30 @@ def massive_redeploy_base_config_task(
         )
     except Exception as e:
         msg = f"Failed to post callback: {e}"
-        logger.exception(msg)
+        logger.exception(
+            msg,
+            extra={
+                "callback_url": callback_url,
+                "failed_wfs": failed_wfs,
+                "selected_routers": selected_routers,
+            },
+        )
+
+
+@shared_task(ignore_result=False)
+def massive_redeploy_base_config_task(
+    selected_routers: list[UUIDstr],
+    tt_number: TTNumber,
+    callback_route: str,
+) -> None:
+    """Kicks off one Celery subtask per router, then runs the final callback."""
+    # 1. De-duplicate, preserving order. NOTE(review): router_map below is never used — subtasks still fetch FQDNs.
+    unique_ids = list(dict.fromkeys(selected_routers))
+    router_map = {}
+    for rid in unique_ids:
+        sub = Router.from_subscription(rid)
+        router_map[rid] = sub.router.router_fqdn
+
+    # 2. Build a chord: all process_one_router subtasks → finalize_massive_redeploy
+    header = [process_one_router.s(rid, tt_number) for rid in unique_ids]  # type: ignore[attr-defined]
+    chord(header)(finalize_massive_redeploy.s(callback_route, unique_ids))  # type: ignore[attr-defined]