diff --git a/test/workflows/__init__.py b/test/workflows/__init__.py index 4fcc19905d6b722eeabdc88f345854dea638d8b5..596393002c6ce5c2f273c329be374861d9e0c5db 100644 --- a/test/workflows/__init__.py +++ b/test/workflows/__init__.py @@ -10,8 +10,7 @@ from orchestrator.db import ProcessTable from orchestrator.services.processes import StateMerger, _db_create_process from orchestrator.types import FormGenerator, InputForm, State from orchestrator.utils.json import json_dumps, json_loads -from orchestrator.workflow import Process as WFProcess -from orchestrator.workflow import ProcessStat, Step, Success, Workflow, runwf +from orchestrator.workflow import Process, ProcessStat, Step, Success, Workflow, runwf from orchestrator.workflows import ALL_WORKFLOWS, LazyWorkflowInstance, get_workflow from pydantic_forms.core import post_form @@ -102,8 +101,6 @@ def extract_state(result): def extract_error(result): - from orchestrator.workflow import Process - assert isinstance(result, Process), f"Expected a Process, but got {repr(result)} of type {type(result)}" assert not isinstance(result.s, Process), "Result contained a Process in a Process, this should not happen" @@ -149,35 +146,33 @@ class WorkflowInstanceForTests(LazyWorkflowInstance): return f"WorkflowInstanceForTests('{self.workflow}','{self.name}')" -def _store_step(step_log: list[tuple[Step, WFProcess]]) -> Callable[[ProcessStat, Step, WFProcess], WFProcess]: - def __store_step(pstat: ProcessStat, step: Step, state: WFProcess) -> WFProcess: +def _store_step(step_log: list[tuple[Step, Process]]) -> Callable[[ProcessStat, Step, Process], Process]: + def __store_step(pstat: ProcessStat, step: Step, process: Process) -> Process: try: - state = state.map(lambda s: json_loads(json_dumps(s))) + process = process.map(lambda s: json_loads(json_dumps(s))) except Exception: - logger.exception("Step state is not valid json", state=state) - step_log.append((step, state)) - return state + logger.exception("Step state is not valid json", process=process) + + state = process.unwrap() + state.pop("__step_name_override", None) + for k in state.get("__remove_keys", []) + ["__remove_keys"]: + state.pop(k, None) + if state.pop("__replace_last_state", None): + step_log[-1] = (step, process) + else: + step_log.append((step, process)) + return process return __store_step -def _sanitize_input(input_data: State | list[State]) -> list[State]: - # To be backwards compatible convert single dict to list - if not isinstance(input_data, list): - input_data = [input_data] - - # We need a copy here, and we want to mimic the actual code that returns a serialized version of the state - return cast(list[State], json_loads(json_dumps(input_data))) - - -def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[WFProcess, ProcessStat, list]: +def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[Process, ProcessStat, list]: # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.start_process` # The main differences are: we use a different step log function, and we don't run in # a separate thread - user_data = _sanitize_input(input_data) user = "john.doe" - step_log: list[tuple[Step, WFProcess]] = [] + step_log: list[tuple[Step, Process]] = [] process_id = uuid4() workflow = get_workflow(workflow_key) @@ -189,7 +184,7 @@ def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[WF "workflow_target": workflow.target, } - user_input = post_form(workflow.initial_input_form, initial_state, user_data) + user_input = post_form(workflow.initial_input_form, initial_state, input_data) pstat = ProcessStat( process_id, @@ -207,25 +202,33 @@ def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[WF def resume_workflow( - process: ProcessStat, step_log: list[tuple[Step, WFProcess]], input_data: State | list[State] -) -> tuple[WFProcess, list]: + process: ProcessStat, step_log: list[tuple[Step, Process]], input_data: State | list[State] +) -> tuple[Process, list]: # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.resume_process` # The main differences are: we use a different step log function, and we don't run in a separate thread - user_data = _sanitize_input(input_data) - - persistent = list(filter(lambda p: not (p[1].isfailed() or p[1].issuspend() or p[1].iswaiting()), step_log)) + persistent = list( + filter( + lambda p: not (p[1].isfailed() or p[1].issuspend() or p[1].iswaiting() or p[1].isawaitingcallback()), + step_log, + ) + ) nr_of_steps_done = len(persistent) remaining_steps = process.workflow.steps[nr_of_steps_done:] - # Make sure we get the last state from the suspend step (since we removed it before) if step_log and step_log[-1][1].issuspend(): _, current_state = step_log[-1] + elif step_log and step_log[-1][1].isawaitingcallback(): + _, current_state = step_log[-1] elif persistent: _, current_state = persistent[-1] else: current_state = Success({}) - user_input = post_form(remaining_steps[0].form, current_state.unwrap(), user_data) + if step_log and step_log[-1][1].isawaitingcallback(): + # Data is given as input by the external system, not a form. + user_input = input_data + else: + user_input = post_form(remaining_steps[0].form, current_state.unwrap(), input_data) state = current_state.map(lambda state: StateMerger.merge(deepcopy(state), user_input)) updated_process = process.update(log=remaining_steps, state=state) diff --git a/test/workflows/iptrunk/test_create_iptrunk.py b/test/workflows/iptrunk/test_create_iptrunk.py index a152d1c6f6fb3b924ab9ad6226cc0a075c90e1a2..fc0b80e686c7badf4e8f43102e8722162733b74c 100644 --- a/test/workflows/iptrunk/test_create_iptrunk.py +++ b/test/workflows/iptrunk/test_create_iptrunk.py @@ -3,7 +3,6 @@ from os import PathLike from unittest.mock import patch import pytest -from orchestrator.services.processes import continue_awaiting_process from gso.products import Iptrunk, ProductType from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity @@ -110,19 +109,21 @@ def test_successful_iptrunk_creation_with_standard_lso_result( product_id = get_product_id_by_name(ProductType.IP_TRUNK) initial_site_data = [{"product": product_id}, *input_form_wizard_data] result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data) - assert_awaiting_callback(result) standard_lso_result = { - "status": "success", - "job_id": str(uuid.uuid4()), - "output": "parsed_output", - "return_code": 0 + "callback_result": { + "status": "success", + "job_id": str(uuid.uuid4()), + "output": "parsed_output", + "return_code": 0, + } } - for _ in range(2): + for _ in range(6): assert_awaiting_callback(result) - current_state = extract_state(result) - continue_awaiting_process(process_stat, token=current_state["callback_token"], input_data=standard_lso_result) + result, step_log = resume_workflow(process_stat, step_log, input_data=standard_lso_result) + assert_suspended(result) + result, step_log = resume_workflow(process_stat, step_log, input_data=[{}]) assert_complete(result)