diff --git a/gso/services/provisioning_proxy.py b/gso/services/provisioning_proxy.py index c29d942915a537e1a9d64ad0ad449449dd2ae9e0..601e12cc98557369387d36cd8e79906e222fab90 100644 --- a/gso/services/provisioning_proxy.py +++ b/gso/services/provisioning_proxy.py @@ -6,15 +6,11 @@ import json import logging import requests -from orchestrator import conditional, inputstep, step -from orchestrator.config.assignee import Assignee -from orchestrator.domain import SubscriptionModel -from orchestrator.forms import FormPage, ReadOnlyField -from orchestrator.forms.validators import Accept, Label, LongText -from orchestrator.types import FormGenerator, State, UUIDstr, strEnum +from orchestrator import step +from orchestrator.types import State, UUIDstr, strEnum +from orchestrator.utils.errors import ProcessFailureError from orchestrator.utils.json import json_dumps -from orchestrator.workflow import Step, StepList, abort, init -from pydantic import validator +from orchestrator.workflow import Step, callback_step, begin, StepList from gso import settings from gso.products.product_types.iptrunk import Iptrunk, IptrunkProvisioning @@ -36,7 +32,7 @@ class CUDOperation(strEnum): DELETE = "DELETE" -def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operation: CUDOperation) -> None: +def _send_request(endpoint: str, parameters: dict, callback_route: str, operation: CUDOperation) -> None: """Send a request to :term:`LSO`. The callback address is derived using the process ID provided. :param endpoint: The :term:`LSO`-specific endpoint to call, depending on the type of service object that's acted @@ -45,9 +41,8 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operatio :param parameters: JSON body for the request, which will almost always at least consist of a subscription object, and a boolean value to indicate a dry run. :type parameters: dict - :param process_id: The process ID that this request is a part of, used to call back to when the execution of the - playbook is completed. - :type process_id: UUIDstr + :param callback_route: The callback route that should be used to resume the workflow. + :type callback_route: str :param operation: The specific operation that's performed with the request. :type operation: :class:`CUDOperation` :rtype: None @@ -57,8 +52,7 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operatio assert pp_params # Build up a callback URL of the Provisioning Proxy to return its results to. - callback_url = f"{settings.load_oss_params().GENERAL.public_hostname}" f"/api/processes/{process_id}/resume" - logger.debug(f"[provisioning proxy] provisioning for process {process_id}") + callback_url = f"{oss.GENERAL.public_hostname}/api{callback_route}" logger.debug(f"[provisioning proxy] Callback URL set to {callback_url}") parameters.update({"callback": callback_url}) @@ -80,7 +74,7 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operatio def provision_router( - subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str, dry_run: bool = True + subscription: RouterProvisioning, process_id: UUIDstr, callback_route: str, tt_number: str, dry_run: bool = True ) -> None: """Provision a new router using :term:`LSO`. @@ -88,6 +82,10 @@ def provision_router( :type subscription: :class:`RouterProvisioning` :param process_id: The related process ID, used for callback. :type process_id: UUIDstr + :param callback_route: The API endpoint that should be used for the callback URL. + :type callback_route: str + :param tt_number: Trouble ticket number related to the operation. + :type tt_number: str :param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`. :type dry_run: bool :rtype: None @@ -99,12 +97,13 @@ def provision_router( "subscription": json.loads(json_dumps(subscription)), } - _send_request("router", parameters, process_id, CUDOperation.POST) + _send_request("router", parameters, callback_route, CUDOperation.POST) def provision_ip_trunk( subscription: IptrunkProvisioning, process_id: UUIDstr, + callback_route: str, tt_number: str, config_object: str, dry_run: bool = True, @@ -116,6 +115,10 @@ def provision_ip_trunk( :type subscription: :class:`IptrunkProvisioning` :param process_id: The related process ID, used for callback. :type process_id: UUIDstr + :param callback_route: The API endpoint that should be used for the callback URL. + :type callback_route: str + :param tt_number: Trouble ticket number related to the operation. + :type tt_number: str :param config_object: The type of object that's deployed. :type config_object: str :param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`. @@ -134,16 +137,22 @@ def provision_ip_trunk( "removed_ae_members": removed_ae_members, } - _send_request("ip_trunk", parameters, process_id, CUDOperation.POST) + _send_request("ip_trunk", parameters, callback_route, CUDOperation.POST) -def check_ip_trunk(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_number: str, check_name: str) -> None: +def check_ip_trunk( + subscription: IptrunkProvisioning, process_id: UUIDstr, callback_route: str, tt_number: str, check_name: str +) -> None: """Provision an IP trunk service using :term:`LSO`. :param subscription: The subscription object that's to be provisioned. :type subscription: :class:`IptrunkProvisioning` :param process_id: The related process ID, used for callback. :type process_id: UUIDstr + :param callback_route: The API endpoint that should be used for the callback URL. + :type callback_route: str + :param tt_number: Trouble ticket number related to the operation. + :type tt_number: str :param check_name: The name of the check to execute :rtype: None """ @@ -154,16 +163,22 @@ def check_ip_trunk(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_nu "check_name": check_name, } - _send_request("ip_trunk/perform_check", parameters, process_id, CUDOperation.POST) + _send_request("ip_trunk/perform_check", parameters, callback_route, CUDOperation.POST) -def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, tt_number: str, dry_run: bool = True) -> None: +def deprovision_ip_trunk( + subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str, dry_run: bool = True +) -> None: """Deprovision an IP trunk service using :term:`LSO`. :param subscription: The subscription object that's to be provisioned. :type subscription: :class:`IptrunkProvisioning` :param process_id: The related process ID, used for callback. :type process_id: UUIDstr + :param callback_route: The API endpoint that should be used for the callback URL. + :type callback_route: str + :param tt_number: Trouble ticket number related to the operation. + :type tt_number: str :param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`. :type dry_run: bool :rtype: None @@ -176,7 +191,7 @@ def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, tt_number: "verb": "terminate", } - _send_request("ip_trunk", parameters, process_id, CUDOperation.DELETE) + _send_request("ip_trunk", parameters, callback_route, CUDOperation.DELETE) def migrate_ip_trunk( @@ -186,6 +201,7 @@ def migrate_ip_trunk( new_lag_member_interfaces: list[dict], replace_index: int, process_id: UUIDstr, + callback_route: str, tt_number: str, verb: str, config_object: str, @@ -206,8 +222,14 @@ def migrate_ip_trunk( :type replace_index: int :param process_id: The related process ID, used for callback. :type process_id: UUIDstr + :param callback_route: The :term:`API` endpoint that should be used for the callback URL. + :type callback_route: str + :param tt_number: Trouble ticket number related to the operation. + :type tt_number: str :param verb: The verb that is passed to the executed playbook :type verb: str + :param config_object: The object that is configured. + :type config_object: str :param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`. :type dry_run: bool :rtype: None @@ -227,144 +249,26 @@ def migrate_ip_trunk( "dry_run": dry_run, } - _send_request("ip_trunk/migrate", parameters, process_id, CUDOperation.POST) - - -@inputstep("Await provisioning proxy results", assignee=Assignee.SYSTEM) -def _await_pp_results(subscription: SubscriptionModel, label_text: str = DEFAULT_LABEL) -> FormGenerator: - """Input step that forces the workflow to go into a `SUSPENDED` state. - - When the workflow is `SUSPENDED`, it will wait for user input to be presented before it continues running the next - steps of the workflow. User input is mimicked by the provisioning proxy, as it makes a `PUT` request to the callback - URL that it was given in `_send_request()`. This input is fabricated in such a way that it will advance the workflow - to the next step. This next step should always be `confirm_pp_results()`, where the operator is presented with the - output of the provisioning proxy. - - :param subscription: The current subscription that the provisioning proxy is acting on. - :type subscription: :class:`orchestrator.domain.SubscriptionModel` - :param label_text: A label that's displayed to the operator when the provisioning proxy has not returned its - results yet. Defaults to `DEFAULT_LABEL`. - :type label_text: str - :return: The input that's given by the provisioning proxy, that should contain run results, and a `confirm` - boolean set to `True`. - :rtype: :class:`orchestrator.types.FormGenerator` - """ - - class ProvisioningResultPage(FormPage): - class Config: - title = f"Deploying {subscription.product.name}..." - - warning_label: Label = label_text # type: ignore[assignment] - pp_run_results: dict = None # type: ignore[assignment] - confirm: Accept = Accept("INCOMPLETE") - - @validator("pp_run_results", allow_reuse=True, pre=True, always=True) - def run_results_must_be_given(cls, run_results: dict) -> dict: - if run_results is None: - raise ValueError("Run results may not be empty. Wait for the provisioning proxy to finish.") - return run_results - - result_page = yield ProvisioningResultPage - - return result_page.dict() - - -@step("Reset Provisioning Proxy state") -def _reset_pp_success_state() -> State: - """Reset the boolean that indicates a successful provisioning proxy result in the state of a running workflow. - - :return: A new state of the workflow, where the key `pp_did_succeed` has been (re)set to false. - :rtype: :class:`orchestrator.types.State` - """ - return {"pp_did_succeed": False} - - -@inputstep("Confirm provisioning proxy results", assignee=Assignee("SYSTEM")) -def _confirm_pp_results(state: State) -> FormGenerator: - """Input step where a human has to confirm the result from calling provisioning proxy. - - The results of a call to the provisioning proxy are displayed, together with the fact whether this execution was - a success or not. If unsuccessful, an extra label is displayed that warns the user about the fact that this - execution will be retried. This will happen up to two times, after which the workflow will fail. - - :param state: The current state of the workflow. - :type state: :class:`orchestrator.types.State` - :return: Confirmation from the user, when presented with the run results. - :rtype: :class:`orchestrator.types.FormGenerator` - """ - if "pp_run_results" not in state: - # FIXME: dirty hack that makes the skipping """work""" - return {"pp_did_succeed": True} - - class ContinueForm(FormPage): - class Config: - title = "Continue to see the result?" - - yield ContinueForm - - successful_run = state["pp_run_results"]["return_code"] == 0 + _send_request("ip_trunk/migrate", parameters, callback_route, CUDOperation.POST) - class ConfirmRunPage(FormPage): - class Config: - title = ( - f"Execution for {state['subscription']['product']['name']} completed, please confirm the results below." - ) - if not successful_run: - pp_retry_label1: Label = ( - "Provisioning Proxy - playbook execution failed: " - "inspect the output before proceeding" # type: ignore[assignment] - ) +@step("Evaluate user input for provisioning proxy") +def _evaluate_pp_results(callback_result: dict) -> State: + if callback_result["return_code"] != 0: + raise ProcessFailureError(message="Provisioning proxy failure", details=callback_result) - run_status: str = ReadOnlyField(state["pp_run_results"]["status"]) - run_results: LongText = ReadOnlyField(json.dumps(state["pp_run_results"]["output"], indent=4)) - if not successful_run: - pp_retry_label: Label = ( - "Click submit to retry. Otherwise, abort the workflow from the process tab." # type: ignore[assignment] - ) + return {"callback_result": callback_result} - yield ConfirmRunPage - - return {"pp_did_succeed": successful_run} - - -def pp_interaction(provisioning_step: Step, attempts: int, abort_on_failure: bool = True) -> StepList: - """Interaction with the provisioning proxy. - - This method returns the three steps that make up an interaction with the provisioning proxy: - - The provisioning step itself, given by the user as input. - - The input step that suspends the workflow, and will wait for results from the provisioning proxy. - - An input step that presents the user with the results, where they must be confirmed. - - All these steps are wrapped in a :class:`orchestrator.workflow.conditional`. This ensures that when provisioning was - already successful, these steps are skipped. This mechanism is quite a dirty hack, and it's planned to be addressed - in a later release. - - The parameter `attempts` indicates how many times a provisioning may be attempted. When this amount is exceeded, and - it's still not successful, the workflow will be aborted if so indicated with the `abort_on_failure` boolean. - - :param provisioning_step: The step that executes an interaction with the provisioning proxy. - :type provisioning_step: :class:`orchestrator.workflow.Step` - :param attempts: The maximum amount of times that a provisioning can be retried. - :type attempts: int - :param abort_on_failure: A boolean value that indicates whether a workflow should abort if the provisioning has - failed the maximum amount of tries. Defaults to `True`. - :return: A list of three steps that form one interaction with the provisioning proxy. - :rtype: :class:`orchestrator.workflow.StepList` - """ - should_retry_pp_steps = conditional(lambda state: not state.get("pp_did_succeed")) - pp_steps = init >> _reset_pp_success_state +@step("Clean up the state after external provisioning") +def _clean_up_state(state: State) -> State: + for old_key in ["callback_route", "callback_result", "__callback_token"]: + state.pop(old_key, None) - for _ in range(attempts): - pp_steps >>= ( - should_retry_pp_steps(provisioning_step) - >> should_retry_pp_steps(_await_pp_results) - >> should_retry_pp_steps(_confirm_pp_results) - ) + return state - if abort_on_failure: - # Abort a workflow if provisioning has failed too many times - pp_steps >>= should_retry_pp_steps(abort) - return pp_steps +def pp_interaction(provisioning_step: Step, interaction_name: str) -> StepList: + return begin >> callback_step( + name=interaction_name, action_step=provisioning_step, validate_step=_evaluate_pp_results + ) >> _clean_up_state diff --git a/gso/translations/en-GB.json b/gso/translations/en-GB.json index 39f43b0c556286d7bf292439f93b17cef3775c17..6822716b55c95d40fc4b323f3f76a364d30f737e 100644 --- a/gso/translations/en-GB.json +++ b/gso/translations/en-GB.json @@ -5,8 +5,8 @@ "confirm": "Confirm", "confirm_info": "Please verify this form looks correct.", + "callback_successful": "Results look good (enough), let the workflow continue:", - "pp_run_results": "Provisioning proxy results are not ready yet.", "pp_retry_label": "Playbook execution failure", "site_bgp_community_id": "Site BGP community ID", diff --git a/gso/workflows/router/create_router.py b/gso/workflows/router/create_router.py index 0902f20d8a6290f8e981a20056d6d367ef0a9f13..7e18161d7a95a0c04df65b0faaa94b8cba9fe8c2 100644 --- a/gso/workflows/router/create_router.py +++ b/gso/workflows/router/create_router.py @@ -1,3 +1,4 @@ +import secrets from ipaddress import IPv4Network, IPv6Network from typing import Any @@ -6,7 +7,7 @@ from orchestrator.forms import FormPage from orchestrator.forms.validators import Choice from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr -from orchestrator.workflow import StepList, conditional, done, init, step, workflow +from orchestrator.workflow import StepList, conditional, done, init, step, workflow, Step, callback_step from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.utils import wrap_create_initial_input_form from pydantic import validator @@ -128,29 +129,21 @@ def ipam_allocate_ias_networks(subscription: RouterProvisioning) -> State: @step("Provision router [DRY RUN]") -def provision_router_dry(subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str) -> State: - provisioning_proxy.provision_router(subscription, process_id, tt_number) +def provision_router_dry( + subscription: RouterProvisioning, process_id: UUIDstr, callback_route: str, tt_number: str +) -> State: + provisioning_proxy.provision_router(subscription, process_id, callback_route, tt_number) - return { - "subscription": subscription, - "label_text": ( - "Dry run for the deployment of base config on a new router. Deployment is done by the" - " provisioning proxy, please wait for the results to come back before continuing." - ), - } + return {"subscription": subscription} @step("Provision router [FOR REAL]") -def provision_router_real(subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str) -> State: - provisioning_proxy.provision_router(subscription, process_id, tt_number, False) +def provision_router_real( + subscription: RouterProvisioning, process_id: UUIDstr, callback_route: str, tt_number: str +) -> State: + provisioning_proxy.provision_router(subscription, process_id, callback_route, tt_number, False) - return { - "subscription": subscription, - "label_text": ( - "Deployment of base config for a new router. Deployment is being taken care of by the" - " provisioning proxy, please wait for the results to come back before continuing." - ), - } + return {"subscription": subscription} @step("Create NetBox Device") @@ -217,8 +210,8 @@ def create_router() -> StepList: >> initialize_subscription >> ipam_allocate_loopback >> should_allocate_ias(ipam_allocate_ias_networks) - >> pp_interaction(provision_router_dry, 3) - >> pp_interaction(provision_router_real, 3) + >> pp_interaction(provision_router_dry, "Provision new router [DRY RUN]") + >> pp_interaction(provision_router_real, "Provision new router [FOR REAL]") >> verify_ipam_loopback >> should_allocate_ias(verify_ipam_ias) >> create_netbox_device diff --git a/test/workflows/iptrunks/iptrunks/test_create_iptrunks.py b/test/workflows/iptrunks/iptrunks/test_create_iptrunks.py new file mode 100644 index 0000000000000000000000000000000000000000..c0d09fb7758acaa602959703d4a9ef061b904a4c --- /dev/null +++ b/test/workflows/iptrunks/iptrunks/test_create_iptrunks.py @@ -0,0 +1,154 @@ +from unittest.mock import patch + +import pytest + +from gso.products import Iptrunk +from gso.products.product_blocks import PhyPortCapacity +from gso.products.product_blocks.iptrunk import IptrunkType +from gso.schemas.enums import ProductType +from gso.services.crm import get_customer_by_name +from gso.services.subscriptions import get_product_id_by_name +from gso.workflows.utils import customer_selector +from test.workflows import ( + assert_aborted, + assert_complete, + assert_suspended, + extract_state, + resume_workflow, + run_workflow, +) + + +@pytest.fixture +def input_form_wizard_data(router_subscription_factory, faker): + router_side_a = router_subscription_factory() + router_side_b = router_subscription_factory() + + create_ip_trunk_step = { + "tt_number": faker.pystr(), + "customer": getattr(customer_selector(), get_customer_by_name("GÉANT")["id"]), + "geant_s_sid": faker.pystr(), + "iptrunk_type": IptrunkType.DARK_FIBER, + "iptrunk_description": faker.sentence(), + "iptrunk_speed": PhyPortCapacity.HUNDRED_GIGABIT_PER_SECOND, + "iptrunk_minimum_links": 5, + } + create_ip_trunk_side_a_step = { + "iptrunk_sideA_node_id": router_side_a, + "iptrunk_sideA_ae_iface": faker.pystr(), + "iptrunk_sideA_ae_geant_a_sid": faker.pystr(), + "iptrunk_sideA_ae_members": [faker.pystr() for _ in range(5)], + "iptrunk_sideA_ae_members_descriptions": [faker.sentence() for _ in range(5)], + } + + create_ip_trunk_side_b_step = { + "iptrunk_sideB_node_id": router_side_b, + "iptrunk_sideB_ae_iface": faker.pystr(), + "iptrunk_sideB_ae_geant_a_sid": faker.pystr(), + "iptrunk_sideB_ae_members": [faker.pystr() for _ in range(5)], + "iptrunk_sideB_ae_members_descriptions": [faker.sentence() for _ in range(5)], + } + + return [create_ip_trunk_step, create_ip_trunk_side_a_step, create_ip_trunk_side_b_step] + + +def _user_accept_and_assert_suspended(process_stat, step_log, extra_data=None): + extra_data = extra_data or {} + result, step_log = resume_workflow(process_stat, step_log, extra_data) + assert_suspended(result) + + return result, step_log + + +@pytest.mark.workflow +@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk") +@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk") +@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network") +@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v4_network") +def test_successful_iptrunk_creation_with_standard_lso_result( + mock_allocate_v4_network, + mock_allocate_v6_network, + mock_provision_ip_trunk, + mock_check_ip_trunk, + responses, + input_form_wizard_data, + faker, +): + mock_allocate_v4_network.return_value = faker.ipv4_network() + mock_allocate_v6_network.return_value = faker.ipv6_network() + product_id = get_product_id_by_name(ProductType.IP_TRUNK) + initial_site_data = [{"product": product_id}, *input_form_wizard_data] + result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data) + assert_suspended(result) + + standard_lso_result = { + "callback_result": { + "status": "ok", + "job_id": "random_job_id", + "output": "parsed_output", + "return_code": 0, + }, + "confirm": "ACCEPTED", + } + for _ in range(5): + result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) + result, step_log = _user_accept_and_assert_suspended(process_stat, step_log) + + result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) + result, step_log = resume_workflow(process_stat, step_log, {}) + assert_complete(result) + + state = extract_state(result) + subscription_id = state["subscription_id"] + subscription = Iptrunk.from_subscription(subscription_id) + + assert "active" == subscription.status + assert subscription.description == f"IP trunk, geant_s_sid:{input_form_wizard_data[0]['geant_s_sid']}" + + assert mock_provision_ip_trunk.call_count == 4 + assert mock_check_ip_trunk.call_count == 2 + + +@pytest.mark.workflow +@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk") +@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk") +@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network") +@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v4_network") +def test_iptrunk_creation_fails_when_lso_return_code_is_one( + mock_allocate_v4_network, + mock_allocate_v6_network, + mock_provision_ip_trunk, + mock_check_ip_trunk, + responses, + input_form_wizard_data, + faker, +): + mock_allocate_v4_network.return_value = faker.ipv4_network() + mock_allocate_v6_network.return_value = faker.ipv6_network() + product_id = get_product_id_by_name(ProductType.IP_TRUNK) + + initial_site_data = [{"product": product_id}, *input_form_wizard_data] + result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data) + assert_suspended(result) + + standard_lso_result = { + "callback_result": { + "status": "ok", + "job_id": "random_job_id", + "output": "parsed_output", + "return_code": 1, + }, + "confirm": "ACCEPTED", + } + + attempts = 3 + for _ in range(0, attempts - 1): + result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) + result, step_log = _user_accept_and_assert_suspended(process_stat, step_log) + + result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result) + result, step_log = resume_workflow(process_stat, step_log, {}) + assert_aborted(result) + + assert mock_provision_ip_trunk.call_count == attempts + assert mock_check_ip_trunk.call_count == 0