Skip to content
Snippets Groups Projects
Verified Commit dfc81080 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Make it Simone-proof

parent d6eb9dbe
No related branches found
No related tags found
1 merge request!165router creation flow update
Pipeline #85810 passed
"""Activate router takes a provisioning router to the active lifecycle state."""
from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, done, init, workflow
from orchestrator.workflow import StepList, done, init, inputstep, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
......@@ -16,13 +17,25 @@ def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator:
class ActivateRouterForm(FormPage):
info_label: Label = "Start approval process for router activation." # type:ignore[assignment]
tt_number: str
user_input = yield ActivateRouterForm
return user_input.dict() | {"subscription": router}
@inputstep("Verify checklist completion", assignee=Assignee.SYSTEM)
def verify_complete_checklist() -> FormGenerator:
"""Show a form for the operator to input a link to the completed checklist."""
class VerifyCompleteForm(FormPage):
info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." # type: ignore[assignment]
checklist_url: str = ""
user_input = yield VerifyCompleteForm
return {"checklist_url": user_input.dict()["checklist_url"]}
@workflow(
"Activate a router",
initial_input_form=wrap_modify_initial_input_form(_initial_input_form),
......@@ -39,6 +52,7 @@ def activate_router() -> StepList:
init
>> store_process_subscription(Target.MODIFY)
>> unsync
>> verify_complete_checklist
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
......
......@@ -3,7 +3,9 @@ import pytest
from gso.products import Router
from test.workflows import (
assert_complete,
assert_suspended,
extract_state,
resume_workflow,
run_workflow,
)
......@@ -19,8 +21,11 @@ def test_activate_router_success(
assert Router.from_subscription(product_id).status == "provisioning"
# Run workflow
initial_input_data = [{"subscription_id": product_id}, {"tt_number": faker.tt_number()}]
result, _, _ = run_workflow("activate_router", initial_input_data)
initial_input_data = [{"subscription_id": product_id}, {}]
result, process_stat, step_log = run_workflow("activate_router", initial_input_data)
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=[{"checklist_url": "http://localhost"}])
assert_complete(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment