Skip to content
Snippets Groups Projects
Verified Commit b1aaca9a authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

add input step that prompts for Sharepoint checklist creation in iptrunk creation workflow

parent ba99996e
No related branches found
No related tags found
1 merge request!170add input step that prompts for Sharepoint checklist creation
Pipeline #85893 passed
......@@ -169,7 +169,7 @@ class EmailParams(BaseSettings):
class SharepointParams(BaseSettings):
"""Settings for different Sharepoint sites."""
# TODO: Stricter typing after pydantic 2.x upgrade
# TODO: Stricter typing after Pydantic 2.x upgrade
checklist_site_url: str
......
......@@ -3,12 +3,13 @@
import json
from uuid import uuid4
from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, UniqueConstrainedList
from orchestrator.forms.validators import Choice, Label, UniqueConstrainedList
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflow import StepList, conditional, done, init, inputstep, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator
......@@ -21,7 +22,7 @@ from gso.products.product_blocks.iptrunk import (
IptrunkType,
PhyPortCapacity,
)
from gso.products.product_types.iptrunk import IptrunkInactive
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.router import Router
from gso.services import infoblox, subscriptions
from gso.services.crm import get_customer_by_name
......@@ -472,6 +473,27 @@ def netbox_allocate_side_b_interfaces(subscription: IptrunkInactive) -> None:
_allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[1])
@inputstep("Prompt for new Sharepoint checklist", assignee=Assignee.SYSTEM)
def prompt_start_new_checklist(subscription: IptrunkProvisioning) -> FormGenerator:
"""Prompt the operator to start a new checklist in Sharepoint for approving this new IP trunk."""
oss_params = load_oss_params()
class SharepointPrompt(FormPage):
class Config:
title = "Start new checklist"
info_label_1: Label = (
f"Visit {oss_params.SHAREPOINT.checklist_site_url} and start a new Sharepoint checklist for an IPtrunk " # type: ignore[assignment]
f"from {subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to "
f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}."
)
info_label_2: Label = "Once this is done, click proceed to finish the workflow." # type: ignore[assignment]
yield SharepointPrompt
return {}
@workflow(
"Create IP trunk",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
......@@ -510,6 +532,7 @@ def create_iptrunk() -> StepList:
>> side_a_is_nokia(netbox_allocate_side_a_interfaces)
>> side_b_is_nokia(netbox_allocate_side_b_interfaces)
>> set_status(SubscriptionLifecycle.PROVISIONING)
>> prompt_start_new_checklist
>> resync
>> done
)
......@@ -8,12 +8,15 @@ from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.helpers import LAGMember
from gso.utils.shared_enums import Vendor
from test import USER_CONFIRM_EMPTY_FORM
from test.services.conftest import MockedNetboxClient
from test.workflows import (
assert_complete,
assert_pp_interaction_failure,
assert_pp_interaction_success,
assert_suspended,
extract_state,
resume_workflow,
run_workflow,
)
......@@ -120,6 +123,9 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
for _ in range(6):
result, step_log = assert_pp_interaction_success(result, process_stat, step_log)
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
assert_complete(result)
state = extract_state(result)
......@@ -196,4 +202,7 @@ def test_successful_iptrunk_creation_with_juniper_interface_names(
for _ in range(6):
result, step_log = assert_pp_interaction_success(result, process_stat, step_log)
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
assert_complete(result)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment