Skip to content
Snippets Groups Projects

Make use of new callback step for external provisioning

Merged Karel van Klink requested to merge feature/use-async-steps into develop
All threads resolved!
2 files
+ 43
39
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -3,7 +3,6 @@ from os import PathLike
from unittest.mock import patch
import pytest
from orchestrator.services.processes import continue_awaiting_process
from gso.products import Iptrunk, ProductType
from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
@@ -110,19 +109,21 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
product_id = get_product_id_by_name(ProductType.IP_TRUNK)
initial_site_data = [{"product": product_id}, *input_form_wizard_data]
result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data)
assert_awaiting_callback(result)
standard_lso_result = {
"status": "success",
"job_id": str(uuid.uuid4()),
"output": "parsed_output",
"return_code": 0
"callback_result": {
"status": "success",
"job_id": str(uuid.uuid4()),
"output": "parsed_output",
"return_code": 0,
}
}
for _ in range(2):
for _ in range(6):
assert_awaiting_callback(result)
current_state = extract_state(result)
continue_awaiting_process(process_stat, token=current_state["callback_token"], input_data=standard_lso_result)
result, step_log = resume_workflow(process_stat, step_log, input_data=standard_lso_result)
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=[{}])
assert_complete(result)
Loading