diff --git a/gso/migrations/versions/2025-05-29_9a7bae1f6438_add_mass_router_redeploy_task.py b/gso/migrations/versions/2025-05-29_9a7bae1f6438_add_mass_router_redeploy_task.py
new file mode 100644
index 0000000000000000000000000000000000000000..7c21f6da597d730fa2a2a4d9ef0b986757d8bd6f
--- /dev/null
+++ b/gso/migrations/versions/2025-05-29_9a7bae1f6438_add_mass_router_redeploy_task.py
@@ -0,0 +1,37 @@
+"""Add mass router redeploy task.
+
+Revision ID: 9a7bae1f6438
+Revises: 90547df711c3
+Create Date: 2025-05-15 12:01:54.469229
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = '9a7bae1f6438'
+down_revision = '90547df711c3'
+branch_labels = None
+depends_on = None
+
+
+from orchestrator.migrations.helpers import create_task, delete_workflow
+
+new_tasks = [
+    {
+        "name": "task_redeploy_base_config",
+        "description": "Redeploy base config on multiple routers"
+    }
+]
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+    for task in new_tasks:
+        create_task(conn, task)
+
+
+def downgrade() -> None:
+    conn = op.get_bind()
+    for task in new_tasks:
+        delete_workflow(conn, task["name"])
diff --git a/gso/services/lso_client.py b/gso/services/lso_client.py
index 67e2791752d1d5d9c4be584b21915e9f5ec9832a..8cfa6785a55d693283093ed80872a999908cf9b9 100644
--- a/gso/services/lso_client.py
+++ b/gso/services/lso_client.py
@@ -56,7 +56,7 @@ def _send_request(parameters: dict, callback_route: str) -> None:
     logger.debug(debug_msg)
 
     parameters.update({"callback": callback_url})
-    url = f"{params.scheme}://{params.api_base}/api/playbook"
+    url = f"{params.scheme}://{params.api_base}/api/playbook/"
 
     request = requests.post(url, json=parameters, timeout=10)
     request.raise_for_status()
diff --git a/gso/services/processes.py b/gso/services/processes.py
index dba8abc29b2c86fec9b74064acf1d4b25c0c44f8..30caa96fddf3d997fa9ac91254d5934f5707a830 100644
--- a/gso/services/processes.py
+++ b/gso/services/processes.py
@@ -4,11 +4,13 @@ This prevents someone from having to re-write database statements many times, th
 or inconsistent when not careful. These methods are related to operations regarding processes and workflows.
""" -from orchestrator.db import ProcessTable, WorkflowTable, db -from orchestrator.workflow import ProcessStatus +from uuid import UUID + +from orchestrator.db import ProcessStepTable, ProcessSubscriptionTable, ProcessTable, WorkflowTable, db +from orchestrator.workflow import ProcessStatus, StepStatus from pydantic_forms.types import UUIDstr from sqlalchemy import ScalarResult, and_, or_, select -from sqlalchemy.orm import Query +from sqlalchemy.orm import Query, joinedload def get_processes_by_workflow_name(workflow_name: str) -> Query: @@ -65,3 +67,21 @@ def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarRes ) ) ) + + +def get_stopped_process_by_id(process_id: UUID | UUIDstr) -> ProcessTable | None: + """Get a stopped process by its ID.""" + return ( + db.session.query(ProcessTable) + .join(ProcessTable.steps) + .filter( + ProcessTable.process_id == process_id, + ProcessStepTable.status.in_({StepStatus.ABORT, StepStatus.FAILED, StepStatus.COMPLETE}), + ) + .options( + joinedload(ProcessTable.steps), + joinedload(ProcessTable.process_subscriptions).joinedload(ProcessSubscriptionTable.subscription), + ) + .order_by(ProcessTable.last_modified_at) + .one_or_none() + ) diff --git a/gso/tasks/massive_redeploy_base_config.py b/gso/tasks/massive_redeploy_base_config.py new file mode 100644 index 0000000000000000000000000000000000000000..d7d2fd924af271ffbaf964877781577dc0596315 --- /dev/null +++ b/gso/tasks/massive_redeploy_base_config.py @@ -0,0 +1,80 @@ +"""Massive redeploy base config task for routers.""" + +import logging + +import requests +from celery import shared_task +from orchestrator.services.processes import start_process +from orchestrator.workflow import ProcessStatus +from pydantic_forms.exceptions import FormValidationError +from pydantic_forms.types import UUIDstr + +from gso import settings +from gso.products.product_types.router import Router +from gso.utils.helpers import wait_for_workflow_to_stop +from gso.utils.types.tt_number import TTNumber + +logger = logging.getLogger(__name__) + + +@shared_task +def massive_redeploy_base_config_task( + selected_routers: list[UUIDstr], tt_number: TTNumber, callback_route: str +) -> None: + """Massive redeploy base config task for routers.""" + failed_wfs: dict[str, str] = {} + successful_wfs: dict[str, str] = {} + + for router_id in selected_routers: + fqdn = Router.from_subscription(router_id).router.router_fqdn + try: + process_id = start_process( + "redeploy_base_config", + user_inputs=[{"subscription_id": router_id}, {"tt_number": tt_number, "is_massive_redeploy": True}], + ) + process = wait_for_workflow_to_stop(process_id, check_interval=5, max_retries=60) + + if not process: + failed_wfs[fqdn] = "Timed out waiting for workflow to complete" + continue + + if process.last_step == "Done" and process.last_status == ProcessStatus.COMPLETED: + successful_wfs[fqdn] = "Done" + elif process.last_status == ProcessStatus.ABORTED: + failed_wfs[fqdn] = "Workflow was aborted" + elif process.last_status == ProcessStatus.FAILED: + failed_wfs[fqdn] = process.failed_reason or "Workflow failed without a reason" + else: + failed_wfs[fqdn] = f"Workflow status: {process.last_status}, last step: {process.last_step}" + + except FormValidationError as e: + failed_wfs[fqdn] = f"Validation error: {e}" + except Exception as e: # noqa: BLE001 + failed_wfs[fqdn] = f"Unexpected error: {e}" + + oss = settings.load_oss_params() + callback_url = f"{oss.GENERAL.internal_hostname}{callback_route}" + msg = f"[provisioning proxy] Callback 
URL set to {callback_url}" + logger.debug(msg) + + payload = { + "failed_wfs": failed_wfs, + "successful_wfs": successful_wfs, + } + + try: + response = requests.post(callback_url, json=payload, timeout=30) + if not response.ok: + logger.exception( + "Callback failed", + extra={ + "status_code": response.status_code, + "response_text": response.text, + "callback_url": callback_url, + "failed_wfs": failed_wfs, + "selected_routers": selected_routers, + }, + ) + except Exception as e: + msg = f"Failed to post callback: {e}" + logger.exception(msg) diff --git a/gso/translations/en-GB.json b/gso/translations/en-GB.json index 7991b89dd89c838c13bb45fa730ad521db3e0dc9..1a05f72f7f5ef30b926e895c0789c13d9bc4a58b 100644 --- a/gso/translations/en-GB.json +++ b/gso/translations/en-GB.json @@ -151,6 +151,7 @@ "task_create_partners": "Create partner task", "task_delete_partners": "Delete partner task", "task_modify_partners": "Modify partner task", + "task_redeploy_base_config": "Redeploy base config on multiple routers", "task_send_email_notifications": "Send email notifications for failed tasks", "task_validate_geant_products": "Validation task for GEANT products", "terminate_edge_port": "Terminate Edge Port", diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index fc4c5dee2c279d9007965a488f721ea78f32e9f7..4e4ea7d7015fbd5632b7382e5a2e0e2923abff2c 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -1,11 +1,14 @@ """Helper methods that are used across GSO.""" +import logging import random import re +import time from ipaddress import IPv4Network, IPv6Network from typing import TYPE_CHECKING, TypeAlias, cast from uuid import UUID +from orchestrator.db import ProcessTable from orchestrator.types import SubscriptionLifecycle from pydantic_forms.types import UUIDstr from pydantic_forms.validators import Choice @@ -15,6 +18,7 @@ from gso.products.product_blocks.router import RouterRole from gso.products.product_types.router import Router from gso.services.netbox_client import NetboxClient from gso.services.partners import get_all_partners +from gso.services.processes import get_stopped_process_by_id from gso.services.subscriptions import ( get_active_edge_port_subscriptions, get_active_router_subscriptions, @@ -30,11 +34,13 @@ from gso.utils.types.interfaces import PhysicalPortCapacity from gso.utils.types.ip_address import IPv4AddressType, IPv4NetworkType, IPv6NetworkType from gso.utils.types.virtual_identifiers import VC_ID +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock -def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None: +def available_interfaces_choices(router_id: UUID, speed: str) -> TypeAlias: """Return a list of available interfaces for a given router and speed. For Nokia routers, return a list of available interfaces. 
@@ -46,14 +52,17 @@ def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
         interface["name"]: f"{interface["name"]} {interface["description"]}"
         for interface in NetboxClient().get_available_interfaces(router_id, speed)
     }
-    return Choice("ae member", zip(interfaces.keys(), interfaces.items(), strict=True))  # type: ignore[arg-type]
+    return cast(
+        type[Choice],
+        Choice.__call__("ae member", zip(interfaces.keys(), interfaces.items(), strict=True)),
+    )
 
 
 def available_interfaces_choices_including_current_members(
     router_id: UUID,
     speed: str,
     interfaces: list["IptrunkInterfaceBlock"],
-) -> Choice | None:
+) -> TypeAlias:
     """Return a list of available interfaces for a given router and speed including the current members.
 
     For Nokia routers, return a list of available interfaces.
@@ -75,10 +84,13 @@ def available_interfaces_choices_including_current_members(
     options = {
         interface["name"]: f"{interface["name"]} {interface["description"]}"
         for interface in available_interfaces
     }
-    return Choice("ae member", zip(options.keys(), options.items(), strict=True))  # type: ignore[arg-type]
+    return cast(
+        type[Choice],
+        Choice.__call__("ae member", zip(options.keys(), options.items(), strict=True)),
+    )
 
 
-def available_lags_choices(router_id: UUID) -> Choice | None:
+def available_lags_choices(router_id: UUID) -> TypeAlias:
     """Return a list of available lags for a given router.
 
     For Nokia routers, return a list of available lags.
@@ -87,10 +99,13 @@ def available_lags_choices(router_id: UUID) -> Choice | None:
     if get_router_vendor(router_id) != Vendor.NOKIA:
         return None
     side_a_ae_iface_list = NetboxClient().get_available_lags(router_id)
-    return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True))  # type: ignore[arg-type]
+    return cast(
+        type[Choice],
+        Choice.__call__("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)),
+    )
 
 
-def available_service_lags_choices(router_id: UUID) -> Choice | None:
+def available_service_lags_choices(router_id: UUID) -> TypeAlias:
    """Return a list of available lags for a given router for services.
 
     For Nokia routers, return a list of available lags.
@@ -99,7 +114,10 @@ def available_service_lags_choices(router_id: UUID) -> Choice | None:
     if get_router_vendor(router_id) != Vendor.NOKIA:
         return None
     side_a_ae_iface_list = NetboxClient().get_available_services_lags(router_id)
-    return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True))  # type: ignore[arg-type]
+    return cast(
+        type[Choice],
+        Choice.__call__("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)),
+    )
 
 
 def get_router_vendor(router_id: UUID) -> Vendor:
@@ -222,30 +240,60 @@ def calculate_recommended_minimum_links(iptrunk_number_of_members: int, iptrunk_
     return iptrunk_number_of_members
 
 
-def active_site_selector() -> Choice:
+def active_site_selector() -> TypeAlias:
     """Generate a dropdown selector for choosing an active site in an input form."""
     site_subscriptions = {
         str(site["subscription_id"]): site["description"]
         for site in get_active_site_subscriptions(includes=["subscription_id", "description"])
     }
 
-    return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True))  # type: ignore[arg-type]
+    return cast(
+        type[Choice],
+        Choice.__call__("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True)),
+    )
 
 
-def active_router_selector(*, excludes: list[UUIDstr] | None = None) -> Choice:
-    """Generate a dropdown selector for choosing an active Router in an input form."""
-    if excludes is None:
-        excludes = []
+def active_router_selector(*, excludes: list[UUIDstr] | None = None) -> TypeAlias:
+    """Generate a dropdown selector for choosing an active Router in an input form.
+
+    The resulting list of routers can be filtered using a list of excluded subscription IDs.
+    """
+    excludes = excludes or []
 
     router_subscriptions = {
         str(router["subscription_id"]): router["description"]
         for router in get_active_router_subscriptions(includes=["subscription_id", "description"])
         if router["subscription_id"] not in excludes
     }
 
-    return Choice("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True))  # type: ignore[arg-type]
+    return cast(
+        type[Choice],
+        Choice.__call__("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)),
+    )
 
 
-def active_pe_router_selector(excludes: list[UUIDstr] | None = None) -> Choice:
+def active_nokia_router_selector(*, excludes: list[UUIDstr] | None = None) -> TypeAlias:
+    """Generate a dropdown choice list of all active Nokia routers.
+
+    Args:
+        excludes: An optional list of subscription IDs that should be excluded from the resulting dropdown.
+    """
+    excludes = excludes or []
+    router_subscriptions = {
+        str(router.subscription_id): router.description
+        for router in [
+            Router.from_subscription(subscription["subscription_id"])
+            for subscription in get_active_router_subscriptions(["subscription_id"])
+        ]
+        if router.subscription_id not in excludes and router.router.vendor == Vendor.NOKIA
+    }
+
+    return cast(
+        type[Choice],
+        Choice.__call__("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)),
+    )
+
+
+def active_pe_router_selector(excludes: list[UUIDstr] | None = None) -> TypeAlias:
     """Generate a dropdown selector for choosing an active PE Router in an input form."""
     excludes = excludes or []
 
@@ -255,17 +303,23 @@ def active_pe_router_selector(excludes: list[UUIDstr] | None = None) -> Choice:
         if router.subscription_id not in excludes
     }
 
-    return Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]
+    return cast(
+        type[Choice],
+        Choice.__call__("Select a router", zip(routers.keys(), routers.items(), strict=True)),
+    )
 
 
-def active_switch_selector() -> Choice:
+def active_switch_selector() -> TypeAlias:
     """Generate a dropdown selector for choosing an active Switch in an input form."""
     switch_subscriptions = {
         str(switch["subscription_id"]): switch["description"]
         for switch in get_active_switch_subscriptions(includes=["subscription_id", "description"])
     }
 
-    return Choice("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True))  # type: ignore[arg-type]
+    return cast(
+        type[Choice],
+        Choice.__call__("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)),
+    )
 
 
 def active_edge_port_selector(*, partner_id: UUIDstr | None = None) -> TypeAlias:
@@ -293,11 +347,14 @@ def ip_trunk_service_version_selector() -> Choice:
     )
 
 
-def partner_choice() -> Choice:
+def partner_choice() -> TypeAlias:
     """Return a Choice object containing a list of available partners."""
     partners = {partner.partner_id: partner.name for partner in get_all_partners()}
 
-    return Choice("Select a partner", zip(partners.values(), partners.items(), strict=True))  # type: ignore[arg-type]
+    return cast(
+        type[Choice],
+        Choice.__call__("Select a partner", zip(partners.values(), partners.items(), strict=True)),
+    )
 
 
 def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int, enable_lacp: bool) -> None:
@@ -325,7 +382,7 @@ def generate_unique_vc_id(l2c_type: str, max_attempts: int = 100) -> VC_ID | Non
     ``Ethernet`` and ``VLAN`` type circuits get their IDs from different ranges.
 
     Args:
-        l2c_type: type of l2circuit.
+        l2c_type: type of Layer 2 Circuit.
         max_attempts: The maximum number of attempts to generate a unique ID.
 
     Returns:
@@ -344,3 +401,30 @@ def generate_unique_vc_id(l2c_type: str, max_attempts: int = 100) -> VC_ID | Non
             return VC_ID(vc_id)
 
     return None
+
+
+def wait_for_workflow_to_stop(
+    process_id: UUIDstr | UUID,
+    check_interval: int,
+    max_retries: int,
+) -> ProcessTable | None:
+    """Wait until any step in the workflow reaches a terminal status.
+
+    :param process_id: ID of the workflow process
+    :param check_interval: Seconds between checks
+    :param max_retries: Max number of retries before giving up
+    :return: process object if it has stopped, None if it timed out
+    """
+    for attempt in range(max_retries):
+        if process := get_stopped_process_by_id(process_id):
+            msg = f"✅ Process {process_id} has stopped with status: {process.last_status}"
+            logger.info(msg)
+            return process
+
+        msg = f"⏳ Attempt {attempt + 1}/{max_retries}: Waiting for workflow to progress..."
+        logger.info(msg)
+        time.sleep(check_interval)
+
+    msg = f"❌ Timeout reached. Workflow {process_id} did not stop after {max_retries * check_interval} seconds."
+    logger.error(msg)
+    return None
diff --git a/gso/worker.py b/gso/worker.py
index ed9c01ffc5fc3edf055ad48a60100f8e1911f56c..fc21f0c3bbf91e0f3c0df54e9772ce761083124b 100644
--- a/gso/worker.py
+++ b/gso/worker.py
@@ -84,6 +84,7 @@ celery = OrchestratorWorker(
         "gso.schedules.clean_old_tasks",
         "orchestrator.services.tasks",
         "gso.tasks.start_process",
+        "gso.tasks.massive_redeploy_base_config",
     ],
 )
diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py
index 8b1f4065d11869daa09e3ab124caf4b7484a89e0..2633b5cd25931e1967321ee0bf7fc2209bd05746 100644
--- a/gso/workflows/__init__.py
+++ b/gso/workflows/__init__.py
@@ -110,6 +110,7 @@ LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partner
 LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners")
 LazyWorkflowInstance("gso.workflows.tasks.clean_old_tasks", "task_clean_old_tasks")
 LazyWorkflowInstance("gso.workflows.tasks.check_site_connectivity", "task_check_site_connectivity")
+LazyWorkflowInstance("gso.workflows.tasks.redeploy_base_config", "task_redeploy_base_config")
 
 # Edge port workflows
 LazyWorkflowInstance("gso.workflows.edge_port.create_edge_port", "create_edge_port")
diff --git a/gso/workflows/router/redeploy_base_config.py b/gso/workflows/router/redeploy_base_config.py
index 11c4264dc754f2e7289241a2324ba7b00dabf2c7..1fba2bc87bad1f166c40b0bdc4f23227169f8f69 100644
--- a/gso/workflows/router/redeploy_base_config.py
+++ b/gso/workflows/router/redeploy_base_config.py
@@ -11,13 +11,13 @@ run. After confirmation by an operator, the configuration is committed to the ma
 from orchestrator.forms import SubmitFormPage
 from orchestrator.forms.validators import Label
 from orchestrator.targets import Target
-from orchestrator.workflow import StepList, begin, done, workflow
+from orchestrator.workflow import StepList, begin, conditional, done, workflow
 from orchestrator.workflows.steps import resync, store_process_subscription, unsync
 from orchestrator.workflows.utils import wrap_modify_initial_input_form
 from pydantic_forms.types import FormGenerator, UUIDstr
 
 from gso.products.product_types.router import Router
-from gso.services.lso_client import lso_interaction
+from gso.services.lso_client import anonymous_lso_interaction, lso_interaction
 from gso.utils.types.tt_number import TTNumber
 from gso.utils.workflow_steps import deploy_base_config_dry, deploy_base_config_real
 
@@ -28,6 +28,7 @@ def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator:
 
     class RedeployBaseConfigForm(SubmitFormPage):
         info_label: Label = f"Redeploy base config on {router.router.router_fqdn}?"
         tt_number: TTNumber
+        is_massive_redeploy: bool = False
 
     user_input = yield RedeployBaseConfigForm
 
@@ -45,12 +46,16 @@ def redeploy_base_config() -> StepList:
     * Perform a dry run of deployment
     * Redeploy base config
     """
+    is_not_massive_redeploy = conditional(lambda state: not bool(state.get("is_massive_redeploy")))
+    is_massive_redeploy = conditional(lambda state: bool(state.get("is_massive_redeploy")))
+
     return (
         begin
         >> store_process_subscription(Target.MODIFY)
         >> unsync
-        >> lso_interaction(deploy_base_config_dry)
-        >> lso_interaction(deploy_base_config_real)
+        >> is_not_massive_redeploy(lso_interaction(deploy_base_config_dry))
+        >> is_not_massive_redeploy(lso_interaction(deploy_base_config_real))
+        >> is_massive_redeploy(anonymous_lso_interaction(deploy_base_config_real))
         >> resync
         >> done
     )
diff --git a/gso/workflows/tasks/redeploy_base_config.py b/gso/workflows/tasks/redeploy_base_config.py
new file mode 100644
index 0000000000000000000000000000000000000000..e1b46a79347d8b02b8382cfeb1872a2feaa59e69
--- /dev/null
+++ b/gso/workflows/tasks/redeploy_base_config.py
@@ -0,0 +1,94 @@
+"""Task for redeploying base config on multiple routers at once.
+
+This task spawns multiple instances of the ``redeploy_base_config`` workflow, based on a list of Nokia routers given as
+input by the operator. The operator can then inspect which workflows completed successfully and which ones failed.
+"""
+
+import json
+from typing import Annotated
+
+from annotated_types import Len
+from orchestrator.config.assignee import Assignee
+from orchestrator.forms import SubmitFormPage
+from orchestrator.targets import Target
+from orchestrator.workflow import StepList, callback_step, conditional, done, init, inputstep, step, workflow
+from pydantic import AfterValidator, ConfigDict
+from pydantic_forms.types import FormGenerator, State, UUIDstr
+from pydantic_forms.validators import LongText, validate_unique_list
+
+from gso.products.product_types.router import Router
+from gso.services.subscriptions import get_active_router_subscriptions
+from gso.tasks.massive_redeploy_base_config import massive_redeploy_base_config_task
+from gso.utils.helpers import active_nokia_router_selector
+from gso.utils.shared_enums import Vendor
+from gso.utils.types.tt_number import TTNumber
+
+
+def _input_form_generator() -> FormGenerator:
+    router_selection_list = Annotated[  # type: ignore[valid-type]
+        list[active_nokia_router_selector()],  # type: ignore[misc]
+        AfterValidator(validate_unique_list),
+        Len(min_length=1),
+    ]
+
+    all_active_nokia_routers = [
+        router["subscription_id"]
+        for router in get_active_router_subscriptions()
+        if Router.from_subscription(router["subscription_id"]).router.vendor == Vendor.NOKIA
+    ]
+
+    class RedeployBaseConfigForm(SubmitFormPage):
+        model_config = ConfigDict(title="Redeploy base config on multiple routers")
+
+        tt_number: TTNumber
+        selected_routers: router_selection_list = all_active_nokia_routers  # type: ignore[valid-type]
+
+    user_input = yield RedeployBaseConfigForm
+    return user_input.model_dump() | {"failed_wfs": {}, "successful_wfs": {}}
+
+
+@step("Start worker to redeploy base config on selected routers")
+def start_redeploy_workflows(tt_number: TTNumber, selected_routers: list[UUIDstr], callback_route: str) -> State:
+    """Start the massive redeploy base config task with the selected routers."""
+    # TODO: if UUIDstr is ever changed to UUID, convert the IDs to strings before passing them to the task
+    massive_redeploy_base_config_task.apply_async(args=[selected_routers, tt_number, callback_route], countdown=5)  # type: ignore[attr-defined]
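+    # Note: apply_async hands the task to the Celery worker; the five-second
+    # countdown delays its execution, giving this workflow time to reach its
+    # awaiting-callback state before the task posts its results back.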
+
+    return {"failed_wfs": {}, "successful_wfs": {}}
+
+
+@step("Evaluate provisioning proxy result")
+def evaluate_results(callback_result: dict) -> State:
+    """Evaluate the result of the provisioning proxy callback."""
+    failed_wfs = callback_result.pop("failed_wfs", {})
+    successful_wfs = callback_result.pop("successful_wfs", {})
+    return {"callback_result": callback_result, "failed_wfs": failed_wfs, "successful_wfs": successful_wfs}
+
+
+@inputstep("Some workflows have failed", assignee=Assignee.SYSTEM)
+def workflows_failed_to_prompt(failed_wfs: dict, successful_wfs: dict) -> FormGenerator:
+    """Prompt the operator that some workflows have failed."""
+
+    class WFFailurePrompt(SubmitFormPage):
+        model_config = ConfigDict(title="Some redeploy workflows have failed, please inspect the list below")
+        failed_workflows: LongText = json.dumps(failed_wfs, indent=4)
+        successful_workflows: LongText = json.dumps(successful_wfs, indent=4)
+
+    yield WFFailurePrompt
+    return {}
+
+
+@workflow("Redeploy base config on multiple routers", initial_input_form=_input_form_generator, target=Target.SYSTEM)
+def task_redeploy_base_config() -> StepList:
+    """Gather a list of routers from the operator to redeploy base config onto."""
+    some_failed_to_start = conditional(lambda state: len(state.get("failed_wfs", {})) > 0)
+
+    return (
+        init
+        >> callback_step(
+            name="Start running redeploy workflows on selected routers",
+            action_step=start_redeploy_workflows,
+            validate_step=evaluate_results,
+        )
+        >> some_failed_to_start(workflows_failed_to_prompt)
+        >> done
+    )
diff --git a/test/conftest.py b/test/conftest.py
index 44e2901b6623a1105ca2d5b8f9c0a133fe162e5f..3e4a765d70cfefbd4ae800de2e18d35423ec8d6f 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -34,7 +34,6 @@ from sqlalchemy.orm import scoped_session, sessionmaker
 from starlette.testclient import TestClient
 from urllib3_mock import Responses
 
-import gso.services.mailer
 from gso.services.partners import PartnerSchema, create_partner
 from gso.services.subscriptions import is_resource_type_value_unique
 from test.fixtures import *  # noqa: F403
@@ -57,6 +56,8 @@ def pytest_configure(config):
     # Set environment variables for the test session
     os.environ["OSS_PARAMS_FILENAME"] = "gso/oss-params-example.json"
     os.environ["TESTING"] = "true"
+    os.environ["CELERY_TASK_ALWAYS_EAGER"] = "true"
+    os.environ["CELERY_TASK_EAGER_PROPAGATES"] = "true"
 
     # Register finalizers to clean up after tests are done
     def cleanup() -> None:
@@ -593,11 +594,15 @@ def responses():
 def _no_mail(monkeypatch):
     """Remove sending mails from all tests."""
 
-    def send_mail(subject: str, body: str, *, destination: str | None = None) -> None:
+    def fake_send_mail(subject: str, body: str, *, destination: str | None = None) -> None:
         email = f"*** SENT AN EMAIL ***\nTO: {destination}\nSUBJECT: {subject}\nCONTENT:\n{body}"
         logger.info(email)
 
-    monkeypatch.setattr(gso.services.mailer, "send_mail", send_mail)
+    monkeypatch.setattr(
+        "gso.services.mailer.send_mail",
+        fake_send_mail,
+        raising=True,
+    )
 
 
 @pytest.fixture(autouse=True)
@@ -605,7 +610,7 @@ def _no_lso_interactions(monkeypatch):
     """Remove all external LSO calls."""
 
     @step("Mocked playbook execution")
-    def _execute_playbook(
+    def fake_execute_playbook(
         playbook_name: str, callback_route: str, inventory: dict, extra_vars: dict, process_id: UUIDstr
     ) -> None:
         assert playbook_name
@@ -614,4 +619,8 @@
         assert extra_vars
         assert process_id
 
-    monkeypatch.setattr(gso.services.lso_client, "_execute_playbook", _execute_playbook)
+    monkeypatch.setattr(
+        "gso.services.lso_client._execute_playbook",
+        fake_execute_playbook,
+        raising=True,
+    )
diff --git a/test/services/test_lso_client.py b/test/services/test_lso_client.py
index 78af95fc212a205f906ddb8e34d1bf5e5fec112c..7fa33323e66960e1094c41263859e2e0dfb26a2b 100644
--- a/test/services/test_lso_client.py
+++ b/test/services/test_lso_client.py
@@ -27,4 +27,4 @@ def test_replace_unicode_in_lso_call_success(mock_post, faker):
     execute_playbook = _execute_playbook.__wrapped__
     execute_playbook("playbook.yaml", "/api/callback_route", {}, extra_vars, mocked_uuid)
 
-    mock_post.assert_called_once_with("https://localhost:44444/api/playbook", json=expected_parameters, timeout=10)
+    mock_post.assert_called_once_with("https://localhost:44444/api/playbook/", json=expected_parameters, timeout=10)
diff --git a/test/tasks/__init__.py b/test/tasks/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/test/tasks/test_massive_redeploy_base_config.py b/test/tasks/test_massive_redeploy_base_config.py
new file mode 100644
index 0000000000000000000000000000000000000000..394b2ca21f2d01139f57d49563f317bd26a7b34d
--- /dev/null
+++ b/test/tasks/test_massive_redeploy_base_config.py
@@ -0,0 +1,193 @@
+import logging
+import uuid
+from types import SimpleNamespace
+from unittest.mock import patch
+
+from orchestrator.workflow import ProcessStatus
+from pydantic import BaseModel, ValidationError
+from pydantic_forms.exceptions import FormValidationError
+from pydantic_i18n import PydanticI18n
+
+from gso.tasks.massive_redeploy_base_config import massive_redeploy_base_config_task
+
+
+@patch("gso.tasks.massive_redeploy_base_config.requests.post")
+@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params")
+@patch("gso.tasks.massive_redeploy_base_config.start_process")
+@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop")
+def test_all_status_branches(
+    mock_wait_for_workflow_to_stop, mock_start_process, mock_load_oss, mock_post, router_subscription_factory, faker
+):
+    """
+    Test:
+    - Completed → successful_wfs
+    - Aborted → failed_wfs["Workflow was aborted"]
+    - Failed+reason → failed_wfs[reason]
+    - Failed no reason → default message
+    - Other status → generic formatting
+    """
+    router_ids = [
+        router_subscription_factory(router_fqdn=fqdn).subscription_id
+        for fqdn in [
+            "r1.example.com",
+            "r2.example.com",
+            "r3.example.com",
+            "r4.example.com",
+            "r5.example.com",
+        ]
+    ]
+
+    # stub start_process → return a dummy process_id
+    mock_start_process.side_effect = lambda *args, **kwargs: uuid.uuid4()  # noqa: ARG005
+
+    # prepare five different ProcessTable-like objects
+    p1 = SimpleNamespace(last_step="Done", last_status=ProcessStatus.COMPLETED, failed_reason=None)
+    p2 = SimpleNamespace(last_step="X", last_status=ProcessStatus.ABORTED, failed_reason=None)
+    p3 = SimpleNamespace(last_step="Y", last_status=ProcessStatus.FAILED, failed_reason="Bad foo")
+    p4 = SimpleNamespace(last_step="Z", last_status=ProcessStatus.FAILED, failed_reason=None)
+    p5 = SimpleNamespace(last_step="L", last_status="RUNNING", failed_reason=None)
+
+    mock_wait_for_workflow_to_stop.side_effect = [p1, p2, p3, p4, p5]
+
+    mock_load_oss.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="http://callback.host"))
+    mock_post.return_value = SimpleNamespace(ok=True)
+
+    # run task
+    massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb")
+
+    expected_payload = {
+        "successful_wfs": {"r1.example.com": "Done"},
+        "failed_wfs": {
+            "r2.example.com": "Workflow was aborted",
+            "r3.example.com": "Bad foo",
+            "r4.example.com": "Workflow failed without a reason",
+            "r5.example.com": "Workflow status: RUNNING, last step: L",
+        },
+    }
+
+    mock_post.assert_called_once_with("http://callback.host/cb", json=expected_payload, timeout=30)
+
+
+@patch("gso.tasks.massive_redeploy_base_config.requests.post")
+@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params")
+@patch("gso.tasks.massive_redeploy_base_config.start_process")
+@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop")
+def test_timeout_and_validation_and_unexpected(
+    mock_wait_for_workflow_to_stop,
+    mock_start_process,
+    mock_load_oss,
+    mock_post,
+    router_subscription_factory,
+    faker,
+    caplog,
+):
+    """
+    Test three error branches:
+    - wait_for_workflow_to_stop → None (timeout)
+    - start_process raises FormValidationError
+    - start_process raises generic Exception
+    """
+    # create three routers (their subscription_id is a UUID)
+    r_timeout = router_subscription_factory(router_fqdn="t1.example.com")
+    r_validate = router_subscription_factory(router_fqdn="t2.example.com")
+    r_crash = router_subscription_factory(router_fqdn="t3.example.com")
+    router_ids = [
+        r_timeout.subscription_id,
+        r_validate.subscription_id,
+        r_crash.subscription_id,
+    ]
+
+    # build a real ValidationError via a dummy Pydantic model
+    class TempModel(BaseModel):
+        x: int
+
+    try:
+        TempModel(x="not_an_int")
+    except ValidationError as ve:
+        # supply an explicit (empty) translations dict so PydanticI18n initializes
+        translator = PydanticI18n(source={"en_US": {}})
+        validation_exc = FormValidationError("TempModel", ve, translator, locale="en_US")
+
+    # fake start_process: timeout for first, validation_error for second, crash for third
+    def fake_start(name, user_inputs):
+        rid = user_inputs[0]["subscription_id"]
+        if rid == r_validate.subscription_id:
+            raise validation_exc
+        if rid == r_crash.subscription_id:
+            msg = "boom"
+            raise RuntimeError(msg)
+        return f"pid-{rid}"
+
+    mock_start_process.side_effect = fake_start
+
+    # always timeout (None) for the first router
+    mock_wait_for_workflow_to_stop.return_value = None
+
+    # stub OSS params and successful HTTP callback
+    mock_load_oss.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="https://host"))
+    mock_post.return_value = SimpleNamespace(ok=True)
+
+    caplog.set_level(logging.ERROR)
+    massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/done")
+
+    expected_failed = {
+        "t1.example.com": "Timed out waiting for workflow to complete",
+        "t2.example.com": f"Validation error: {validation_exc}",
+        "t3.example.com": "Unexpected error: boom",
+    }
+    expected_payload = {"successful_wfs": {}, "failed_wfs": expected_failed}
+
+    mock_post.assert_called_once_with(
+        "https://host/done",
+        json=expected_payload,
+        timeout=30,
+    )
+
+
+@patch("gso.tasks.massive_redeploy_base_config.requests.post")
+@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params")
+@patch("gso.tasks.massive_redeploy_base_config.start_process")
+@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop")
+@patch("gso.tasks.massive_redeploy_base_config.Router.from_subscription")
+def test_callback_failure_and_exception(
+    mock_from_subscription,
+    mock_wait_for_workflow_to_stop,
+    mock_start_process,
+    mock_load_oss_params,
+    mock_requests_post,
+    caplog,
+    router_subscription_factory,
+    faker,
+):
+    """
+    Test that when the HTTP callback either returns ok=False or raises, an error is logged.
+    """
+    # Arrange: one router subscription
+    subscription = router_subscription_factory(router_fqdn="r1.fqdn")
+    mock_from_subscription.return_value = subscription
+    router_ids = [subscription.subscription_id]
+
+    # workflow always completes successfully
+    mock_start_process.return_value = "pid"
+    mock_wait_for_workflow_to_stop.return_value = SimpleNamespace(
+        last_step="Done",
+        last_status=ProcessStatus.COMPLETED,
+        failed_reason=None,
+    )
+
+    # OSS host stub
+    mock_load_oss_params.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="http://h"))
+
+    caplog.set_level(logging.ERROR)
+
+    # 1) callback returns ok=False → logs "Callback failed"
+    mock_requests_post.return_value = SimpleNamespace(ok=False, status_code=500, text="server error")
+    massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb1")
+    assert "Callback failed" in caplog.text
+
+    caplog.clear()
+
+    # 2) callback raises → logs "Failed to post callback: net down"
+    mock_requests_post.side_effect = Exception("net down")
+    massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb1")
+    assert "Failed to post callback: net down" in caplog.text
diff --git a/test/utils/test_helpers.py b/test/utils/test_helpers.py
index f68cd05230fee82143cd5a6abb3491d06d457bf6..df18bbc9959c42a395979fa6082ebc80ff3c59fd 100644
--- a/test/utils/test_helpers.py
+++ b/test/utils/test_helpers.py
@@ -1,6 +1,9 @@
+import logging
 from unittest.mock import patch
+from uuid import uuid4
 
 import pytest
+from orchestrator.db import ProcessTable
 from orchestrator.types import SubscriptionLifecycle
 
 from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
@@ -10,6 +13,7 @@ from gso.utils.helpers import (
     generate_inventory_for_routers,
     generate_lan_switch_interconnect_subnet_v4,
     generate_lan_switch_interconnect_subnet_v6,
+    wait_for_workflow_to_stop,
 )
 from gso.utils.shared_enums import Vendor
 from gso.utils.types.tt_number import validate_tt_number
@@ -162,3 +166,46 @@ def test_generate_lan_switch_interconnect_subnet_v6(execution_count, site_subscr
         str(generate_lan_switch_interconnect_subnet_v6(site.site.site_internal_id))
         == f"beef:cafe:0:{hex(site.site.site_internal_id).split("x")[-1]}::/64"
     )
+
+
+@patch("gso.utils.helpers.time.sleep", lambda _: None)
+@patch("gso.utils.helpers.get_stopped_process_by_id")
+def test_wait_for_workflow_to_stop_success(mock_get_stopped, caplog):
+    """Simulate get_stopped_process_by_id returning a process on the 3rd attempt."""
+    # Configure the side effect: two Nones, then a process
+    stopped_proc = ProcessTable(last_status="completed", last_step="Done")
+    mock_get_stopped.side_effect = [None, None, stopped_proc]
+
+    caplog.set_level(logging.INFO)
+    pid = uuid4()
+
+    proc = wait_for_workflow_to_stop(
+        process_id=pid,
+        check_interval=0,
+        max_retries=5,
+    )
+
+    # Assertions
+    assert proc is stopped_proc
+    assert proc.last_status == "completed"
+    assert mock_get_stopped.call_count == 3
+    assert f"✅ Process {pid} has stopped with status: completed" in caplog.text
+
+
+@patch("gso.utils.helpers.time.sleep", lambda _: None)
+@patch("gso.utils.helpers.get_stopped_process_by_id", return_value=None)
+def test_wait_for_workflow_to_stop_timeout(mock_get_stopped, caplog):
+    """Simulate get_stopped_process_by_id never finding a stopped process."""
+    caplog.set_level(logging.ERROR)
+    pid = uuid4()
+
+    result = wait_for_workflow_to_stop(
+        process_id=pid,
+        check_interval=0,
+        max_retries=3,
+    )
+
+    assert result is None
+    # max_retries * check_interval = 0
+    assert f"❌ Timeout reached. Workflow {pid} did not stop after 0 seconds." in caplog.text
+    assert mock_get_stopped.call_count == 3
diff --git a/test/workflows/router/test_redeploy_base_config.py b/test/workflows/router/test_redeploy_base_config.py
index 4dee5a5e9d18ed93cc08812f85f256db5174688c..66c915b8abea6443cab6c2152c596761b7a18342 100644
--- a/test/workflows/router/test_redeploy_base_config.py
+++ b/test/workflows/router/test_redeploy_base_config.py
@@ -4,13 +4,16 @@ from gso.products.product_types.router import Router
 from test.workflows import (
     assert_complete,
     assert_lso_interaction_success,
+    assert_lso_success,
     extract_state,
     run_workflow,
 )
 
 
 @pytest.mark.workflow()
+@pytest.mark.parametrize("is_massive_redeploy", [False, True])
 def test_redeploy_base_config_success(
+    is_massive_redeploy,
     router_subscription_factory,
     faker,
 ):
@@ -18,11 +21,18 @@
     product_id = str(router_subscription_factory().subscription_id)
 
     # Run workflow
-    initial_input_data = [{"subscription_id": product_id}, {"tt_number": faker.tt_number()}]
+    initial_input_data = [
+        {"subscription_id": product_id},
+        {"tt_number": faker.tt_number(), "is_massive_redeploy": is_massive_redeploy},
+    ]
     result, process_stat, step_log = run_workflow("redeploy_base_config", initial_input_data)
 
-    for _ in range(2):
-        result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
+    if is_massive_redeploy:
+        for _ in range(1):
+            result, step_log = assert_lso_success(result, process_stat, step_log)
+    else:
+        for _ in range(2):
+            result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
 
     assert_complete(result)
diff --git a/test/workflows/tasks/test_redeploy_base_config.py b/test/workflows/tasks/test_redeploy_base_config.py
new file mode 100644
index 0000000000000000000000000000000000000000..8c3c25f29693bfbf8349c4ee68c7577d55d7c0ab
--- /dev/null
+++ b/test/workflows/tasks/test_redeploy_base_config.py
@@ -0,0 +1,93 @@
+from unittest.mock import patch
+
+import pytest
+
+import test
+from test.workflows import (
+    assert_awaiting_callback,
+    assert_complete,
+    extract_state,
+    resume_suspended_workflow,
+    resume_workflow,
+    run_workflow,
+)
+
+
+@patch("gso.workflows.tasks.redeploy_base_config.massive_redeploy_base_config_task")
+@pytest.mark.workflow()
+def test_task_redeploy_base_config_success(
+    mocked_massive_redeploy_base_config_task,
+    router_subscription_factory,
+    faker,
+):
+    selected_routers = [str(router_subscription_factory().subscription_id) for _ in range(2)]
+
+    # Run workflow task
+    initial_input_data = [
+        {"tt_number": faker.tt_number(), "selected_routers": selected_routers},
+    ]
+    result, process_stat, step_log = run_workflow("task_redeploy_base_config", initial_input_data)
+
+    assert_awaiting_callback(result)
+    result, step_log = resume_workflow(
+        process_stat,
+        step_log,
+        input_data={
+            "callback_result": {
+                "failed_wfs": {},
+                "successful_wfs": {
+                    "t4.example.com": "Done",
+                },
+            },
+        },
+    )
+
+    assert_complete(result)
+
+    state = extract_state(result)
+
+    assert state["tt_number"] == initial_input_data[0]["tt_number"]
+    assert state["failed_wfs"] == {}
+    assert state["successful_wfs"] == {
+        "t4.example.com": "Done",
+    }
+
+
+@patch("gso.workflows.tasks.redeploy_base_config.massive_redeploy_base_config_task")
+@pytest.mark.workflow()
+def test_task_redeploy_base_config_failure(
+    mocked_massive_redeploy_base_config_task, router_subscription_factory, faker
+):
+    selected_routers = [str(router_subscription_factory().subscription_id) for _ in range(2)]
+
+    # Run workflow task
+    initial_input_data = [
+        {"tt_number": faker.tt_number(), "selected_routers": selected_routers},
+    ]
+    result, process_stat, step_log = run_workflow("task_redeploy_base_config", initial_input_data)
+
+    fake_callback_result = {
+        "callback_result": {
+            "failed_wfs": {
+                "t1.example.com": "Timed out waiting for workflow to complete",
+                "t2.example.com": "Validation error: validation_exc",
+                "t3.example.com": "Unexpected error: boom",
+            },
+            "successful_wfs": {
+                "t4.example.com": "Done",
+            },
+        },
+    }
+    assert_awaiting_callback(result)
+    result, step_log = resume_workflow(process_stat, step_log, input_data=fake_callback_result)
+
+    result, step_log = resume_suspended_workflow(
+        result, process_stat, step_log, input_data=test.USER_CONFIRM_EMPTY_FORM
+    )
+
+    assert_complete(result)
+    state = extract_state(result)
+
+    assert state["tt_number"] == initial_input_data[0]["tt_number"]
+    assert state["failed_wfs"] == fake_callback_result["callback_result"]["failed_wfs"]
+    assert state["successful_wfs"] == fake_callback_result["callback_result"]["successful_wfs"]
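
A note on the callback hand-off exercised throughout this changeset (an illustrative sketch, not part of the diff): the Celery task and the workflow's callback_step communicate through a single JSON document posted to the callback route, with exactly two keys, failed_wfs and successful_wfs, each mapping a router FQDN to a human-readable outcome. A minimal reproduction of the producer side; the URL and FQDNs below are made up:

import requests

# Keys mirror massive_redeploy_base_config_task above; evaluate_results() pops
# both out of callback_result and stores them in the workflow state, where
# some_failed_to_start() decides whether to prompt the operator.
payload = {
    "failed_wfs": {"r2.example.com": "Workflow was aborted"},
    "successful_wfs": {"r1.example.com": "Done"},
}
response = requests.post("https://orchestrator.example.com/api/callback/abc123", json=payload, timeout=30)
response.raise_for_status()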