Skip to content
Snippets Groups Projects
Commit 6c3fbefb authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

make test stuff compatible with orchestrator-core 1.3.0

parent ab58c961
No related branches found
No related tags found
1 merge request!72Feature/nat 260 unit test
Pipeline #84158 passed
...@@ -7,13 +7,13 @@ from uuid import uuid4 ...@@ -7,13 +7,13 @@ from uuid import uuid4
import structlog import structlog
from orchestrator.db import ProcessTable from orchestrator.db import ProcessTable
from orchestrator.forms import post_process
from orchestrator.services.processes import StateMerger, _db_create_process from orchestrator.services.processes import StateMerger, _db_create_process
from orchestrator.types import FormGenerator, InputForm, State from orchestrator.types import FormGenerator, InputForm, State
from orchestrator.utils.json import json_dumps, json_loads from orchestrator.utils.json import json_dumps, json_loads
from orchestrator.workflow import Process as WFProcess from orchestrator.workflow import Process as WFProcess
from orchestrator.workflow import ProcessStat, Step, Success, Workflow, runwf from orchestrator.workflow import ProcessStat, Step, Success, Workflow, runwf
from orchestrator.workflows import ALL_WORKFLOWS, LazyWorkflowInstance, get_workflow from orchestrator.workflows import ALL_WORKFLOWS, LazyWorkflowInstance, get_workflow
from pydantic_forms.core import post_form
logger = structlog.get_logger(__name__) logger = structlog.get_logger(__name__)
...@@ -173,20 +173,24 @@ def run_workflow(workflow_key: str, input_data: Union[State, List[State]]) -> Tu ...@@ -173,20 +173,24 @@ def run_workflow(workflow_key: str, input_data: Union[State, List[State]]) -> Tu
step_log: List[Tuple[Step, WFProcess]] = [] step_log: List[Tuple[Step, WFProcess]] = []
pid = uuid4() process_id = uuid4()
workflow = get_workflow(workflow_key) workflow = get_workflow(workflow_key)
assert workflow, "Workflow does not exist" assert workflow, "Workflow does not exist"
initial_state = { initial_state = {
"process_id": pid, "process_id": process_id,
"reporter": user, "reporter": user,
"workflow_name": workflow_key, "workflow_name": workflow_key,
"workflow_target": workflow.target, "workflow_target": workflow.target,
} }
user_input = post_process(workflow.initial_input_form, initial_state, user_data) user_input = post_form(workflow.initial_input_form, initial_state, user_data)
pstat = ProcessStat( pstat = ProcessStat(
pid, workflow=workflow, state=Success({**user_input, **initial_state}), log=workflow.steps, current_user=user process_id,
workflow=workflow,
state=Success({**user_input, **initial_state}),
log=workflow.steps,
current_user=user,
) )
_db_create_process(pstat) _db_create_process(pstat)
...@@ -200,17 +204,22 @@ def resume_workflow( ...@@ -200,17 +204,22 @@ def resume_workflow(
process: ProcessStat, step_log: List[Tuple[Step, WFProcess]], input_data: State process: ProcessStat, step_log: List[Tuple[Step, WFProcess]], input_data: State
) -> Tuple[WFProcess, List]: ) -> Tuple[WFProcess, List]:
# ATTENTION!! This code needs to be as similar as possible to `server.services.processes.resume_process` # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.resume_process`
# The main differences are: we use a different step log function and we don't run in # The main differences are: we use a different step log function, and we don't run in a separate thread
# a sepperate thread
user_data = _sanitize_input(input_data) user_data = _sanitize_input(input_data)
persistent = list(filter(lambda p: not (p[1].isfailed() or p[1].issuspend() or p[1].iswaiting()), step_log)) persistent = list(filter(lambda p: not (p[1].isfailed() or p[1].issuspend() or p[1].iswaiting()), step_log))
nr_of_steps_done = len(persistent) nr_of_steps_done = len(persistent)
remaining_steps = process.workflow.steps[nr_of_steps_done:] remaining_steps = process.workflow.steps[nr_of_steps_done:]
_, current_state = step_log[-1] # Make sure we get the last state from the suspend step (since we removed it before)
if step_log and step_log[-1][1].issuspend():
_, current_state = step_log[-1]
elif persistent:
_, current_state = persistent[-1]
else:
current_state = Success({})
user_input = post_process(remaining_steps[0].form, current_state.unwrap(), user_data) user_input = post_form(remaining_steps[0].form, current_state.unwrap(), user_data)
state = current_state.map(lambda state: StateMerger.merge(deepcopy(state), user_input)) state = current_state.map(lambda state: StateMerger.merge(deepcopy(state), user_input))
updated_process = process.update(log=remaining_steps, state=state) updated_process = process.update(log=remaining_steps, state=state)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment