Skip to content
Snippets Groups Projects
Commit 1ac68a56 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

add unit tests for massive base config redeploy

parent 955fdd15
No related branches found
No related tags found
1 merge request!429Feature/mass base config redeploy
Pipeline #94770 canceled
......@@ -51,7 +51,7 @@ def _input_form_generator() -> FormGenerator:
def start_redeploy_workflows(tt_number: TTNumber, selected_routers: list[UUIDstr], callback_route: str) -> State:
"""Start the massive redeploy base config task with the selected routers."""
# TODO if in the future you changed UUIDstr to UUID, you need to convert them to string when passing to the task
massive_redeploy_base_config_task.apply_async(args=[selected_routers, tt_number, callback_route], countdown=5)
massive_redeploy_base_config_task.apply_async(args=[selected_routers, tt_number, callback_route], countdown=5) # type: ignore[attr-defined]
return {"failed_wfs": {}, "successful_wfs": {}}
......
......@@ -34,7 +34,6 @@ from sqlalchemy.orm import scoped_session, sessionmaker
from starlette.testclient import TestClient
from urllib3_mock import Responses
import gso.services.mailer
from gso.services.partners import PartnerSchema, create_partner
from gso.services.subscriptions import is_resource_type_value_unique
from test.fixtures import * # noqa: F403
......@@ -57,6 +56,8 @@ def pytest_configure(config):
# Set environment variables for the test session
os.environ["OSS_PARAMS_FILENAME"] = "gso/oss-params-example.json"
os.environ["TESTING"] = "true"
os.environ["CELERY_TASK_ALWAYS_EAGER"] = "true"
os.environ["CELERY_TASK_EAGER_PROPAGATES"] = "true"
# Register finalizers to clean up after tests are done
def cleanup() -> None:
......@@ -593,11 +594,15 @@ def responses():
def _no_mail(monkeypatch):
"""Remove sending mails from all tests."""
def send_mail(subject: str, body: str, *, destination: str | None = None) -> None:
def fake_send_mail(subject: str, body: str, *, destination: str | None = None) -> None:
email = f"*** SENT AN EMAIL ***\nTO: {destination}\nSUBJECT: {subject}\nCONTENT:\n{body}"
logger.info(email)
monkeypatch.setattr(gso.services.mailer, "send_mail", send_mail)
monkeypatch.setattr(
"gso.services.mailer.send_mail",
fake_send_mail,
raising=True,
)
@pytest.fixture(autouse=True)
......@@ -605,7 +610,7 @@ def _no_lso_interactions(monkeypatch):
"""Remove all external LSO calls."""
@step("Mocked playbook execution")
def _execute_playbook(
def fake_execute_playbook(
playbook_name: str, callback_route: str, inventory: dict, extra_vars: dict, process_id: UUIDstr
) -> None:
assert playbook_name
......@@ -614,4 +619,8 @@ def _no_lso_interactions(monkeypatch):
assert extra_vars
assert process_id
monkeypatch.setattr(gso.services.lso_client, "_execute_playbook", _execute_playbook)
monkeypatch.setattr(
"gso.services.lso_client._execute_playbook",
fake_execute_playbook,
raising=True,
)
import logging
import uuid
from types import SimpleNamespace
from unittest.mock import patch
from orchestrator.workflow import ProcessStatus
from pydantic import BaseModel, ValidationError
from pydantic_forms.exceptions import FormValidationError
from pydantic_i18n import PydanticI18n
from gso.tasks.massive_redeploy_base_config import massive_redeploy_base_config_task
@patch("gso.tasks.massive_redeploy_base_config.requests.post")
@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params")
@patch("gso.tasks.massive_redeploy_base_config.start_process")
@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop")
def test_all_status_branches(
    mock_wait_for_workflow_to_stop, mock_start_process, mock_load_oss, mock_post, router_subscription_factory, faker
):
    """Exercise every workflow-status branch of the massive redeploy task.

    - Completed → successful_wfs
    - Aborted → failed_wfs["Workflow was aborted"]
    - Failed with a reason → failed_wfs[reason]
    - Failed without a reason → default message
    - Any other status → generic "Workflow status: ..." formatting
    """
    # Five routers, one per status branch under test.
    router_ids = [
        router_subscription_factory(router_fqdn=fqdn).subscription_id
        for fqdn in [
            "r1.example.com",
            "r2.example.com",
            "r3.example.com",
            "r4.example.com",
            "r5.example.com",
        ]
    ]

    # Stub start_process → return a fresh dummy process id per call.
    # (Fixed: the original returned the uuid.UUID class object, not an instance.)
    mock_start_process.side_effect = lambda *args, **kwargs: uuid.uuid4()  # noqa: ARG005

    # Prepare five different ProcessTable-like objects, one per branch.
    p1 = SimpleNamespace(last_step="Done", last_status=ProcessStatus.COMPLETED, failed_reason=None)
    p2 = SimpleNamespace(last_step="X", last_status=ProcessStatus.ABORTED, failed_reason=None)
    p3 = SimpleNamespace(last_step="Y", last_status=ProcessStatus.FAILED, failed_reason="Bad foo")
    p4 = SimpleNamespace(last_step="Z", last_status=ProcessStatus.FAILED, failed_reason=None)
    p5 = SimpleNamespace(last_step="L", last_status="RUNNING", failed_reason=None)
    mock_wait_for_workflow_to_stop.side_effect = [p1, p2, p3, p4, p5]

    # Stub OSS params (callback host) and a successful HTTP callback.
    mock_load_oss.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="http://callback.host"))
    mock_post.return_value = SimpleNamespace(ok=True)

    # Run the task directly as a function.
    massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb")

    expected_payload = {
        "successful_wfs": {"r1.example.com": "Done"},
        "failed_wfs": {
            "r2.example.com": "Workflow was aborted",
            "r3.example.com": "Bad foo",
            "r4.example.com": "Workflow failed without a reason",
            "r5.example.com": "Workflow status: RUNNING, last step: L",
        },
    }
    mock_post.assert_called_once_with("http://callback.host/cb", json=expected_payload, timeout=30)
