From 3f27b8a5cf35d5b001b28b757183a324409b4c51 Mon Sep 17 00:00:00 2001
From: Karel van Klink <karel.vanklink@geant.org>
Date: Fri, 27 Oct 2023 12:37:17 +0200
Subject: [PATCH] update unit tests

---
 gso/main.py                                   |   6 +-
 test/conftest.py                              |   8 +-
 test/workflows/iptrunk/test_create_iptrunk.py |  26 +--
 .../iptrunks/iptrunks/test_create_iptrunks.py | 151 ------------------
 4 files changed, 23 insertions(+), 168 deletions(-)
 delete mode 100644 test/workflows/iptrunks/iptrunks/test_create_iptrunks.py

diff --git a/gso/main.py b/gso/main.py
index 112bd535..398afb21 100644
--- a/gso/main.py
+++ b/gso/main.py
@@ -11,9 +11,9 @@ from gso.api import router as api_router
 
 
 def init_gso_app(settings: AppSettings) -> OrchestratorCore:
-    app = OrchestratorCore(base_settings=settings)
-    app.include_router(api_router, prefix="/api")
-    return app
+    gso_app = OrchestratorCore(base_settings=settings)
+    gso_app.include_router(api_router, prefix="/api")
+    return gso_app
 
 
 def init_cli_app() -> typer.Typer:
diff --git a/test/conftest.py b/test/conftest.py
index f46642a1..5616e3b6 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -257,8 +257,14 @@ def db_session(database):
 
 @pytest.fixture(scope="session", autouse=True)
 def fastapi_app(database, db_uri):
-    """Load the GSO FastAPI app for testing purposes."""
+    """Load the GSO FastAPI app for testing purposes.
 
+    This implementation is as close as possible to the one present in orchestrator-core.
+    """
+    from oauth2_lib.settings import oauth2lib_settings
+
+    oauth2lib_settings.OAUTH2_ACTIVE = False
+    oauth2lib_settings.ENVIRONMENT_IGNORE_MUTATION_DISABLED = ["local", "TESTING"]
     app_settings.DATABASE_URI = db_uri
     return init_gso_app(settings=app_settings)
 
diff --git a/test/workflows/iptrunk/test_create_iptrunk.py b/test/workflows/iptrunk/test_create_iptrunk.py
index d5c23c2a..a152d1c6 100644
--- a/test/workflows/iptrunk/test_create_iptrunk.py
+++ b/test/workflows/iptrunk/test_create_iptrunk.py
@@ -1,7 +1,9 @@
+import uuid
 from os import PathLike
 from unittest.mock import patch
 
 import pytest
+from orchestrator.services.processes import continue_awaiting_process
 
 from gso.products import Iptrunk, ProductType
 from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
@@ -101,6 +103,7 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
     faker,
     data_config_filename: PathLike,
     netbox_client_mock,
+    test_client,
 ):
     mock_allocate_v4_network.return_value = faker.ipv4_network()
     mock_allocate_v6_network.return_value = faker.ipv6_network()
@@ -110,20 +113,17 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
     assert_awaiting_callback(result)
 
     standard_lso_result = {
-        "pp_run_results": {
-            "status": "ok",
-            "job_id": "random_job_id",
-            "output": "parsed_output",
-            "return_code": 0,
-        },
-        "confirm": "ACCEPTED",
+        "status": "success",
+        "job_id": str(uuid.uuid4()),
+        "output": "parsed_output",
+        "return_code": 0
     }
-    for _ in range(5):
-        result, step_log = user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result)
-        result, step_log = user_accept_and_assert_suspended(process_stat, step_log, [{}, {}])
 
-    result, step_log = user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result)
-    result, step_log = resume_workflow(process_stat, step_log, [{}, {}])
+    for _ in range(2):
+        assert_awaiting_callback(result)
+        current_state = extract_state(result)
+        continue_awaiting_process(process_stat, token=current_state["callback_token"], input_data=standard_lso_result)
+
     assert_complete(result)
 
     state = extract_state(result)
@@ -159,7 +159,7 @@ def test_iptrunk_creation_fails_when_lso_return_code_is_one(
 
     initial_site_data = [{"product": product_id}, *input_form_wizard_data]
     result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data)
