Commit cf812c9f authored by Mohammad Torkashvand

Use multiple processes and Celery to handle massive base config redeploy

parent 0be6d7ec
Merge request !431: Resolve NAT-1177 "Feature/ concurrent massive base config redeploy using celery"
Pipeline #94792 passed
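
The change replaces the old sequential per-router loop with Celery's fan-out/fan-in primitive: a chord whose header is one subtask per router (run in parallel on the workers) and whose body is a callback that receives the list of all header results. A minimal, self-contained sketch of that pattern under assumed names (toy app, broker, and task names are hypothetical, not the project's code):

    # sketch.py -- illustrative only; app, broker, and task names are assumptions
    from celery import Celery, chord, shared_task

    app = Celery("sketch", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

    @shared_task(ignore_result=False)  # results must be stored so the chord body can read them
    def redeploy_one(router_fqdn: str) -> tuple[str, bool, str]:
        # Stand-in for the per-router work; returns (fqdn, succeeded, message).
        return router_fqdn, True, "Done"

    @shared_task(ignore_result=False)
    def summarize(results: list[tuple[str, bool, str]]) -> dict[str, str]:
        # Runs exactly once, after every redeploy_one subtask has finished,
        # with the list of their return values in header order.
        return {fqdn: msg for fqdn, ok, msg in results if ok}

    # chord(header)(body): enqueue all header signatures in parallel, then call
    # the body signature with the collected results.
    chord([redeploy_one.s(f"rt{i}.example.net") for i in range(3)])(summarize.s())

This is also why the diff imports chord and adds ignore_result=False to the task decorators: a chord needs a result backend so the workers' return values can be collected and handed to the finalize callback.
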
@@ -3,7 +3,7 @@
 import logging

 import requests
-from celery import shared_task
+from celery import chord, shared_task
 from orchestrator.services.processes import start_process
 from orchestrator.workflow import ProcessStatus
 from pydantic_forms.exceptions import FormValidationError
@@ -17,50 +17,61 @@ from gso.utils.types.tt_number import TTNumber
 logger = logging.getLogger(__name__)


-@shared_task
-def massive_redeploy_base_config_task(
-    selected_routers: list[UUIDstr], tt_number: TTNumber, callback_route: str
+@shared_task(ignore_result=False)
+def process_one_router(router_id: UUIDstr, tt_number: TTNumber) -> tuple[str, bool, str]:
+    """Celery subtask to start & wait for a single router redeploy.
+
+    Returns (router_fqdn, succeeded:bool, message:str).
+    """
+    router_fqdn = Router.from_subscription(router_id).router.router_fqdn
+    succeeded = False
+    message = ""
+    try:
+        pid = start_process(
+            "redeploy_base_config",
+            user_inputs=[
+                {"subscription_id": router_id},
+                {"tt_number": tt_number, "is_massive_redeploy": True},
+            ],
+        )
+        proc = wait_for_workflow_to_stop(pid, check_interval=5, max_retries=60)
+        if proc is None:
+            message = "Timed out waiting for workflow to complete"
+        elif proc.last_step == "Done" and proc.last_status == ProcessStatus.COMPLETED:
+            succeeded = True
+            message = "Done"
+        elif proc.last_status == ProcessStatus.ABORTED:
+            message = "Workflow was aborted"
+        elif proc.last_status == ProcessStatus.FAILED:
+            message = proc.failed_reason or "Workflow failed without a reason"
+        else:
+            message = f"Workflow status: {proc.last_status}, last step: {proc.last_step}"
+    except FormValidationError as e:
+        message = f"Validation error: {e}"
+    except Exception as e:  # noqa: BLE001
+        message = f"Unexpected error: {e}"
+    return router_fqdn, succeeded, message
+
+
+@shared_task(ignore_result=False)
+def finalize_massive_redeploy(
+    results: list[tuple[str, bool, str]], callback_route: str, selected_routers: list[UUIDstr]
 ) -> None:
-    """Massive redeploy base config task for routers."""
-    failed_wfs: dict[str, str] = {}
-    successful_wfs: dict[str, str] = {}
-
-    for router_id in selected_routers:
-        fqdn = Router.from_subscription(router_id).router.router_fqdn
-        try:
-            process_id = start_process(
-                "redeploy_base_config",
-                user_inputs=[{"subscription_id": router_id}, {"tt_number": tt_number, "is_massive_redeploy": True}],
-            )
-            process = wait_for_workflow_to_stop(process_id, check_interval=5, max_retries=60)
-
-            if not process:
-                failed_wfs[fqdn] = "Timed out waiting for workflow to complete"
-                continue
-
-            if process.last_step == "Done" and process.last_status == ProcessStatus.COMPLETED:
-                successful_wfs[fqdn] = "Done"
-            elif process.last_status == ProcessStatus.ABORTED:
-                failed_wfs[fqdn] = "Workflow was aborted"
-            elif process.last_status == ProcessStatus.FAILED:
-                failed_wfs[fqdn] = process.failed_reason or "Workflow failed without a reason"
-            else:
-                failed_wfs[fqdn] = f"Workflow status: {process.last_status}, last step: {process.last_step}"
-        except FormValidationError as e:
-            failed_wfs[fqdn] = f"Validation error: {e}"
-        except Exception as e:  # noqa: BLE001
-            failed_wfs[fqdn] = f"Unexpected error: {e}"
+    """Called once after all process_one_router tasks.`results` is a list of (FQDN, succeeded, message) tuples."""
+    successful_wfs = {}
+    failed_wfs = {}
+    for router_fqdn, ok, msg in results:
+        if ok:
+            successful_wfs[router_fqdn] = msg
+        else:
+            failed_wfs[router_fqdn] = msg

+    # fire callback
     oss = settings.load_oss_params()
     callback_url = f"{oss.GENERAL.internal_hostname}{callback_route}"
-    msg = f"[provisioning proxy] Callback URL set to {callback_url}"
-    logger.debug(msg)
-
-    payload = {
-        "failed_wfs": failed_wfs,
-        "successful_wfs": successful_wfs,
-    }
+    payload = {"failed_wfs": failed_wfs, "successful_wfs": successful_wfs}

     try:
         response = requests.post(callback_url, json=payload, timeout=30)
@@ -77,4 +88,30 @@ def massive_redeploy_base_config_task(
         )
     except Exception as e:
         msg = f"Failed to post callback: {e}"
-        logger.exception(msg)
+        logger.exception(
+            msg,
+            extra={
+                "callback_url": callback_url,
+                "failed_wfs": failed_wfs,
+                "selected_routers": selected_routers,
+            },
+        )
+
+
+@shared_task(ignore_result=False)
+def massive_redeploy_base_config_task(
+    selected_routers: list[UUIDstr],
+    tt_number: TTNumber,
+    callback_route: str,
+) -> None:
+    """Kicks off one Celery subtask per router, then runs the final callback."""
+    # 1. De dupe and fetch FQDNs (so subtasks don't hit the DB twice)
+    unique_ids = list(dict.fromkeys(selected_routers))
+    router_map = {}
+    for rid in unique_ids:
+        sub = Router.from_subscription(rid)
+        router_map[rid] = sub.router.router_fqdn
+
+    # 2. Build a chord: all process_one_router subtasks → finalize_massive_redeploy
+    header = [process_one_router.s(rid, tt_number) for rid in unique_ids]  # type: ignore[attr-defined]
+    chord(header)(finalize_massive_redeploy.s(callback_route, unique_ids))  # type: ignore[attr-defined]
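
For completeness, a hedged sketch of how the new dispatcher might be enqueued by its caller (the module path, argument values, and call site below are assumptions for illustration; only the task definitions appear in this commit):

    # Hypothetical call site: queue the dispatcher, which de-dupes the router list,
    # fans out one process_one_router subtask per router, and registers
    # finalize_massive_redeploy as the chord callback that posts the results to callback_route.
    from gso.tasks.massive_redeploy import massive_redeploy_base_config_task  # assumed module path

    massive_redeploy_base_config_task.delay(
        selected_routers=["0b5e0d7e-0000-0000-0000-000000000000"],  # placeholder subscription IDs
        tt_number="TT#20250101000000",                              # placeholder ticket number
        callback_route="/api/v1/processes/example-callback",        # placeholder route
    )

Because the dispatcher only builds and submits the chord, .delay() returns immediately; the per-router workflows and the HTTP callback then run asynchronously on the Celery workers.
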