@patch("gso.tasks.massive_redeploy_base_config.requests.post")
@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params")
@patch("gso.tasks.massive_redeploy_base_config.start_process")
@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop")
def test_timeout_and_validation_and_unexpected(
    mock_wait_for_workflow_to_stop,
    mock_start_process,
    mock_load_oss,
    mock_post,
    router_subscription_factory,
    faker,
    caplog,
):
    """Exercise three error branches of the massive redeploy task.

    - wait_for_workflow_to_stop returns None (timeout)
    - start_process raises FormValidationError
    - start_process raises a generic Exception
    """
    # Create three routers (their subscription_id is a UUID).
    r_timeout = router_subscription_factory(router_fqdn="t1.example.com")
    r_validate = router_subscription_factory(router_fqdn="t2.example.com")
    r_crash = router_subscription_factory(router_fqdn="t3.example.com")
    router_ids = [
        r_timeout.subscription_id,
        r_validate.subscription_id,
        r_crash.subscription_id,
    ]

    # Build a real ValidationError via a dummy Pydantic model.
    class TempModel(BaseModel):
        x: int

    try:
        TempModel(x="not_an_int")
    except ValidationError as ve:
        # Supply an explicit (empty) translations dict so PydanticI18n initializes.
        translator = PydanticI18n(source={"en_US": {}})
        validation_exc = FormValidationError("TempModel", ve, translator, locale="en_US")
    else:
        # Guard: without this, a silently-passing model would leave
        # validation_exc unbound and surface later as a confusing NameError.
        msg = "TempModel unexpectedly accepted a non-integer value"
        raise AssertionError(msg)

    # Fake start_process: timeout for the first router, validation error for
    # the second, unexpected crash for the third.
    def fake_start(name, user_inputs):
        rid = user_inputs[0]["subscription_id"]
        if rid == r_validate.subscription_id:
            raise validation_exc
        if rid == r_crash.subscription_id:
            msg = "boom"
            raise RuntimeError(msg)
        return f"pid-{rid}"

    mock_start_process.side_effect = fake_start
    # Always time out (None) for the first router.
    mock_wait_for_workflow_to_stop.return_value = None

    # Stub OSS params and a successful HTTP callback.
    mock_load_oss.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="https://host"))
    mock_post.return_value = SimpleNamespace(ok=True)

    caplog.set_level(logging.ERROR)
    massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/done")

    expected_failed = {
        "t1.example.com": "Timed out waiting for workflow to complete",
        "t2.example.com": f"Validation error: {validation_exc}",
        "t3.example.com": "Unexpected error: boom",
    }
    expected_payload = {"successful_wfs": {}, "failed_wfs": expected_failed}
    mock_post.assert_called_once_with(
        "https://host/done",
        json=expected_payload,
        timeout=30,
    )
