Skip to content
Snippets Groups Projects
Verified Commit 9ed6c0e3 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

make use of callback step for external provisioning

workflows need updating
parent b9218407
No related branches found
No related tags found
1 merge request!96Make use of new callback step for external provisioning
......@@ -6,15 +6,11 @@ import json
import logging
import requests
from orchestrator import conditional, inputstep, step
from orchestrator.config.assignee import Assignee
from orchestrator.domain import SubscriptionModel
from orchestrator.forms import FormPage, ReadOnlyField
from orchestrator.forms.validators import Accept, Label, LongText
from orchestrator.types import FormGenerator, State, UUIDstr, strEnum
from orchestrator import step
from orchestrator.types import State, UUIDstr, strEnum
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import Step, StepList, abort, init
from pydantic import validator
from orchestrator.workflow import Step, callback_step, begin, StepList
from gso import settings
from gso.products.product_types.iptrunk import Iptrunk, IptrunkProvisioning
......@@ -36,7 +32,7 @@ class CUDOperation(strEnum):
DELETE = "DELETE"
def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operation: CUDOperation) -> None:
def _send_request(endpoint: str, parameters: dict, callback_route: str, operation: CUDOperation) -> None:
"""Send a request to :term:`LSO`. The callback address is derived using the process ID provided.
:param endpoint: The :term:`LSO`-specific endpoint to call, depending on the type of service object that's acted
......@@ -45,9 +41,8 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operatio
:param parameters: JSON body for the request, which will almost always at least consist of a subscription object,
and a boolean value to indicate a dry run.
:type parameters: dict
:param process_id: The process ID that this request is a part of, used to call back to when the execution of the
playbook is completed.
:type process_id: UUIDstr
:param callback_route: The callback route that should be used to resume the workflow.
:type callback_route: str
:param operation: The specific operation that's performed with the request.
:type operation: :class:`CUDOperation`
:rtype: None
......@@ -57,8 +52,7 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operatio
assert pp_params
# Build up a callback URL of the Provisioning Proxy to return its results to.
callback_url = f"{settings.load_oss_params().GENERAL.public_hostname}" f"/api/processes/{process_id}/resume"
logger.debug(f"[provisioning proxy] provisioning for process {process_id}")
callback_url = f"{oss.GENERAL.public_hostname}/api{callback_route}"
logger.debug(f"[provisioning proxy] Callback URL set to {callback_url}")
parameters.update({"callback": callback_url})
......@@ -80,7 +74,7 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operatio
def provision_router(
subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str, dry_run: bool = True
subscription: RouterProvisioning, process_id: UUIDstr, callback_route: str, tt_number: str, dry_run: bool = True
) -> None:
"""Provision a new router using :term:`LSO`.
......@@ -88,6 +82,10 @@ def provision_router(
:type subscription: :class:`RouterProvisioning`
:param process_id: The related process ID, used for callback.
:type process_id: UUIDstr
:param callback_route: The API endpoint that should be used for the callback URL.
:type callback_route: str
:param tt_number: Trouble ticket number related to the operation.
:type tt_number: str
:param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`.
:type dry_run: bool
:rtype: None
......@@ -99,12 +97,13 @@ def provision_router(
"subscription": json.loads(json_dumps(subscription)),
}
_send_request("router", parameters, process_id, CUDOperation.POST)
_send_request("router", parameters, callback_route, CUDOperation.POST)
def provision_ip_trunk(
subscription: IptrunkProvisioning,
process_id: UUIDstr,
callback_route: str,
tt_number: str,
config_object: str,
dry_run: bool = True,
......@@ -116,6 +115,10 @@ def provision_ip_trunk(
:type subscription: :class:`IptrunkProvisioning`
:param process_id: The related process ID, used for callback.
:type process_id: UUIDstr
:param callback_route: The API endpoint that should be used for the callback URL.
:type callback_route: str
:param tt_number: Trouble ticket number related to the operation.
:type tt_number: str
:param config_object: The type of object that's deployed.
:type config_object: str
:param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`.
......@@ -134,16 +137,22 @@ def provision_ip_trunk(
"removed_ae_members": removed_ae_members,
}
_send_request("ip_trunk", parameters, process_id, CUDOperation.POST)
_send_request("ip_trunk", parameters, callback_route, CUDOperation.POST)
def check_ip_trunk(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_number: str, check_name: str) -> None:
def check_ip_trunk(
subscription: IptrunkProvisioning, process_id: UUIDstr, callback_route: str, tt_number: str, check_name: str
) -> None:
"""Provision an IP trunk service using :term:`LSO`.
:param subscription: The subscription object that's to be provisioned.
:type subscription: :class:`IptrunkProvisioning`
:param process_id: The related process ID, used for callback.
:type process_id: UUIDstr
:param callback_route: The API endpoint that should be used for the callback URL.
:type callback_route: str
:param tt_number: Trouble ticket number related to the operation.
:type tt_number: str
:param check_name: The name of the check to execute
:rtype: None
"""
......@@ -154,16 +163,22 @@ def check_ip_trunk(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_nu
"check_name": check_name,
}
_send_request("ip_trunk/perform_check", parameters, process_id, CUDOperation.POST)
_send_request("ip_trunk/perform_check", parameters, callback_route, CUDOperation.POST)
def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, tt_number: str, dry_run: bool = True) -> None:
def deprovision_ip_trunk(
subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str, dry_run: bool = True
) -> None:
"""Deprovision an IP trunk service using :term:`LSO`.
:param subscription: The subscription object that's to be provisioned.
:type subscription: :class:`IptrunkProvisioning`
:param process_id: The related process ID, used for callback.
:type process_id: UUIDstr
:param callback_route: The API endpoint that should be used for the callback URL.
:type callback_route: str
:param tt_number: Trouble ticket number related to the operation.
:type tt_number: str
:param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`.
:type dry_run: bool
:rtype: None
......@@ -176,7 +191,7 @@ def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, tt_number:
"verb": "terminate",
}
_send_request("ip_trunk", parameters, process_id, CUDOperation.DELETE)
_send_request("ip_trunk", parameters, callback_route, CUDOperation.DELETE)
def migrate_ip_trunk(
......@@ -186,6 +201,7 @@ def migrate_ip_trunk(
new_lag_member_interfaces: list[dict],
replace_index: int,
process_id: UUIDstr,
callback_route: str,
tt_number: str,
verb: str,
config_object: str,
......@@ -206,8 +222,14 @@ def migrate_ip_trunk(
:type replace_index: int
:param process_id: The related process ID, used for callback.
:type process_id: UUIDstr
:param callback_route: The :term:`API` endpoint that should be used for the callback URL.
:type callback_route: str
:param tt_number: Trouble ticket number related to the operation.
:type tt_number: str
:param verb: The verb that is passed to the executed playbook
:type verb: str
:param config_object: The object that is configured.
:type config_object: str
:param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`.
:type dry_run: bool
:rtype: None
......@@ -227,144 +249,26 @@ def migrate_ip_trunk(
"dry_run": dry_run,
}
_send_request("ip_trunk/migrate", parameters, process_id, CUDOperation.POST)
@inputstep("Await provisioning proxy results", assignee=Assignee.SYSTEM)
def _await_pp_results(subscription: SubscriptionModel, label_text: str = DEFAULT_LABEL) -> FormGenerator:
"""Input step that forces the workflow to go into a `SUSPENDED` state.
When the workflow is `SUSPENDED`, it will wait for user input to be presented before it continues running the next
steps of the workflow. User input is mimicked by the provisioning proxy, as it makes a `PUT` request to the callback
URL that it was given in `_send_request()`. This input is fabricated in such a way that it will advance the workflow
to the next step. This next step should always be `confirm_pp_results()`, where the operator is presented with the
output of the provisioning proxy.
:param subscription: The current subscription that the provisioning proxy is acting on.
:type subscription: :class:`orchestrator.domain.SubscriptionModel`
:param label_text: A label that's displayed to the operator when the provisioning proxy has not returned its
results yet. Defaults to `DEFAULT_LABEL`.
:type label_text: str
:return: The input that's given by the provisioning proxy, that should contain run results, and a `confirm`
boolean set to `True`.
:rtype: :class:`orchestrator.types.FormGenerator`
"""
class ProvisioningResultPage(FormPage):
class Config:
title = f"Deploying {subscription.product.name}..."
warning_label: Label = label_text # type: ignore[assignment]
pp_run_results: dict = None # type: ignore[assignment]
confirm: Accept = Accept("INCOMPLETE")
@validator("pp_run_results", allow_reuse=True, pre=True, always=True)
def run_results_must_be_given(cls, run_results: dict) -> dict:
if run_results is None:
raise ValueError("Run results may not be empty. Wait for the provisioning proxy to finish.")
return run_results
result_page = yield ProvisioningResultPage
return result_page.dict()
@step("Reset Provisioning Proxy state")
def _reset_pp_success_state() -> State:
"""Reset the boolean that indicates a successful provisioning proxy result in the state of a running workflow.
:return: A new state of the workflow, where the key `pp_did_succeed` has been (re)set to false.
:rtype: :class:`orchestrator.types.State`
"""
return {"pp_did_succeed": False}
@inputstep("Confirm provisioning proxy results", assignee=Assignee("SYSTEM"))
def _confirm_pp_results(state: State) -> FormGenerator:
"""Input step where a human has to confirm the result from calling provisioning proxy.
The results of a call to the provisioning proxy are displayed, together with the fact whether this execution was
a success or not. If unsuccessful, an extra label is displayed that warns the user about the fact that this
execution will be retried. This will happen up to two times, after which the workflow will fail.
:param state: The current state of the workflow.
:type state: :class:`orchestrator.types.State`
:return: Confirmation from the user, when presented with the run results.
:rtype: :class:`orchestrator.types.FormGenerator`
"""
if "pp_run_results" not in state:
# FIXME: dirty hack that makes the skipping """work"""
return {"pp_did_succeed": True}
class ContinueForm(FormPage):
class Config:
title = "Continue to see the result?"
yield ContinueForm
successful_run = state["pp_run_results"]["return_code"] == 0
_send_request("ip_trunk/migrate", parameters, callback_route, CUDOperation.POST)
class ConfirmRunPage(FormPage):
class Config:
title = (
f"Execution for {state['subscription']['product']['name']} completed, please confirm the results below."
)
if not successful_run:
pp_retry_label1: Label = (
"Provisioning Proxy - playbook execution failed: "
"inspect the output before proceeding" # type: ignore[assignment]
)
@step("Evaluate user input for provisioning proxy")
def _evaluate_pp_results(callback_result: dict) -> State:
if callback_result["return_code"] != 0:
raise ProcessFailureError(message="Provisioning proxy failure", details=callback_result)
run_status: str = ReadOnlyField(state["pp_run_results"]["status"])
run_results: LongText = ReadOnlyField(json.dumps(state["pp_run_results"]["output"], indent=4))
if not successful_run:
pp_retry_label: Label = (
"Click submit to retry. Otherwise, abort the workflow from the process tab." # type: ignore[assignment]
)
return {"callback_result": callback_result}
yield ConfirmRunPage
return {"pp_did_succeed": successful_run}
def pp_interaction(provisioning_step: Step, attempts: int, abort_on_failure: bool = True) -> StepList:
"""Interaction with the provisioning proxy.
This method returns the three steps that make up an interaction with the provisioning proxy:
- The provisioning step itself, given by the user as input.
- The input step that suspends the workflow, and will wait for results from the provisioning proxy.
- An input step that presents the user with the results, where they must be confirmed.
All these steps are wrapped in a :class:`orchestrator.workflow.conditional`. This ensures that when provisioning was
already successful, these steps are skipped. This mechanism is quite a dirty hack, and it's planned to be addressed
in a later release.
The parameter `attempts` indicates how many times a provisioning may be attempted. When this amount is exceeded, and
it's still not successful, the workflow will be aborted if so indicated with the `abort_on_failure` boolean.
:param provisioning_step: The step that executes an interaction with the provisioning proxy.
:type provisioning_step: :class:`orchestrator.workflow.Step`
:param attempts: The maximum amount of times that a provisioning can be retried.
:type attempts: int
:param abort_on_failure: A boolean value that indicates whether a workflow should abort if the provisioning has
failed the maximum amount of tries. Defaults to `True`.
:return: A list of three steps that form one interaction with the provisioning proxy.
:rtype: :class:`orchestrator.workflow.StepList`
"""
should_retry_pp_steps = conditional(lambda state: not state.get("pp_did_succeed"))
pp_steps = init >> _reset_pp_success_state
@step("Clean up the state after external provisioning")
def _clean_up_state(state: State) -> State:
for old_key in ["callback_route", "callback_result", "__callback_token"]:
state.pop(old_key, None)
for _ in range(attempts):
pp_steps >>= (
should_retry_pp_steps(provisioning_step)
>> should_retry_pp_steps(_await_pp_results)
>> should_retry_pp_steps(_confirm_pp_results)
)
return state
if abort_on_failure:
# Abort a workflow if provisioning has failed too many times
pp_steps >>= should_retry_pp_steps(abort)
return pp_steps
def pp_interaction(provisioning_step: Step, interaction_name: str) -> StepList:
    """Wrap a provisioning step in a callback interaction with the provisioning proxy.

    The given step is run as the action of a ``callback_step``. When the external
    system calls back, ``_evaluate_pp_results`` validates the returned payload, and
    ``_clean_up_state`` then removes the callback bookkeeping keys from the state.

    :param provisioning_step: The step that triggers an interaction with the provisioning proxy.
    :type provisioning_step: :class:`orchestrator.workflow.Step`
    :param interaction_name: Human-readable name of this callback interaction.
    :type interaction_name: str
    :return: A step list performing the callback interaction followed by state cleanup.
    :rtype: :class:`orchestrator.workflow.StepList`
    """
    external_interaction = callback_step(
        name=interaction_name,
        action_step=provisioning_step,
        validate_step=_evaluate_pp_results,
    )
    return begin >> external_interaction >> _clean_up_state
......@@ -5,8 +5,8 @@
"confirm": "Confirm",
"confirm_info": "Please verify this form looks correct.",
"callback_successful": "Results look good (enough), let the workflow continue:",
"pp_run_results": "Provisioning proxy results are not ready yet.",
"pp_retry_label": "Playbook execution failure",
"site_bgp_community_id": "Site BGP community ID",
......
import secrets
from ipaddress import IPv4Network, IPv6Network
from typing import Any
......@@ -6,7 +7,7 @@ from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflow import StepList, conditional, done, init, step, workflow, Step, callback_step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator
......@@ -128,29 +129,21 @@ def ipam_allocate_ias_networks(subscription: RouterProvisioning) -> State:
@step("Provision router [DRY RUN]")
def provision_router_dry(subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str) -> State:
provisioning_proxy.provision_router(subscription, process_id, tt_number)
def provision_router_dry(
subscription: RouterProvisioning, process_id: UUIDstr, callback_route: str, tt_number: str
) -> State:
provisioning_proxy.provision_router(subscription, process_id, callback_route, tt_number)
return {
"subscription": subscription,
"label_text": (
"Dry run for the deployment of base config on a new router. Deployment is done by the"
" provisioning proxy, please wait for the results to come back before continuing."
),
}
return {"subscription": subscription}
@step("Provision router [FOR REAL]")
def provision_router_real(subscription: RouterProvisioning, process_id: UUIDstr, tt_number: str) -> State:
provisioning_proxy.provision_router(subscription, process_id, tt_number, False)
def provision_router_real(
subscription: RouterProvisioning, process_id: UUIDstr, callback_route: str, tt_number: str
) -> State:
provisioning_proxy.provision_router(subscription, process_id, callback_route, tt_number, False)
return {
"subscription": subscription,
"label_text": (
"Deployment of base config for a new router. Deployment is being taken care of by the"
" provisioning proxy, please wait for the results to come back before continuing."
),
}
return {"subscription": subscription}
@step("Create NetBox Device")
......@@ -217,8 +210,8 @@ def create_router() -> StepList:
>> initialize_subscription
>> ipam_allocate_loopback
>> should_allocate_ias(ipam_allocate_ias_networks)
>> pp_interaction(provision_router_dry, 3)
>> pp_interaction(provision_router_real, 3)
>> pp_interaction(provision_router_dry, "Provision new router [DRY RUN]")
>> pp_interaction(provision_router_real, "Provision new router [FOR REAL]")
>> verify_ipam_loopback
>> should_allocate_ias(verify_ipam_ias)
>> create_netbox_device
......
from unittest.mock import patch
import pytest
from gso.products import Iptrunk
from gso.products.product_blocks import PhyPortCapacity
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.schemas.enums import ProductType
from gso.services.crm import get_customer_by_name
from gso.services.subscriptions import get_product_id_by_name
from gso.workflows.utils import customer_selector
from test.workflows import (
assert_aborted,
assert_complete,
assert_suspended,
extract_state,
resume_workflow,
run_workflow,
)
@pytest.fixture
def input_form_wizard_data(router_subscription_factory, faker):
    """Produce wizard input for the IP trunk creation workflow.

    Returns one page of general trunk settings followed by one page per trunk
    side (A and B), each referring to a freshly created router subscription.
    """
    side_a_router = router_subscription_factory()
    side_b_router = router_subscription_factory()

    def _side_step(side: str, node_id):
        # Form keys are suffixed with the side letter, e.g. ``iptrunk_sideA_node_id``.
        return {
            f"iptrunk_side{side}_node_id": node_id,
            f"iptrunk_side{side}_ae_iface": faker.pystr(),
            f"iptrunk_side{side}_ae_geant_a_sid": faker.pystr(),
            f"iptrunk_side{side}_ae_members": [faker.pystr() for _ in range(5)],
            f"iptrunk_side{side}_ae_members_descriptions": [faker.sentence() for _ in range(5)],
        }

    general_step = {
        "tt_number": faker.pystr(),
        "customer": getattr(customer_selector(), get_customer_by_name("GÉANT")["id"]),
        "geant_s_sid": faker.pystr(),
        "iptrunk_type": IptrunkType.DARK_FIBER,
        "iptrunk_description": faker.sentence(),
        "iptrunk_speed": PhyPortCapacity.HUNDRED_GIGABIT_PER_SECOND,
        "iptrunk_minimum_links": 5,
    }

    return [general_step, _side_step("A", side_a_router), _side_step("B", side_b_router)]
def _user_accept_and_assert_suspended(process_stat, step_log, extra_data=None):
    """Resume the workflow with optional user input and assert it suspends again.

    :param process_stat: Process status object of the running workflow.
    :param step_log: The step log accumulated so far.
    :param extra_data: Optional user input submitted on resume; defaults to an empty dict.
    :return: The workflow result and the updated step log.
    """
    user_input = extra_data or {}
    result, updated_log = resume_workflow(process_stat, step_log, user_input)
    assert_suspended(result)
    return result, updated_log
@pytest.mark.workflow
@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk")
@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk")
@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network")
@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v4_network")
def test_successful_iptrunk_creation_with_standard_lso_result(
    mock_allocate_v4_network,
    mock_allocate_v6_network,
    mock_provision_ip_trunk,
    mock_check_ip_trunk,
    responses,
    input_form_wizard_data,
    faker,
):
    """A ``create_iptrunk`` run completes when every LSO callback reports return code 0."""
    mock_allocate_v4_network.return_value = faker.ipv4_network()
    mock_allocate_v6_network.return_value = faker.ipv6_network()

    trunk_product_id = get_product_id_by_name(ProductType.IP_TRUNK)
    wizard_input = [{"product": trunk_product_id}, *input_form_wizard_data]
    result, process_stat, step_log = run_workflow("create_iptrunk", wizard_input)
    assert_suspended(result)

    # Payload mimicking a successful playbook run, as reported back by LSO.
    successful_lso_result = {
        "callback_result": {
            "status": "ok",
            "job_id": "random_job_id",
            "output": "parsed_output",
            "return_code": 0,
        },
        "confirm": "ACCEPTED",
    }

    for _ in range(5):
        result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, successful_lso_result)
    result, step_log = _user_accept_and_assert_suspended(process_stat, step_log)
    result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, successful_lso_result)
    result, step_log = resume_workflow(process_stat, step_log, {})
    assert_complete(result)

    state = extract_state(result)
    subscription = Iptrunk.from_subscription(state["subscription_id"])
    assert subscription.status == "active"
    assert subscription.description == f"IP trunk, geant_s_sid:{input_form_wizard_data[0]['geant_s_sid']}"
    assert mock_provision_ip_trunk.call_count == 4
    assert mock_check_ip_trunk.call_count == 2
@pytest.mark.workflow
@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk")
@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk")
@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network")
@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v4_network")
def test_iptrunk_creation_fails_when_lso_return_code_is_one(
    mock_allocate_v4_network,
    mock_allocate_v6_network,
    mock_provision_ip_trunk,
    mock_check_ip_trunk,
    responses,
    input_form_wizard_data,
    faker,
):
    """A ``create_iptrunk`` run aborts when every LSO callback reports return code 1."""
    mock_allocate_v4_network.return_value = faker.ipv4_network()
    mock_allocate_v6_network.return_value = faker.ipv6_network()

    trunk_product_id = get_product_id_by_name(ProductType.IP_TRUNK)
    wizard_input = [{"product": trunk_product_id}, *input_form_wizard_data]
    result, process_stat, step_log = run_workflow("create_iptrunk", wizard_input)
    assert_suspended(result)

    # Payload mimicking a failed playbook run (non-zero return code) from LSO.
    failed_lso_result = {
        "callback_result": {
            "status": "ok",
            "job_id": "random_job_id",
            "output": "parsed_output",
            "return_code": 1,
        },
        "confirm": "ACCEPTED",
    }

    attempts = 3
    for _ in range(attempts - 1):
        result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, failed_lso_result)
    result, step_log = _user_accept_and_assert_suspended(process_stat, step_log)
    result, step_log = _user_accept_and_assert_suspended(process_stat, step_log, failed_lso_result)
    result, step_log = resume_workflow(process_stat, step_log, {})
    assert_aborted(result)

    assert mock_provision_ip_trunk.call_count == attempts
    assert mock_check_ip_trunk.call_count == 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment