Skip to content
Snippets Groups Projects
Verified Commit ce8c8ea4 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

update iptrunk creation test

parent c1cd83f6
No related branches found
No related tags found
1 merge request!96Make use of new callback step for external provisioning
from uuid import uuid4
LSO_RESULT_SUCCESS = {
"callback_result": {
"status": "success",
"job_id": str(uuid4()),
"output": "parsed_output",
"return_code": 0,
}
}
LSO_RESULT_FAILURE = {
"callback_result": {
"status": "failure",
"job_id": str(uuid4()),
"output": "parsed_output",
"return_code": 1,
}
}
USER_CONFIRM_EMPTY_FORM = [{}]
import uuid
from os import PathLike
from unittest.mock import patch
......@@ -9,6 +8,7 @@ from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
from gso.services.crm import customer_selector, get_customer_by_name
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.helpers import LAGMember
from test import LSO_RESULT_SUCCESS, LSO_RESULT_FAILURE, USER_CONFIRM_EMPTY_FORM
from test.services.conftest import MockedNetboxClient
from test.workflows import (
assert_aborted,
......@@ -17,8 +17,7 @@ from test.workflows import (
assert_suspended,
extract_state,
resume_workflow,
run_workflow,
user_accept_and_assert_suspended,
run_workflow, assert_failed,
)
......@@ -110,18 +109,9 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
initial_site_data = [{"product": product_id}, *input_form_wizard_data]
result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data)
standard_lso_result = {
"callback_result": {
"status": "success",
"job_id": str(uuid.uuid4()),
"output": "parsed_output",
"return_code": 0,
}
}
for _ in range(6):
assert_awaiting_callback(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=standard_lso_result)
result, step_log = resume_workflow(process_stat, step_log, input_data=LSO_RESULT_SUCCESS)
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=[{}])
......@@ -160,26 +150,14 @@ def test_iptrunk_creation_fails_when_lso_return_code_is_one(
initial_site_data = [{"product": product_id}, *input_form_wizard_data]
result, process_stat, step_log = run_workflow("create_iptrunk", initial_site_data)
assert_suspended(result)
standard_lso_result = {
"pp_run_results": {
"status": "ok",
"job_id": "random_job_id",
"output": "parsed_output",
"return_code": 1,
},
"confirm": "ACCEPTED",
}
attempts = 3
for _ in range(0, attempts - 1):
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result)
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, [{}, {}])
assert_awaiting_callback(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=LSO_RESULT_SUCCESS)
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, standard_lso_result)
result, step_log = resume_workflow(process_stat, step_log, [{}, {}])
assert_aborted(result)
assert_awaiting_callback(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=LSO_RESULT_FAILURE)
assert_failed(result)
assert mock_provision_ip_trunk.call_count == attempts
assert mock_check_ip_trunk.call_count == 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment