diff --git a/gso/services/lso_client.py b/gso/services/lso_client.py
index 67e2791752d1d5d9c4be584b21915e9f5ec9832a..8cfa6785a55d693283093ed80872a999908cf9b9 100644
--- a/gso/services/lso_client.py
+++ b/gso/services/lso_client.py
@@ -56,7 +56,7 @@ def _send_request(parameters: dict, callback_route: str) -> None:
     logger.debug(debug_msg)
 
     parameters.update({"callback": callback_url})
-    url = f"{params.scheme}://{params.api_base}/api/playbook"
+    url = f"{params.scheme}://{params.api_base}/api/playbook/"
 
     request = requests.post(url, json=parameters, timeout=10)
     request.raise_for_status()
diff --git a/gso/services/processes.py b/gso/services/processes.py
index dba8abc29b2c86fec9b74064acf1d4b25c0c44f8..30caa96fddf3d997fa9ac91254d5934f5707a830 100644
--- a/gso/services/processes.py
+++ b/gso/services/processes.py
@@ -4,11 +4,13 @@ This prevents someone from having to re-write database statements many times, th
 or inconsistent when not careful. These methods are related to operations regarding processes and workflows.
 """
 
-from orchestrator.db import ProcessTable, WorkflowTable, db
-from orchestrator.workflow import ProcessStatus
+from uuid import UUID
+
+from orchestrator.db import ProcessStepTable, ProcessSubscriptionTable, ProcessTable, WorkflowTable, db
+from orchestrator.workflow import ProcessStatus, StepStatus
 from pydantic_forms.types import UUIDstr
 from sqlalchemy import ScalarResult, and_, or_, select
-from sqlalchemy.orm import Query
+from sqlalchemy.orm import Query, joinedload
 
 
 def get_processes_by_workflow_name(workflow_name: str) -> Query:
@@ -65,3 +67,21 @@ def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarRes
             )
         )
     )
+
+
+def get_stopped_process_by_id(process_id: UUID | UUIDstr) -> ProcessTable | None:
+    """Get a stopped process by its ID."""
+    return (
+        db.session.query(ProcessTable)
+        .join(ProcessTable.steps)
+        .filter(
+            ProcessTable.process_id == process_id,
+            ProcessStepTable.status.in_({StepStatus.ABORT, StepStatus.FAILED, StepStatus.COMPLETE}),
+        )
+        .options(
+            joinedload(ProcessTable.steps),
+            joinedload(ProcessTable.process_subscriptions).joinedload(ProcessSubscriptionTable.subscription),
+        )
+        .order_by(ProcessTable.last_modified_at)
+        .one_or_none()
+    )
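Note on the new query: `get_stopped_process_by_id` treats a process as stopped as soon as any of its steps has reached a terminal status (ABORT, FAILED, or COMPLETE), and it eager-loads the steps plus related subscriptions so callers can inspect the result without extra round trips. A minimal usage sketch, assuming an application context with an active database session; the process ID is hypothetical:

    from gso.services.processes import get_stopped_process_by_id

    process = get_stopped_process_by_id("0b6c...")  # hypothetical process ID
    if process is not None:
        print(process.last_status, process.last_step)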
user_inputs=[{"subscription_id": router_id}, {"tt_number": tt_number, "is_massive_redeploy": True}], + ) + process = wait_for_workflow_to_stop(process_id, check_interval=5, max_retries=60) + + if not process: + failed_wfs[fqdn] = "Timed out waiting for workflow to complete" + continue + + if process.last_step == "Done" and process.last_status == ProcessStatus.COMPLETED: + successful_wfs[fqdn] = "Done" + elif process.last_status == ProcessStatus.ABORTED: + failed_wfs[fqdn] = "Workflow was aborted" + elif process.last_status == ProcessStatus.FAILED: + failed_wfs[fqdn] = process.failed_reason or "Workflow failed without a reason" + else: + failed_wfs[fqdn] = f"Workflow status: {process.last_status}, last step: {process.last_step}" + + except FormValidationError as e: + failed_wfs[fqdn] = f"Validation error: {e}" + except Exception as e: # noqa: BLE001 + failed_wfs[fqdn] = f"Unexpected error: {e}" + + oss = settings.load_oss_params() + callback_url = f"{oss.GENERAL.internal_hostname}{callback_route}" + msg = f"[provisioning proxy] Callback URL set to {callback_url}" + logger.debug(msg) + + payload = { + "failed_wfs": failed_wfs, + "successful_wfs": successful_wfs, + } + + try: + response = requests.post(callback_url, json=payload, timeout=30) + if not response.ok: + logger.exception( + "Callback failed", + extra={ + "status_code": response.status_code, + "response_text": response.text, + "callback_url": callback_url, + "failed_wfs": failed_wfs, + "selected_routers": selected_routers, + }, + ) + except Exception as e: + msg = f"Failed to post callback: {e}" + logger.exception(msg) diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index dcb09306f568e34610e63c0698105b8e8f68bb07..7fa0726a1de92249290039cd97212663a7eda0e7 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -1,11 +1,14 @@ """Helper methods that are used across GSO.""" +import logging import random import re +import time from ipaddress import IPv4Network, IPv6Network from typing import TYPE_CHECKING, TypeAlias, cast from uuid import UUID +from orchestrator.db import ProcessTable from orchestrator.types import SubscriptionLifecycle from pydantic_forms.types import UUIDstr from pydantic_forms.validators import Choice @@ -15,6 +18,7 @@ from gso.products.product_blocks.router import RouterRole from gso.products.product_types.router import Router from gso.services.netbox_client import NetboxClient from gso.services.partners import get_all_partners +from gso.services.processes import get_stopped_process_by_id from gso.services.subscriptions import ( get_active_edge_port_subscriptions, get_active_router_subscriptions, @@ -30,6 +34,8 @@ from gso.utils.types.interfaces import PhysicalPortCapacity from gso.utils.types.ip_address import IPv4AddressType, IPv4NetworkType, IPv6NetworkType from gso.utils.types.virtual_identifiers import VC_ID +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock @@ -395,3 +401,32 @@ def generate_unique_vc_id(l2c_type: str, max_attempts: int = 100) -> VC_ID | Non return VC_ID(vc_id) return None + + +def wait_for_workflow_to_stop( + process_id: UUIDstr | UUID, + check_interval: int, + max_retries: int, +) -> ProcessTable | None: + """Waits until any step in the workflow reaches a terminal status. + + Times out after max_retries * check_interval seconds. 
diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py
index dcb09306f568e34610e63c0698105b8e8f68bb07..7fa0726a1de92249290039cd97212663a7eda0e7 100644
--- a/gso/utils/helpers.py
+++ b/gso/utils/helpers.py
@@ -1,11 +1,14 @@
 """Helper methods that are used across GSO."""
 
+import logging
 import random
 import re
+import time
 from ipaddress import IPv4Network, IPv6Network
 from typing import TYPE_CHECKING, TypeAlias, cast
 from uuid import UUID
 
+from orchestrator.db import ProcessTable
 from orchestrator.types import SubscriptionLifecycle
 from pydantic_forms.types import UUIDstr
 from pydantic_forms.validators import Choice
@@ -15,6 +18,7 @@ from gso.products.product_blocks.router import RouterRole
 from gso.products.product_types.router import Router
 from gso.services.netbox_client import NetboxClient
 from gso.services.partners import get_all_partners
+from gso.services.processes import get_stopped_process_by_id
 from gso.services.subscriptions import (
     get_active_edge_port_subscriptions,
     get_active_router_subscriptions,
@@ -30,6 +34,8 @@ from gso.utils.types.interfaces import PhysicalPortCapacity
 from gso.utils.types.ip_address import IPv4AddressType, IPv4NetworkType, IPv6NetworkType
 from gso.utils.types.virtual_identifiers import VC_ID
 
+logger = logging.getLogger(__name__)
+
 if TYPE_CHECKING:
     from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
 
@@ -395,3 +401,32 @@ def generate_unique_vc_id(l2c_type: str, max_attempts: int = 100) -> VC_ID | Non
         return VC_ID(vc_id)
 
     return None
+
+
+def wait_for_workflow_to_stop(
+    process_id: UUIDstr | UUID,
+    check_interval: int,
+    max_retries: int,
+) -> ProcessTable | None:
+    """Wait until any step in the workflow reaches a terminal status.
+
+    Times out after max_retries * check_interval seconds.
+
+    :param process_id: ID of the workflow process
+    :param check_interval: Seconds between checks
+    :param max_retries: Max number of retries before giving up
+    :return: The process object if it has stopped, None if it timed out
+    """
+    for attempt in range(max_retries):
+        if process := get_stopped_process_by_id(process_id):
+            msg = f"✅ Process {process_id} has stopped with status: {process.last_status}"
+            logger.info(msg)
+            return process
+
+        msg = f"⏳ Attempt {attempt + 1}/{max_retries}: Waiting for workflow to progress..."
+        logger.info(msg)
+        time.sleep(check_interval)
+
+    msg = f"❌ Timeout reached. Workflow {process_id} did not stop after {max_retries * check_interval} seconds."
+    logger.error(msg)
+    return None
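With the values used by the task above, `wait_for_workflow_to_stop` polls once every 5 seconds and gives up after 60 × 5 = 300 seconds, so each router gets at most a five-minute window. A sketch of the call site, assuming `process_id` came from `start_process`:

    # Up to 5 minutes: 60 retries at a 5-second interval.
    process = wait_for_workflow_to_stop(process_id, check_interval=5, max_retries=60)
    if process is None:
        ...  # timed out; record a failure for this router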
diff --git a/gso/worker.py b/gso/worker.py
index ed9c01ffc5fc3edf055ad48a60100f8e1911f56c..fc21f0c3bbf91e0f3c0df54e9772ce761083124b 100644
--- a/gso/worker.py
+++ b/gso/worker.py
@@ -84,6 +84,7 @@ celery = OrchestratorWorker(
         "gso.schedules.clean_old_tasks",
         "orchestrator.services.tasks",
         "gso.tasks.start_process",
+        "gso.tasks.massive_redeploy_base_config",
     ],
 )
 
diff --git a/gso/workflows/router/redeploy_base_config.py b/gso/workflows/router/redeploy_base_config.py
index a7c7dd81effbed681b3e92e2ec79f93566e62d97..1fba2bc87bad1f166c40b0bdc4f23227169f8f69 100644
--- a/gso/workflows/router/redeploy_base_config.py
+++ b/gso/workflows/router/redeploy_base_config.py
@@ -17,7 +17,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form
 from pydantic_forms.types import FormGenerator, UUIDstr
 
 from gso.products.product_types.router import Router
-from gso.services.lso_client import lso_interaction
+from gso.services.lso_client import anonymous_lso_interaction, lso_interaction
 from gso.utils.types.tt_number import TTNumber
 from gso.utils.workflow_steps import deploy_base_config_dry, deploy_base_config_real
 
@@ -47,13 +47,15 @@ def redeploy_base_config() -> StepList:
     * Redeploy base config
     """
     is_not_massive_redeploy = conditional(lambda state: not bool(state.get("is_massive_redeploy")))
+    is_massive_redeploy = conditional(lambda state: bool(state.get("is_massive_redeploy")))
 
     return (
         begin
         >> store_process_subscription(Target.MODIFY)
         >> unsync
        >> is_not_massive_redeploy(lso_interaction(deploy_base_config_dry))
-        >> lso_interaction(deploy_base_config_real)
+        >> is_not_massive_redeploy(lso_interaction(deploy_base_config_real))
+        >> is_massive_redeploy(anonymous_lso_interaction(deploy_base_config_real))
         >> resync
         >> done
     )
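The two conditionals are complementary, so exactly one branch deploys for real: an interactive run gets the dry run plus an operator-confirmed `lso_interaction`, while a massive redeploy gets a single `anonymous_lso_interaction`, which does not suspend on operator input and can therefore run unattended from the Celery task. For readers unfamiliar with `conditional`, a minimal sketch of the pattern (step and variable names are illustrative):

    from orchestrator.workflow import begin, conditional, step
    from pydantic_forms.types import State

    maybe_massive = conditional(lambda state: bool(state.get("is_massive_redeploy")))

    @step("Example step")
    def example_step() -> State:
        return {}

    # example_step runs only when the predicate is truthy; otherwise it is skipped.
    steps = begin >> maybe_massive(example_step)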
diff --git a/gso/workflows/tasks/redeploy_base_config.py b/gso/workflows/tasks/redeploy_base_config.py
index 2db1e5a74849d5b7039ab847cb387c55a2599a35..32348299228bc10b08ca99dc6d7f4855a65aa236 100644
--- a/gso/workflows/tasks/redeploy_base_config.py
+++ b/gso/workflows/tasks/redeploy_base_config.py
@@ -7,19 +7,18 @@ input by the operator. The operator can then
 import json
 from typing import Annotated
 
-import requests
 from annotated_types import Len
 from orchestrator.config.assignee import Assignee
 from orchestrator.forms import SubmitFormPage
 from orchestrator.targets import Target
-from orchestrator.workflow import StepList, conditional, done, init, inputstep, step, workflow
+from orchestrator.workflow import StepList, callback_step, conditional, done, init, inputstep, step, workflow
 from pydantic import AfterValidator, ConfigDict
 from pydantic_forms.types import FormGenerator, State, UUIDstr
 from pydantic_forms.validators import LongText, validate_unique_list
-from requests import HTTPError
 
 from gso.products.product_types.router import Router
 from gso.services.subscriptions import get_active_router_subscriptions
+from gso.tasks.massive_redeploy_base_config import massive_redeploy_base_config_task
 from gso.utils.helpers import active_nokia_router_selector
 from gso.utils.shared_enums import Vendor
 from gso.utils.types.tt_number import TTNumber
@@ -45,40 +44,34 @@ def _input_form_generator() -> FormGenerator:
         selected_routers: router_selection_list = all_active_nokia_routers  # type: ignore[valid-type]
 
     user_input = yield RedeployBaseConfigForm
-    return user_input.model_dump()
-
-
-@step("Start running redeploy workflows on selected routers")
-def start_redeploy_workflows(tt_number: TTNumber, selected_routers: list[UUIDstr]) -> State:
-    """Loop over all selected routers, and try to start a new workflow for each of them."""
-    wf_errors = {}
-    workflow_url = "http://localhost:8080/api/processes/redeploy_base_config"
-
-    for selected_router in selected_routers:
-        try:
-            result = requests.post(
-                workflow_url,
-                json=[{"subscription_id": selected_router}, {"tt_number": tt_number, "is_massive_redeploy": True}],
-                timeout=10,
-            )
-            result.raise_for_status()
-        except HTTPError as e:
-            if e.response.json()["validation_errors"]:
-                error_message = e.response.json()["validation_errors"][0]["msg"]
-            else:
-                error_message = e.response.json()
-            wf_errors[Router.from_subscription(selected_router).router.router_fqdn] = error_message
-
-    return {"wf_errors": wf_errors}
-
-
-@inputstep("Some workflows have failed to start", assignee=Assignee.SYSTEM)
-def workflows_failed_to_start_prompt(wf_errors: list) -> FormGenerator:
+    return user_input.model_dump() | {"failed_wfs": {}, "successful_wfs": {}}
+
+
+@step("Start worker to redeploy base config on selected routers")
+def start_redeploy_workflows(tt_number: TTNumber, selected_routers: list[UUIDstr], callback_route: str) -> State:
+    """Start the massive redeploy base config task with the selected routers."""
+    # TODO: if UUIDstr is ever changed to UUID here, convert the IDs to str before passing them to the task
+    massive_redeploy_base_config_task.apply_async(args=[selected_routers, tt_number, callback_route], countdown=5)
+
+    return {"failed_wfs": {}, "successful_wfs": {}}
+
+
+@step("Evaluate provisioning proxy result")
+def evaluate_results(callback_result: dict) -> State:
+    """Evaluate the result of the provisioning proxy callback."""
+    failed_wfs = callback_result.pop("failed_wfs", {})
+    successful_wfs = callback_result.pop("successful_wfs", {})
+    return {"callback_result": callback_result, "failed_wfs": failed_wfs, "successful_wfs": successful_wfs}
+
+
+@inputstep("Some workflows have failed", assignee=Assignee.SYSTEM)
+def workflows_failed_to_prompt(failed_wfs: dict, successful_wfs: dict) -> FormGenerator:
     """Prompt the operator that some workflows have failed to start."""
 
     class WFFailurePrompt(SubmitFormPage):
-        model_config = ConfigDict(title="Some redeploy workflows have failed to start, please inspect the list below")
-        failed_to_start: LongText = json.dumps(wf_errors, indent=4)
+        model_config = ConfigDict(title="Some redeploy workflows have failed, please inspect the list below")
+        failed_workflows: LongText = json.dumps(failed_wfs, indent=4)
+        successful_workflows: LongText = json.dumps(successful_wfs, indent=4)
 
     yield WFFailurePrompt
     return {}
@@ -87,6 +80,15 @@ def workflows_failed_to_start_prompt(wf_errors: list) -> FormGenerator:
 @workflow("Redeploy base config on multiple routers", initial_input_form=_input_form_generator, target=Target.SYSTEM)
 def task_redeploy_base_config() -> StepList:
     """Gather a list of routers from the operator to redeploy base config onto."""
-    some_failed_to_start = conditional(lambda state: state["wf_errors"])
-
-    return init >> start_redeploy_workflows >> some_failed_to_start(workflows_failed_to_start_prompt) >> done
+    some_failed_to_start = conditional(lambda state: len(state.get("failed_wfs", {})) > 0)
+
+    return (
+        init
+        >> callback_step(
+            name="Start running redeploy workflows on selected routers",
+            action_step=start_redeploy_workflows,
+            validate_step=evaluate_results,
+        )
+        >> some_failed_to_start(workflows_failed_to_prompt)
+        >> done
+    )
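The `callback_step` ties the pieces together: its action step enqueues the Celery task, the process then suspends on the generated `callback_route` until the task POSTs its payload, and the validate step (`evaluate_results`) turns that payload into workflow state. Illustrative shape of the `callback_result` the task sends (see `massive_redeploy_base_config.py` above; hostnames and messages are made up):

    callback_result = {
        "failed_wfs": {"rt1.example.net": "Workflow was aborted"},
        "successful_wfs": {"rt2.example.net": "Done"},
    }
    # evaluate_results pops both keys into the state, and the operator prompt
    # is only shown when failed_wfs is non-empty.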