diff --git a/gso/main.py b/gso/main.py index 112bd535c9df2054cae59225d3c3d16f9c38e242..398afb2139ecc1d87261cc24b577bee28baed4dd 100644 --- a/gso/main.py +++ b/gso/main.py @@ -11,9 +11,9 @@ from gso.api import router as api_router def init_gso_app(settings: AppSettings) -> OrchestratorCore: - app = OrchestratorCore(base_settings=settings) - app.include_router(api_router, prefix="/api") - return app + gso_app = OrchestratorCore(base_settings=settings) + gso_app.include_router(api_router, prefix="/api") + return gso_app def init_cli_app() -> typer.Typer: diff --git a/test/conftest.py b/test/conftest.py index f46642a13a3babe429e5edcb3667659d65871223..5616e3b6e6bb01e29490eb038714328b4c0e52c4 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -257,8 +257,14 @@ def db_session(database): @pytest.fixture(scope="session", autouse=True) def fastapi_app(database, db_uri): - """Load the GSO FastAPI app for testing purposes.""" + """Load the GSO FastAPI app for testing purposes. + This implementation is as close as possible to the one present in orchestrator-core. + """ + from oauth2_lib.settings import oauth2lib_settings + + oauth2lib_settings.OAUTH2_ACTIVE = False + oauth2lib_settings.ENVIRONMENT_IGNORE_MUTATION_DISABLED = ["local", "TESTING"] app_settings.DATABASE_URI = db_uri return init_gso_app(settings=app_settings) diff --git a/test/workflows/iptrunk/test_create_iptrunk.py b/test/workflows/iptrunk/test_create_iptrunk.py index d5c23c2a631a15e3c329590e695ae56703b3bc2f..a152d1c6f6fb3b924ab9ad6226cc0a075c90e1a2 100644 --- a/test/workflows/iptrunk/test_create_iptrunk.py +++ b/test/workflows/iptrunk/test_create_iptrunk.py @@ -1,7 +1,9 @@ +import uuid from os import PathLike from unittest.mock import patch import pytest +from orchestrator.services.processes import continue_awaiting_process from gso.products import Iptrunk, ProductType from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity @@ -101,6 +103,7 @@ def test_successful_iptrunk_creation_with_standard_lso_result( faker, data_config_filename: PathLike, netbox_client_mock, + test_client, ): mock_allocate_v4_network.return_value = faker.ipv4_network() mock_allocate_v6_network.return_value = faker.ipv6_network() @@ -110,20 +113,17 @@ def test_successful_iptrunk_creation_with_standard_lso_result( assert_awaiting_callback(result) standard_lso_result = { - "pp_run_results": { - "status": "ok", - "job_id": "random_job_id", - "output": "parsed_output", - "return_code": 0, - }, - "confirm": "ACCEPTED", + "status": "success", + "job_id": str(uuid.uuid4()), + "output": "parsed_output", + "return_code": 0 } - for _ in range(5): - result, step_log = user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) - result, step_log = user_accept_and_assert_suspended(process_stat, step_log, [{}, {}]) - result, step_log = user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) - result, step_log = resume_workflow(process_stat, step_log, [{}, {}]) + for _ in range(2): + assert_awaiting_callback(result) + current_state = extract_state(result) + continue_awaiting_process(process_stat, token=current_state["callback_token"], input_data=standard_lso_result) + assert_complete(result) state = extract_state(result) @@ -159,7 +159,7 @@ def test_iptrunk_creation_fails_when_lso_return_code_is_one( initial_site_data = [{"product": product_id}, *input_form_wizard_data] result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data) - assert_awaiting_callback(result) + assert_suspended(result) standard_lso_result = { "pp_run_results": { diff --git a/test/workflows/iptrunks/iptrunks/test_create_iptrunks.py b/test/workflows/iptrunks/iptrunks/test_create_iptrunks.py deleted file mode 100644 index 3ec39fd8409e7029c913134dbd01d385d18e8e35..0000000000000000000000000000000000000000 --- a/test/workflows/iptrunks/iptrunks/test_create_iptrunks.py +++ /dev/null @@ -1,151 +0,0 @@ -from unittest.mock import patch - -import pytest - -from gso.products import Iptrunk, ProductType -from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity -from gso.services.crm import customer_selector, get_customer_by_name -from gso.services.subscriptions import get_product_id_by_name -from test.workflows import ( - assert_aborted, - assert_complete, - assert_suspended, - extract_state, - resume_workflow, - run_workflow, -) - - -@pytest.fixture -def input_form_wizard_data(router_subscription_factory, faker): - router_side_a = router_subscription_factory() - router_side_b = router_subscription_factory() - - create_ip_trunk_step = { - "tt_number": faker.pystr(), - "customer": getattr(customer_selector(), get_customer_by_name("GÉANT")["id"]), - "geant_s_sid": faker.pystr(), - "iptrunk_type": IptrunkType.DARK_FIBER, - "iptrunk_description": faker.sentence(), - "iptrunk_speed": PhyPortCapacity.HUNDRED_GIGABIT_PER_SECOND, - "iptrunk_minimum_links": 5, - } - create_ip_trunk_side_a_step = { - "iptrunk_sideA_node_id": router_side_a, - "iptrunk_sideA_ae_iface": faker.pystr(), - "iptrunk_sideA_ae_geant_a_sid": faker.pystr(), - "iptrunk_sideA_ae_members": [faker.pystr() for _ in range(5)], - "iptrunk_sideA_ae_members_descriptions": [faker.sentence() for _ in range(5)], - } - - create_ip_trunk_side_b_step = { - "iptrunk_sideB_node_id": router_side_b, - "iptrunk_sideB_ae_iface": faker.pystr(), - "iptrunk_sideB_ae_geant_a_sid": faker.pystr(), - "iptrunk_sideB_ae_members": [faker.pystr() for _ in range(5)], - "iptrunk_sideB_ae_members_descriptions": [faker.sentence() for _ in range(5)], - } - - return [create_ip_trunk_step, create_ip_trunk_side_a_step, create_ip_trunk_side_b_step] - - -def _user_accept_and_assert_suspended(process_stat, step_log, extra_data=None): - extra_data = extra_data or {} - result, step_log = resume_workflow(process_stat, step_log, extra_data) - assert_suspended(result) - - return result, step_log - - -@pytest.mark.workflow -@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk") -@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk") -@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network") -@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v4_network") -def test_successful_iptrunk_creation_with_standard_lso_result( - mock_allocate_v4_network, - mock_allocate_v6_network, - mock_provision_ip_trunk, - mock_check_ip_trunk, - responses, - input_form_wizard_data, - faker, -): - mock_allocate_v4_network.return_value = faker.ipv4_network() - mock_allocate_v6_network.return_value = faker.ipv6_network() - product_id = get_product_id_by_name(ProductType.IP_TRUNK) - initial_site_data = [{"product": product_id}, *input_form_wizard_data] - result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data) - assert_suspended(result) - - standard_lso_result = { - "callback_result": { - "status": "ok", - "job_id": "random_job_id", - "output": "parsed_output", - "return_code": 0, - }, - "confirm": "ACCEPTED", - } - for _ in range(5): - result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) - result, step_log = _user_accept_and_assert_suspended(process_stat, step_log) - - result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) - result, step_log = resume_workflow(process_stat, step_log, {}) - assert_complete(result) - - state = extract_state(result) - subscription_id = state["subscription_id"] - subscription = Iptrunk.from_subscription(subscription_id) - - assert "active" == subscription.status - assert subscription.description == f"IP trunk, geant_s_sid:{input_form_wizard_data[0]['geant_s_sid']}" - - assert mock_provision_ip_trunk.call_count == 4 - assert mock_check_ip_trunk.call_count == 2 - - -@pytest.mark.workflow -@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk") -@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk") -@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network") -@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v4_network") -def test_iptrunk_creation_fails_when_lso_return_code_is_one( - mock_allocate_v4_network, - mock_allocate_v6_network, - mock_provision_ip_trunk, - mock_check_ip_trunk, - responses, - input_form_wizard_data, - faker, -): - mock_allocate_v4_network.return_value = faker.ipv4_network() - mock_allocate_v6_network.return_value = faker.ipv6_network() - product_id = get_product_id_by_name(ProductType.IP_TRUNK) - - initial_site_data = [{"product": product_id}, *input_form_wizard_data] - result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data) - assert_suspended(result) - - standard_lso_result = { - "callback_result": { - "status": "ok", - "job_id": "random_job_id", - "output": "parsed_output", - "return_code": 1, - }, - "confirm": "ACCEPTED", - } - - attempts = 3 - for _ in range(0, attempts - 1): - result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) - result, step_log = _user_accept_and_assert_suspended(process_stat, step_log) - - result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) - result, step_log = resume_workflow(process_stat, step_log, {}) - assert_aborted(result) - - assert mock_provision_ip_trunk.call_count == attempts - assert mock_check_ip_trunk.call_count == 0