Skip to content
Snippets Groups Projects
Verified Commit c1cd83f6 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

fix __init__ in the test.workflow module, and fix the iptrunk creation workflow unit test

parent 3f27b8a5
Branches
Tags
1 merge request!96Make use of new callback step for external provisioning
......@@ -10,8 +10,7 @@ from orchestrator.db import ProcessTable
from orchestrator.services.processes import StateMerger, _db_create_process
from orchestrator.types import FormGenerator, InputForm, State
from orchestrator.utils.json import json_dumps, json_loads
from orchestrator.workflow import Process as WFProcess
from orchestrator.workflow import ProcessStat, Step, Success, Workflow, runwf
from orchestrator.workflow import Process, ProcessStat, Step, Success, Workflow, runwf
from orchestrator.workflows import ALL_WORKFLOWS, LazyWorkflowInstance, get_workflow
from pydantic_forms.core import post_form
......@@ -102,8 +101,6 @@ def extract_state(result):
def extract_error(result):
from orchestrator.workflow import Process
assert isinstance(result, Process), f"Expected a Process, but got {repr(result)} of type {type(result)}"
assert not isinstance(result.s, Process), "Result contained a Process in a Process, this should not happen"
......@@ -149,35 +146,33 @@ class WorkflowInstanceForTests(LazyWorkflowInstance):
return f"WorkflowInstanceForTests('{self.workflow}','{self.name}')"
def _store_step(step_log: list[tuple[Step, WFProcess]]) -> Callable[[ProcessStat, Step, WFProcess], WFProcess]:
def __store_step(pstat: ProcessStat, step: Step, state: WFProcess) -> WFProcess:
def _store_step(step_log: list[tuple[Step, Process]]) -> Callable[[ProcessStat, Step, Process], Process]:
def __store_step(pstat: ProcessStat, step: Step, process: Process) -> Process:
try:
state = state.map(lambda s: json_loads(json_dumps(s)))
process = process.map(lambda s: json_loads(json_dumps(s)))
except Exception:
logger.exception("Step state is not valid json", state=state)
step_log.append((step, state))
return state
logger.exception("Step state is not valid json", process=process)
state = process.unwrap()
state.pop("__step_name_override", None)
for k in state.get("__remove_keys", []) + ["__remove_keys"]:
state.pop(k, None)
if state.pop("__replace_last_state", None):
step_log[-1] = (step, process)
else:
step_log.append((step, process))
return process
return __store_step
def _sanitize_input(input_data: State | list[State]) -> list[State]:
# To be backwards compatible convert single dict to list
if not isinstance(input_data, list):
input_data = [input_data]
# We need a copy here, and we want to mimic the actual code that returns a serialized version of the state
return cast(list[State], json_loads(json_dumps(input_data)))
def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[WFProcess, ProcessStat, list]:
def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[Process, ProcessStat, list]:
# ATTENTION!! This code needs to be as similar as possible to `server.services.processes.start_process`
# The main differences are: we use a different step log function, and we don't run in
# a separate thread
user_data = _sanitize_input(input_data)
user = "john.doe"
step_log: list[tuple[Step, WFProcess]] = []
step_log: list[tuple[Step, Process]] = []
process_id = uuid4()
workflow = get_workflow(workflow_key)
......@@ -189,7 +184,7 @@ def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[WF
"workflow_target": workflow.target,
}
user_input = post_form(workflow.initial_input_form, initial_state, user_data)
user_input = post_form(workflow.initial_input_form, initial_state, input_data)
pstat = ProcessStat(
process_id,
......@@ -207,25 +202,33 @@ def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[WF
def resume_workflow(
process: ProcessStat, step_log: list[tuple[Step, WFProcess]], input_data: State | list[State]
) -> tuple[WFProcess, list]:
process: ProcessStat, step_log: list[tuple[Step, Process]], input_data: State | list[State]
) -> tuple[Process, list]:
# ATTENTION!! This code needs to be as similar as possible to `server.services.processes.resume_process`
# The main differences are: we use a different step log function, and we don't run in a separate thread
user_data = _sanitize_input(input_data)
persistent = list(filter(lambda p: not (p[1].isfailed() or p[1].issuspend() or p[1].iswaiting()), step_log))
persistent = list(
filter(
lambda p: not (p[1].isfailed() or p[1].issuspend() or p[1].iswaiting() or p[1].isawaitingcallback()),
step_log,
)
)
nr_of_steps_done = len(persistent)
remaining_steps = process.workflow.steps[nr_of_steps_done:]
# Make sure we get the last state from the suspend step (since we removed it before)
if step_log and step_log[-1][1].issuspend():
_, current_state = step_log[-1]
elif step_log and step_log[-1][1].isawaitingcallback():
_, current_state = step_log[-1]
elif persistent:
_, current_state = persistent[-1]
else:
current_state = Success({})
user_input = post_form(remaining_steps[0].form, current_state.unwrap(), user_data)
if step_log and step_log[-1][1].isawaitingcallback():
# Data is given as input by the external system, not a form.
user_input = input_data
else:
user_input = post_form(remaining_steps[0].form, current_state.unwrap(), input_data)
state = current_state.map(lambda state: StateMerger.merge(deepcopy(state), user_input))
updated_process = process.update(log=remaining_steps, state=state)
......
......@@ -3,7 +3,6 @@ from os import PathLike
from unittest.mock import patch
import pytest
from orchestrator.services.processes import continue_awaiting_process
from gso.products import Iptrunk, ProductType
from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
......@@ -110,19 +109,21 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
product_id = get_product_id_by_name(ProductType.IP_TRUNK)
initial_site_data = [{"product": product_id}, *input_form_wizard_data]
result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data)
assert_awaiting_callback(result)
standard_lso_result = {
"status": "success",
"job_id": str(uuid.uuid4()),
"output": "parsed_output",
"return_code": 0
"callback_result": {
"status": "success",
"job_id": str(uuid.uuid4()),
"output": "parsed_output",
"return_code": 0,
}
}
for _ in range(2):
for _ in range(6):
assert_awaiting_callback(result)
current_state = extract_state(result)
continue_awaiting_process(process_stat, token=current_state["callback_token"], input_data=standard_lso_result)
result, step_log = resume_workflow(process_stat, step_log, input_data=standard_lso_result)
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=[{}])
assert_complete(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment