diff --git a/gso/workflows/tasks/redeploy_base_config.py b/gso/workflows/tasks/redeploy_base_config.py index 32348299228bc10b08ca99dc6d7f4855a65aa236..e1b46a79347d8b02b8382cfeb1872a2feaa59e69 100644 --- a/gso/workflows/tasks/redeploy_base_config.py +++ b/gso/workflows/tasks/redeploy_base_config.py @@ -51,7 +51,7 @@ def _input_form_generator() -> FormGenerator: def start_redeploy_workflows(tt_number: TTNumber, selected_routers: list[UUIDstr], callback_route: str) -> State: """Start the massive redeploy base config task with the selected routers.""" # TODO if in the future you changed UUIDstr to UUID, you need to convert them to string when passing to the task - massive_redeploy_base_config_task.apply_async(args=[selected_routers, tt_number, callback_route], countdown=5) + massive_redeploy_base_config_task.apply_async(args=[selected_routers, tt_number, callback_route], countdown=5) # type: ignore[attr-defined] return {"failed_wfs": {}, "successful_wfs": {}} diff --git a/test/conftest.py b/test/conftest.py index 44e2901b6623a1105ca2d5b8f9c0a133fe162e5f..3e4a765d70cfefbd4ae800de2e18d35423ec8d6f 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -34,7 +34,6 @@ from sqlalchemy.orm import scoped_session, sessionmaker from starlette.testclient import TestClient from urllib3_mock import Responses -import gso.services.mailer from gso.services.partners import PartnerSchema, create_partner from gso.services.subscriptions import is_resource_type_value_unique from test.fixtures import * # noqa: F403 @@ -57,6 +56,8 @@ def pytest_configure(config): # Set environment variables for the test session os.environ["OSS_PARAMS_FILENAME"] = "gso/oss-params-example.json" os.environ["TESTING"] = "true" + os.environ["CELERY_TASK_ALWAYS_EAGER"] = "true" + os.environ["CELERY_TASK_EAGER_PROPAGATES"] = "true" # Register finalizers to clean up after tests are done def cleanup() -> None: @@ -593,11 +594,15 @@ def responses(): def _no_mail(monkeypatch): """Remove sending mails from all tests.""" - def send_mail(subject: str, body: str, *, destination: str | None = None) -> None: + def fake_send_mail(subject: str, body: str, *, destination: str | None = None) -> None: email = f"*** SENT AN EMAIL ***\nTO: {destination}\nSUBJECT: {subject}\nCONTENT:\n{body}" logger.info(email) - monkeypatch.setattr(gso.services.mailer, "send_mail", send_mail) + monkeypatch.setattr( + "gso.services.mailer.send_mail", + fake_send_mail, + raising=True, + ) @pytest.fixture(autouse=True) @@ -605,7 +610,7 @@ def _no_lso_interactions(monkeypatch): """Remove all external LSO calls.""" @step("Mocked playbook execution") - def _execute_playbook( + def fake_execute_playbook( playbook_name: str, callback_route: str, inventory: dict, extra_vars: dict, process_id: UUIDstr ) -> None: assert playbook_name @@ -614,4 +619,8 @@ def _no_lso_interactions(monkeypatch): assert extra_vars assert process_id - monkeypatch.setattr(gso.services.lso_client, "_execute_playbook", _execute_playbook) + monkeypatch.setattr( + "gso.services.lso_client._execute_playbook", + fake_execute_playbook, + raising=True, + ) diff --git a/test/tasks/__init__.py b/test/tasks/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/test/tasks/test_masssive_redeploy_base_config.py b/test/tasks/test_masssive_redeploy_base_config.py new file mode 100644 index 0000000000000000000000000000000000000000..394b2ca21f2d01139f57d49563f317bd26a7b34d --- /dev/null +++ b/test/tasks/test_masssive_redeploy_base_config.py @@ -0,0 +1,193 @@ +import logging +import uuid +from types import SimpleNamespace +from unittest.mock import patch + +from orchestrator.workflow import ProcessStatus +from pydantic import BaseModel, ValidationError +from pydantic_forms.exceptions import FormValidationError +from pydantic_i18n import PydanticI18n + +from gso.tasks.massive_redeploy_base_config import massive_redeploy_base_config_task + + +@patch("gso.tasks.massive_redeploy_base_config.requests.post") +@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params") +@patch("gso.tasks.massive_redeploy_base_config.start_process") +@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop") +def test_all_status_branches( + mock_wait_for_workflow_to_stop, mock_start_process, mock_load_oss, mock_post, router_subscription_factory, faker +): + """ + Test: + - Completed → successful_wfs + - Aborted → failed_wfs["Workflow was aborted"] + - Failed+reason → failed_wfs[reason] + - Failed no reason → default message + - Other status → generic formatting + """ + router_ids = [ + router_subscription_factory(router_fqdn=fqdn).subscription_id + for fqdn in [ + "r1.example.com", + "r2.example.com", + "r3.example.com", + "r4.example.com", + "r5.example.com", + ] + ] + + # stub start_process → return a dummy process_id + mock_start_process.side_effect = lambda *args, **kwargs: uuid.UUID # noqa: ARG005 + + # prepare five different ProcessTable-like objects + p1 = SimpleNamespace(last_step="Done", last_status=ProcessStatus.COMPLETED, failed_reason=None) + p2 = SimpleNamespace(last_step="X", last_status=ProcessStatus.ABORTED, failed_reason=None) + p3 = SimpleNamespace(last_step="Y", last_status=ProcessStatus.FAILED, failed_reason="Bad foo") + p4 = SimpleNamespace(last_step="Z", last_status=ProcessStatus.FAILED, failed_reason=None) + p5 = SimpleNamespace(last_step="L", last_status="RUNNING", failed_reason=None) + + mock_wait_for_workflow_to_stop.side_effect = [p1, p2, p3, p4, p5] + + mock_load_oss.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="http://callback.host")) + mock_post.return_value = SimpleNamespace(ok=True) + + # run task + massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb") + + expected_payload = { + "successful_wfs": {"r1.example.com": "Done"}, + "failed_wfs": { + "r2.example.com": "Workflow was aborted", + "r3.example.com": "Bad foo", + "r4.example.com": "Workflow failed without a reason", + "r5.example.com": "Workflow status: RUNNING, last step: L", + }, + } + + mock_post.assert_called_once_with("http://callback.host/cb", json=expected_payload, timeout=30) + + +@patch("gso.tasks.massive_redeploy_base_config.requests.post") +@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params") +@patch("gso.tasks.massive_redeploy_base_config.start_process") +@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop") +def test_timeout_and_validation_and_unexpected( + mock_wait_for_workflow_to_stop, + mock_start_process, + mock_load_oss, + mock_post, + router_subscription_factory, + faker, + caplog, +): + """ + Test three error branches: + - wait_for_workflow_to_stop → None (timeout) + - start_process raises FormValidationError + - start_process raises generic Exception + """ + # create three routers (their subscription_id is a UUID) + r_timeout = router_subscription_factory(router_fqdn="t1.example.com") + r_validate = router_subscription_factory(router_fqdn="t2.example.com") + r_crash = router_subscription_factory(router_fqdn="t3.example.com") + router_ids = [ + r_timeout.subscription_id, + r_validate.subscription_id, + r_crash.subscription_id, + ] + + # build a real ValidationError via a dummy Pydantic model + class TempModel(BaseModel): + x: int + + try: + TempModel(x="not_an_int") + except ValidationError as ve: + # supply an explicit (empty) translations dict so PydanticI18n initializes + translator = PydanticI18n(source={"en_US": {}}) + validation_exc = FormValidationError("TempModel", ve, translator, locale="en_US") + + # fake start_process: timeout for first, validation_error for second, crash for third + def fake_start(name, user_inputs): + rid = user_inputs[0]["subscription_id"] + if rid == r_validate.subscription_id: + raise validation_exc + if rid == r_crash.subscription_id: + msg = "boom" + raise RuntimeError(msg) + return f"pid-{rid}" + + mock_start_process.side_effect = fake_start + + # always timeout (None) for the first router + mock_wait_for_workflow_to_stop.return_value = None + + # stub OSS params and successful HTTP callback + mock_load_oss.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="https://host")) + mock_post.return_value = SimpleNamespace(ok=True) + + caplog.set_level(logging.ERROR) + massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/done") + + expected_failed = { + "t1.example.com": "Timed out waiting for workflow to complete", + "t2.example.com": f"Validation error: {validation_exc}", + "t3.example.com": "Unexpected error: boom", + } + expected_payload = {"successful_wfs": {}, "failed_wfs": expected_failed} + + mock_post.assert_called_once_with( + "https://host/done", + json=expected_payload, + timeout=30, + ) + + +@patch("gso.tasks.massive_redeploy_base_config.requests.post") +@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params") +@patch("gso.tasks.massive_redeploy_base_config.start_process") +@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop") +@patch("gso.tasks.massive_redeploy_base_config.Router.from_subscription") +def test_callback_failure_and_exception( + mock_from_subscription, + mock_wait_for_workflow_to_stop, + mock_start_process, + mock_load_oss_params, + mock_requests_post, + caplog, + router_subscription_factory, + faker, +): + """ + Test that when the HTTP callback either returns ok=False or raises, we log.exception. + """ + # Arrange: one router subscription + subscription = router_subscription_factory(router_fqdn="r1.fqdn") + mock_from_subscription.return_value = subscription + router_ids = [subscription.subscription_id] + + # workflow always completes successfully + mock_start_process.return_value = "pid" + mock_wait_for_workflow_to_stop.return_value = SimpleNamespace( + last_step="Done", + last_status=ProcessStatus.COMPLETED, + failed_reason=None, + ) + + # OSS host stub + mock_load_oss_params.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="http://h")) + + caplog.set_level(logging.ERROR) + + # 1) callback returns ok=False → logs "Callback failed" + mock_requests_post.return_value = SimpleNamespace(ok=False, status_code=500, text="server error") + massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb1") + assert "Callback failed" in caplog.text + + caplog.clear() + + # 2) callback raises → logs "Failed to post callback: net down" + mock_requests_post.side_effect = Exception("net down") + massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb1") + assert "Failed to post callback: net down" in caplog.text diff --git a/test/utils/test_helpers.py b/test/utils/test_helpers.py index f68cd05230fee82143cd5a6abb3491d06d457bf6..df18bbc9959c42a395979fa6082ebc80ff3c59fd 100644 --- a/test/utils/test_helpers.py +++ b/test/utils/test_helpers.py @@ -1,6 +1,9 @@ +import logging from unittest.mock import patch +from uuid import uuid4 import pytest +from orchestrator.db import ProcessTable from orchestrator.types import SubscriptionLifecycle from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock @@ -10,6 +13,7 @@ from gso.utils.helpers import ( generate_inventory_for_routers, generate_lan_switch_interconnect_subnet_v4, generate_lan_switch_interconnect_subnet_v6, + wait_for_workflow_to_stop, ) from gso.utils.shared_enums import Vendor from gso.utils.types.tt_number import validate_tt_number @@ -162,3 +166,46 @@ def test_generate_lan_switch_interconnect_subnet_v6(execution_count, site_subscr str(generate_lan_switch_interconnect_subnet_v6(site.site.site_internal_id)) == f"beef:cafe:0:{hex(site.site.site_internal_id).split("x")[-1]}::/64" ) + + +@patch("gso.utils.helpers.time.sleep", lambda _: None) +@patch("gso.utils.helpers.get_stopped_process_by_id") +def test_wait_for_workflow_to_stop_success(mock_get_stopped, caplog): + """Simulate get_stopped_process_by_id returning a process on the 3rd attempt.""" + # Configure the side effect: two Nones, then a process + stopped_proc = ProcessTable(last_status="completed", last_step="Done") + mock_get_stopped.side_effect = [None, None, stopped_proc] + + caplog.set_level(logging.INFO) + pid = uuid4() + + proc = wait_for_workflow_to_stop( + process_id=pid, + check_interval=0, + max_retries=5, + ) + + # Assertions + assert proc is stopped_proc + assert proc.last_status == "completed" + assert mock_get_stopped.call_count == 3 + assert f"✅ Process {pid} has stopped with status: completed" in caplog.text + + +@patch("gso.utils.helpers.time.sleep", lambda _: None) +@patch("gso.utils.helpers.get_stopped_process_by_id", return_value=None) +def test_wait_for_workflow_to_stop_timeout(mock_get_stopped, caplog): + """Simulate get_stopped_process_by_id never finding a stopped process.""" + caplog.set_level(logging.ERROR) + pid = uuid4() + + result = wait_for_workflow_to_stop( + process_id=pid, + check_interval=0, + max_retries=3, + ) + + assert result is None + # max_retries * check_interval = 0 + assert f"❌ Timeout reached. Workflow {pid} did not stop after 0 seconds." in caplog.text + assert mock_get_stopped.call_count == 3 diff --git a/test/workflows/router/test_redeploy_base_config.py b/test/workflows/router/test_redeploy_base_config.py index 3f07c79a24dd8d3d6446fbfd09a0962f5c1b87c2..66c915b8abea6443cab6c2152c596761b7a18342 100644 --- a/test/workflows/router/test_redeploy_base_config.py +++ b/test/workflows/router/test_redeploy_base_config.py @@ -4,6 +4,7 @@ from gso.products.product_types.router import Router from test.workflows import ( assert_complete, assert_lso_interaction_success, + assert_lso_success, extract_state, run_workflow, ) @@ -25,8 +26,13 @@ def test_redeploy_base_config_success( {"tt_number": faker.tt_number(), "is_massive_redeploy": is_massive_redeploy}, ] result, process_stat, step_log = run_workflow("redeploy_base_config", initial_input_data) - for _ in range(1 if is_massive_redeploy else 2): - result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + + if is_massive_redeploy: + for _ in range(1): + result, step_log = assert_lso_success(result, process_stat, step_log) + else: + for _ in range(2): + result, step_log = assert_lso_interaction_success(result, process_stat, step_log) assert_complete(result) diff --git a/test/workflows/tasks/test_redeploy_base_config.py b/test/workflows/tasks/test_redeploy_base_config.py new file mode 100644 index 0000000000000000000000000000000000000000..8c3c25f29693bfbf8349c4ee68c7577d55d7c0ab --- /dev/null +++ b/test/workflows/tasks/test_redeploy_base_config.py @@ -0,0 +1,93 @@ +from unittest.mock import patch + +import pytest + +import test +from test.workflows import ( + assert_awaiting_callback, + assert_complete, + extract_state, + resume_suspended_workflow, + resume_workflow, + run_workflow, +) + + +@patch("gso.workflows.tasks.redeploy_base_config.massive_redeploy_base_config_task") +@pytest.mark.workflow() +def test_task_redeploy_base_config_success( + mocked_massive_redeploy_base_config_task, + router_subscription_factory, + faker, +): + selected_routers = [str(router_subscription_factory().subscription_id) for _ in range(2)] + + # Run workflow task + initial_input_data = [ + {"tt_number": faker.tt_number(), "selected_routers": selected_routers}, + ] + result, process_stat, step_log = run_workflow("task_redeploy_base_config", initial_input_data) + + assert_awaiting_callback(result) + result, step_log = resume_workflow( + process_stat, + step_log, + input_data={ + "callback_result": { + "failed_wfs": {}, + "successful_wfs": { + "t4.example.com": "Done", + }, + }, + }, + ) + + assert_complete(result) + + state = extract_state(result) + + assert state["tt_number"] == initial_input_data[0]["tt_number"] + assert state["failed_wfs"] == {} + assert state["successful_wfs"] == { + "t4.example.com": "Done", + } + + +@patch("gso.workflows.tasks.redeploy_base_config.massive_redeploy_base_config_task") +@pytest.mark.workflow() +def test_task_redeploy_base_config_failure( + mocked_massive_redeploy_base_config_task, router_subscription_factory, faker +): + selected_routers = [str(router_subscription_factory().subscription_id) for _ in range(2)] + + # Run workflow task + initial_input_data = [ + {"tt_number": faker.tt_number(), "selected_routers": selected_routers}, + ] + result, process_stat, step_log = run_workflow("task_redeploy_base_config", initial_input_data) + + fake_callback_result = { + "callback_result": { + "failed_wfs": { + "t1.example.com": "Timed out waiting for workflow to complete", + "t2.example.com": "Validation error: validation_exc", + "t3.example.com": "Unexpected error: boom", + }, + "successful_wfs": { + "t4.example.com": "Done", + }, + }, + } + assert_awaiting_callback(result) + result, step_log = resume_workflow(process_stat, step_log, input_data=fake_callback_result) + + result, step_log = resume_suspended_workflow( + result, process_stat, step_log, input_data=test.USER_CONFIRM_EMPTY_FORM + ) + + assert_complete(result) + state = extract_state(result) + + assert state["tt_number"] == initial_input_data[0]["tt_number"] + assert state["failed_wfs"] == fake_callback_result["callback_result"]["failed_wfs"] + assert state["successful_wfs"] == fake_callback_result["callback_result"]["successful_wfs"]