@patch("gso.tasks.massive_redeploy_base_config.requests.post")
@patch("gso.tasks.massive_redeploy_base_config.settings.load_oss_params")
@patch("gso.tasks.massive_redeploy_base_config.start_process")
@patch("gso.tasks.massive_redeploy_base_config.wait_for_workflow_to_stop")
@patch("gso.tasks.massive_redeploy_base_config.Router.from_subscription")
def test_callback_failure_and_exception(
    mock_from_subscription,
    mock_wait_for_workflow_to_stop,
    mock_start_process,
    mock_load_oss_params,
    mock_requests_post,
    caplog,
    router_subscription_factory,
    faker,
):
    """Both callback failure modes — ok=False and a raised exception — produce error logs."""
    # One router whose workflow always completes cleanly.
    router_sub = router_subscription_factory(router_fqdn="r1.fqdn")
    mock_from_subscription.return_value = router_sub
    router_ids = [router_sub.subscription_id]

    # Workflow machinery: start returns a pid, wait reports a completed process.
    completed_process = SimpleNamespace(
        last_step="Done",
        last_status=ProcessStatus.COMPLETED,
        failed_reason=None,
    )
    mock_start_process.return_value = "pid"
    mock_wait_for_workflow_to_stop.return_value = completed_process

    # Stub the OSS host used to build the callback URL.
    mock_load_oss_params.return_value = SimpleNamespace(GENERAL=SimpleNamespace(internal_hostname="http://h"))

    caplog.set_level(logging.ERROR)

    # Case 1: the callback endpoint answers with a non-OK response → "Callback failed".
    mock_requests_post.return_value = SimpleNamespace(ok=False, status_code=500, text="server error")
    massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb1")
    assert "Callback failed" in caplog.text
    caplog.clear()

    # Case 2: posting the callback raises → "Failed to post callback: net down".
    mock_requests_post.side_effect = Exception("net down")
    massive_redeploy_base_config_task(router_ids, faker.tt_number(), "/cb1")
    assert "Failed to post callback: net down" in caplog.text
import logging
from unittest.mock import patch
from uuid import uuid4
import pytest
from orchestrator.db import ProcessTable
from orchestrator.types import SubscriptionLifecycle
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
......@@ -10,6 +13,7 @@ from gso.utils.helpers import (
generate_inventory_for_routers,
generate_lan_switch_interconnect_subnet_v4,
generate_lan_switch_interconnect_subnet_v6,
wait_for_workflow_to_stop,
)
from gso.utils.shared_enums import Vendor
from gso.utils.types.tt_number import validate_tt_number
......@@ -162,3 +166,46 @@ def test_generate_lan_switch_interconnect_subnet_v6(execution_count, site_subscr
str(generate_lan_switch_interconnect_subnet_v6(site.site.site_internal_id))
== f"beef:cafe:0:{hex(site.site.site_internal_id).split("x")[-1]}::/64"
)
@patch("gso.utils.helpers.time.sleep", lambda _: None)
@patch("gso.utils.helpers.get_stopped_process_by_id")
def test_wait_for_workflow_to_stop_success(mock_get_stopped, caplog):
    """Polling succeeds when get_stopped_process_by_id finds a process on the 3rd attempt."""
    finished = ProcessTable(last_status="completed", last_step="Done")
    # Two polls that find nothing, then the stopped process.
    mock_get_stopped.side_effect = [None, None, finished]
    caplog.set_level(logging.INFO)

    process_id = uuid4()
    returned = wait_for_workflow_to_stop(
        process_id=process_id,
        check_interval=0,
        max_retries=5,
    )

    # The helper returns the very object the lookup produced, after 3 polls.
    assert returned is finished
    assert returned.last_status == "completed"
    assert mock_get_stopped.call_count == 3
    assert f"✅ Process {process_id} has stopped with status: completed" in caplog.text
