diff --git a/test/workflows/__init__.py b/test/workflows/__init__.py index 05c9de4f693d356f8d2ca614c7f9dba30896b912..2b42a33928c6c9932b3512bd64b66d56267e9d28 100644 --- a/test/workflows/__init__.py +++ b/test/workflows/__init__.py @@ -7,13 +7,13 @@ from uuid import uuid4 import structlog from orchestrator.db import ProcessTable -from orchestrator.forms import post_process from orchestrator.services.processes import StateMerger, _db_create_process from orchestrator.types import FormGenerator, InputForm, State from orchestrator.utils.json import json_dumps, json_loads from orchestrator.workflow import Process as WFProcess from orchestrator.workflow import ProcessStat, Step, Success, Workflow, runwf from orchestrator.workflows import ALL_WORKFLOWS, LazyWorkflowInstance, get_workflow +from pydantic_forms.core import post_form logger = structlog.get_logger(__name__) @@ -173,20 +173,24 @@ def run_workflow(workflow_key: str, input_data: Union[State, List[State]]) -> Tu step_log: List[Tuple[Step, WFProcess]] = [] - pid = uuid4() + process_id = uuid4() workflow = get_workflow(workflow_key) assert workflow, "Workflow does not exist" initial_state = { - "process_id": pid, + "process_id": process_id, "reporter": user, "workflow_name": workflow_key, "workflow_target": workflow.target, } - user_input = post_process(workflow.initial_input_form, initial_state, user_data) + user_input = post_form(workflow.initial_input_form, initial_state, user_data) pstat = ProcessStat( - pid, workflow=workflow, state=Success({**user_input, **initial_state}), log=workflow.steps, current_user=user + process_id, + workflow=workflow, + state=Success({**user_input, **initial_state}), + log=workflow.steps, + current_user=user, ) _db_create_process(pstat) @@ -200,17 +204,22 @@ def resume_workflow( process: ProcessStat, step_log: List[Tuple[Step, WFProcess]], input_data: State ) -> Tuple[WFProcess, List]: # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.resume_process` - # The main differences are: we use a different step log function and we don't run in - # a sepperate thread + # The main differences are: we use a different step log function, and we don't run in a separate thread user_data = _sanitize_input(input_data) persistent = list(filter(lambda p: not (p[1].isfailed() or p[1].issuspend() or p[1].iswaiting()), step_log)) nr_of_steps_done = len(persistent) remaining_steps = process.workflow.steps[nr_of_steps_done:] - _, current_state = step_log[-1] + # Make sure we get the last state from the suspend step (since we removed it before) + if step_log and step_log[-1][1].issuspend(): + _, current_state = step_log[-1] + elif persistent: + _, current_state = persistent[-1] + else: + current_state = Success({}) - user_input = post_process(remaining_steps[0].form, current_state.unwrap(), user_data) + user_input = post_form(remaining_steps[0].form, current_state.unwrap(), user_data) state = current_state.map(lambda state: StateMerger.merge(deepcopy(state), user_input)) updated_process = process.update(log=remaining_steps, state=state)