-    assert_awaiting_callback(result)
+    assert_suspended(result)
 
     standard_lso_result = {
         "pp_run_results": {
diff --git a/test/workflows/iptrunks/iptrunks/test_create_iptrunks.py b/test/workflows/iptrunks/iptrunks/test_create_iptrunks.py
deleted file mode 100644
index 3ec39fd8..00000000
--- a/test/workflows/iptrunks/iptrunks/test_create_iptrunks.py
+++ /dev/null
@@ -1,151 +0,0 @@
-from unittest.mock import patch
-
-import pytest
-
-from gso.products import Iptrunk, ProductType
-from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
-from gso.services.crm import customer_selector, get_customer_by_name
-from gso.services.subscriptions import get_product_id_by_name
-from test.workflows import (
-    assert_aborted,
-    assert_complete,
-    assert_suspended,
-    extract_state,
-    resume_workflow,
-    run_workflow,
-)
-
-
-@pytest.fixture
-def input_form_wizard_data(router_subscription_factory, faker):
-    router_side_a = router_subscription_factory()
-    router_side_b = router_subscription_factory()
-
-    create_ip_trunk_step = {
-        "tt_number": faker.pystr(),
-        "customer": getattr(customer_selector(), get_customer_by_name("GÉANT")["id"]),
-        "geant_s_sid": faker.pystr(),
-        "iptrunk_type": IptrunkType.DARK_FIBER,
-        "iptrunk_description": faker.sentence(),
-        "iptrunk_speed": PhyPortCapacity.HUNDRED_GIGABIT_PER_SECOND,
-        "iptrunk_minimum_links": 5,
-    }
-    create_ip_trunk_side_a_step = {
-        "iptrunk_sideA_node_id": router_side_a,
-        "iptrunk_sideA_ae_iface": faker.pystr(),
-        "iptrunk_sideA_ae_geant_a_sid": faker.pystr(),
-        "iptrunk_sideA_ae_members": [faker.pystr() for _ in range(5)],
-        "iptrunk_sideA_ae_members_descriptions": [faker.sentence() for _ in range(5)],
-    }
-
-    create_ip_trunk_side_b_step = {
-        "iptrunk_sideB_node_id": router_side_b,
-        "iptrunk_sideB_ae_iface": faker.pystr(),
-        "iptrunk_sideB_ae_geant_a_sid": faker.pystr(),
-        "iptrunk_sideB_ae_members": [faker.pystr() for _ in range(5)],
-        "iptrunk_sideB_ae_members_descriptions": [faker.sentence() for _ in range(5)],
-    }
-
-    return [create_ip_trunk_step, create_ip_trunk_side_a_step, create_ip_trunk_side_b_step]
-
-
-def _user_accept_and_assert_suspended(process_stat, step_log, extra_data=None):
-    extra_data = extra_data or {}
-    result, step_log = resume_workflow(process_stat, step_log, extra_data)
-    assert_suspended(result)
-
-    return result, step_log
-
-
-@pytest.mark.workflow
-@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk")
-@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk")
-@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network")
-@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v4_network")
-def test_successful_iptrunk_creation_with_standard_lso_result(
-    mock_allocate_v4_network,
-    mock_allocate_v6_network,
-    mock_provision_ip_trunk,
-    mock_check_ip_trunk,
-    responses,
-    input_form_wizard_data,
-    faker,
-):
-    mock_allocate_v4_network.return_value = faker.ipv4_network()
-    mock_allocate_v6_network.return_value = faker.ipv6_network()
-    product_id = get_product_id_by_name(ProductType.IP_TRUNK)
-    initial_site_data = [{"product": product_id}, *input_form_wizard_data]
-    result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data)
-    assert_suspended(result)
-
-    standard_lso_result = {
-        "callback_result": {
-            "status": "ok",
-            "job_id": "random_job_id",
-            "output": "parsed_output",
-            "return_code": 0,
-        },
-        "confirm": "ACCEPTED",
-    }
-    for _ in range(5):
-        result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result)
-        result, step_log = _user_accept_and_assert_suspended(process_stat, step_log)
-
-    result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result)
-    result, step_log = resume_workflow(process_stat, step_log, {})
-    assert_complete(result)
-
-    state = extract_state(result)
-    subscription_id = state["subscription_id"]
-    subscription = Iptrunk.from_subscription(subscription_id)
-
-    assert "active" == subscription.status
-    assert subscription.description == f"IP trunk, geant_s_sid:{input_form_wizard_data[0]['geant_s_sid']}"
-
-    assert mock_provision_ip_trunk.call_count == 4
-    assert mock_check_ip_trunk.call_count == 2
-
-
-@pytest.mark.workflow
-@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk")
-@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk")
-@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network")
-@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v4_network")
-def test_iptrunk_creation_fails_when_lso_return_code_is_one(
-    mock_allocate_v4_network,
-    mock_allocate_v6_network,
-    mock_provision_ip_trunk,
-    mock_check_ip_trunk,
-    responses,
-    input_form_wizard_data,
-    faker,
-):
-    mock_allocate_v4_network.return_value = faker.ipv4_network()
-    mock_allocate_v6_network.return_value = faker.ipv6_network()
-    product_id = get_product_id_by_name(ProductType.IP_TRUNK)
-
-    initial_site_data = [{"product": product_id}, *input_form_wizard_data]
-    result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data)
-    assert_suspended(result)
-
-    standard_lso_result = {
-        "callback_result": {
-            "status": "ok",
-            "job_id": "random_job_id",
-            "output": "parsed_output",
-            "return_code": 1,
-        },
-        "confirm": "ACCEPTED",
-    }
-
-    attempts = 3
-    for _ in range(0, attempts - 1):
-        result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result)
-        result, step_log = _user_accept_and_assert_suspended(process_stat, step_log)
-
-    result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result)
-    result, step_log = resume_workflow(process_stat, step_log, {})
-    assert_aborted(result)
-
-    assert mock_provision_ip_trunk.call_count == attempts
-    assert mock_check_ip_trunk.call_count == 0
-- 
GitLab