@patch("gso.utils.helpers.time.sleep", lambda _: None)
@patch("gso.utils.helpers.get_stopped_process_by_id", return_value=None)
def test_wait_for_workflow_to_stop_timeout(mock_get_stopped, caplog):
    """Polling gives up and returns None when the process never stops."""
    caplog.set_level(logging.ERROR)
    process_id = uuid4()

    outcome = wait_for_workflow_to_stop(
        process_id=process_id,
        check_interval=0,
        max_retries=3,
    )

    assert outcome is None
    # With check_interval=0 the reported wait is max_retries * 0 = 0 seconds.
    assert f"❌ Timeout reached. Workflow {process_id} did not stop after 0 seconds." in caplog.text
    assert mock_get_stopped.call_count == 3
......@@ -4,6 +4,7 @@ from gso.products.product_types.router import Router
from test.workflows import (
assert_complete,
assert_lso_interaction_success,
assert_lso_success,
extract_state,
run_workflow,
)
......@@ -25,8 +26,13 @@ def test_redeploy_base_config_success(
{"tt_number": faker.tt_number(), "is_massive_redeploy": is_massive_redeploy},
]
result, process_stat, step_log = run_workflow("redeploy_base_config", initial_input_data)
for _ in range(1 if is_massive_redeploy else 2):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
if is_massive_redeploy:
for _ in range(1):
result, step_log = assert_lso_success(result, process_stat, step_log)
else:
for _ in range(2):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_complete(result)
......
from unittest.mock import patch
import pytest
import test
from test.workflows import (
assert_awaiting_callback,
assert_complete,
extract_state,
resume_suspended_workflow,
resume_workflow,
run_workflow,
)
@patch("gso.workflows.tasks.redeploy_base_config.massive_redeploy_base_config_task")
@pytest.mark.workflow()
def test_task_redeploy_base_config_success(
    mocked_massive_redeploy_base_config_task,
    router_subscription_factory,
    faker,
):
    """Happy path: the task awaits the callback, then completes with no failed workflows."""
    router_ids = [str(router_subscription_factory().subscription_id) for _ in range(2)]

    # Kick off the workflow task with two selected routers.
    initial_input_data = [
        {"tt_number": faker.tt_number(), "selected_routers": router_ids},
    ]
    result, process_stat, step_log = run_workflow("task_redeploy_base_config", initial_input_data)
    assert_awaiting_callback(result)

    # Deliver a callback reporting only successes.
    callback_payload = {
        "callback_result": {
            "failed_wfs": {},
            "successful_wfs": {
                "t4.example.com": "Done",
            },
        },
    }
    result, step_log = resume_workflow(process_stat, step_log, input_data=callback_payload)
    assert_complete(result)

    state = extract_state(result)
    assert state["tt_number"] == initial_input_data[0]["tt_number"]
    assert state["failed_wfs"] == {}
    assert state["successful_wfs"] == {
        "t4.example.com": "Done",
    }
@patch("gso.workflows.tasks.redeploy_base_config.massive_redeploy_base_config_task")
@pytest.mark.workflow()
def test_task_redeploy_base_config_failure(
    mocked_massive_redeploy_base_config_task, router_subscription_factory, faker
):
    """Failure path: failed workflows from the callback surface in state after confirmation."""
    router_ids = [str(router_subscription_factory().subscription_id) for _ in range(2)]

    # Kick off the workflow task with two selected routers.
    initial_input_data = [
        {"tt_number": faker.tt_number(), "selected_routers": router_ids},
    ]
    result, process_stat, step_log = run_workflow("task_redeploy_base_config", initial_input_data)
    assert_awaiting_callback(result)

    # Deliver a callback mixing three failures with one success.
    failed_wfs = {
        "t1.example.com": "Timed out waiting for workflow to complete",
        "t2.example.com": "Validation error: validation_exc",
        "t3.example.com": "Unexpected error: boom",
    }
    successful_wfs = {
        "t4.example.com": "Done",
    }
    fake_callback_result = {
        "callback_result": {
            "failed_wfs": failed_wfs,
            "successful_wfs": successful_wfs,
        },
    }
    result, step_log = resume_workflow(process_stat, step_log, input_data=fake_callback_result)
    # With failures present the run suspends for operator confirmation before completing.
    result, step_log = resume_suspended_workflow(
        result, process_stat, step_log, input_data=test.USER_CONFIRM_EMPTY_FORM
    )
    assert_complete(result)

    state = extract_state(result)
    assert state["tt_number"] == initial_input_data[0]["tt_number"]
    assert state["failed_wfs"] == failed_wfs
    assert state["successful_wfs"] == successful_wfs
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment