Skip to content
Snippets Groups Projects
Commit fada3935 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

addjust edgeport migration with new L3 core service migration wf

parent 3dfe7933
No related branches found
No related tags found
2 merge requests!356Fix/nat 1009/fix redeploy base config if there is a vprn,!355add bulk migration for an edge port
Pipeline #91810 passed
Showing
with 155 additions and 114 deletions
...@@ -81,7 +81,7 @@ class ProductName(strEnum): ...@@ -81,7 +81,7 @@ class ProductName(strEnum):
L3_CORE_SERVICE_PRODUCT_TYPE = L3CoreService.__name__ L3_CORE_SERVICE_PRODUCT_TYPE = L3CoreService.__name__
L2_CORE_SERVICE_PRODUCT_TYPE = Layer2Circuit.__name__ L2_CIRCUIT_PRODUCT_TYPE = Layer2Circuit.__name__
class ProductType(strEnum): class ProductType(strEnum):
...@@ -116,9 +116,9 @@ class ProductType(strEnum): ...@@ -116,9 +116,9 @@ class ProductType(strEnum):
IMPORTED_LHCONE = ImportedL3CoreService.__name__ IMPORTED_LHCONE = ImportedL3CoreService.__name__
COPERNICUS = L3_CORE_SERVICE_PRODUCT_TYPE COPERNICUS = L3_CORE_SERVICE_PRODUCT_TYPE
IMPORTED_COPERNICUS = ImportedL3CoreService.__name__ IMPORTED_COPERNICUS = ImportedL3CoreService.__name__
GEANT_PLUS = L2_CORE_SERVICE_PRODUCT_TYPE GEANT_PLUS = L2_CIRCUIT_PRODUCT_TYPE
IMPORTED_GEANT_PLUS = ImportedLayer2Circuit.__name__ IMPORTED_GEANT_PLUS = ImportedLayer2Circuit.__name__
EXPRESSROUTE = L2_CORE_SERVICE_PRODUCT_TYPE EXPRESSROUTE = L2_CIRCUIT_PRODUCT_TYPE
IMPORTED_EXPRESSROUTE = ImportedLayer2Circuit.__name__ IMPORTED_EXPRESSROUTE = ImportedLayer2Circuit.__name__
VRF = VRF.__name__ VRF = VRF.__name__
......
...@@ -17,12 +17,12 @@ from orchestrator.db import ( ...@@ -17,12 +17,12 @@ from orchestrator.db import (
db, db,
) )
from orchestrator.domain import SubscriptionModel from orchestrator.domain import SubscriptionModel
from orchestrator.services.subscriptions import query_depends_on_subscriptions, query_in_use_by_subscriptions from orchestrator.services.subscriptions import query_in_use_by_subscriptions
from orchestrator.types import SubscriptionLifecycle, UUIDstr from orchestrator.types import SubscriptionLifecycle, UUIDstr
from sqlalchemy import and_, text from sqlalchemy import and_, text
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from gso.products import L2_CORE_SERVICE_PRODUCT_TYPE, L3_CORE_SERVICE_PRODUCT_TYPE, ProductName, ProductType from gso.products import L2_CIRCUIT_PRODUCT_TYPE, L3_CORE_SERVICE_PRODUCT_TYPE, ProductName, ProductType
from gso.products.product_types.site import Site from gso.products.product_types.site import Site
SubscriptionType = dict[str, Any] SubscriptionType = dict[str, Any]
...@@ -185,7 +185,7 @@ def get_trunks_that_terminate_on_router( ...@@ -185,7 +185,7 @@ def get_trunks_that_terminate_on_router(
) )
def get_all_active_l3_core_services_on_top_of_edge_port(edge_port_id: UUIDstr) -> list[SubscriptionTable]: def get_active_l3_services_linked_to_edge_port(edge_port_id: UUIDstr) -> list[SubscriptionTable]:
"""Retrieve all active l3 core services that are on top of the given edge port. """Retrieve all active l3 core services that are on top of the given edge port.
Args: Args:
...@@ -207,14 +207,14 @@ def get_all_active_l3_core_services_on_top_of_edge_port(edge_port_id: UUIDstr) - ...@@ -207,14 +207,14 @@ def get_all_active_l3_core_services_on_top_of_edge_port(edge_port_id: UUIDstr) -
) )
def get_all_active_l2_core_services_on_top_of_edge_port(edge_port_id: UUIDstr) -> list[SubscriptionTable]: def get_active_l2_circuit_services_linked_to_edge_port(edge_port_id: UUIDstr) -> list[SubscriptionTable]:
"""Retrieve all active l2 core services that are on top of the given edge port.""" """Retrieve all active l2 circuit services that are on top of the given edge port."""
return ( return (
query_in_use_by_subscriptions(UUID(edge_port_id)) query_in_use_by_subscriptions(UUID(edge_port_id))
.join(ProductTable) .join(ProductTable)
.filter( .filter(
and_( and_(
ProductTable.product_type.in_([L2_CORE_SERVICE_PRODUCT_TYPE]), ProductTable.product_type.in_([L2_CIRCUIT_PRODUCT_TYPE]),
SubscriptionTable.status == SubscriptionLifecycle.ACTIVE, SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
) )
) )
...@@ -222,22 +222,6 @@ def get_all_active_l2_core_services_on_top_of_edge_port(edge_port_id: UUIDstr) - ...@@ -222,22 +222,6 @@ def get_all_active_l2_core_services_on_top_of_edge_port(edge_port_id: UUIDstr) -
) )
def get_edge_port_subscription_of_l3_core_service(edge_port_subscription_id: UUIDstr) -> list[SubscriptionModel]:
"""Retrieve all active Edge Port subscriptions that are used by the given L3 Core Service."""
query = (
query_depends_on_subscriptions(
UUID(edge_port_subscription_id), filter_statuses=[SubscriptionLifecycle.ACTIVE.value]
)
.join(ProductTable)
.filter(
ProductTable.product_type.in_([ProductType.EDGE_PORT]),
SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
)
)
return [SubscriptionModel.from_subscription(subscription.subscription_id) for subscription in query.all()]
def get_product_id_by_name(product_name: ProductName) -> UUID: def get_product_id_by_name(product_name: ProductName) -> UUID:
"""Retrieve the UUID of a product by its name. """Retrieve the UUID of a product by its name.
......
"""celery tasks for gso.""" """celery tasks for GSO."""
...@@ -23,6 +23,7 @@ from gso.utils.helpers import generate_inventory_for_routers ...@@ -23,6 +23,7 @@ from gso.utils.helpers import generate_inventory_for_routers
from gso.utils.shared_enums import Vendor from gso.utils.shared_enums import Vendor
SKIP_MOODI_KEY = "__skip_moodi__" SKIP_MOODI_KEY = "__skip_moodi__"
IS_HUMAN_INITIATED_WF_KEY = "__is_human_initiated_wf_key__"
def _deploy_base_config( def _deploy_base_config(
......
"""A modification workflow that migrates an EdgePort to a different endpoint.""" """A modification workflow that migrates an EdgePort to a different endpoint."""
import json
import random import random
from typing import Annotated, Any from typing import Annotated, Any
from uuid import uuid4 from uuid import uuid4
...@@ -7,15 +8,16 @@ from uuid import uuid4 ...@@ -7,15 +8,16 @@ from uuid import uuid4
from annotated_types import Len from annotated_types import Len
from orchestrator import step, workflow from orchestrator import step, workflow
from orchestrator.config.assignee import Assignee from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage from orchestrator.forms import FormPage, SubmitFormPage
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.utils.errors import ProcessFailureError from orchestrator.utils.errors import ProcessFailureError
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, begin, done, inputstep from orchestrator.workflow import StepList, begin, done, inputstep
from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, ConfigDict from pydantic import AfterValidator, ConfigDict, Field
from pydantic_forms.validators import Label, ReadOnlyField, validate_unique_list from pydantic_forms.validators import Divider, Label, ReadOnlyField, validate_unique_list
from pynetbox.models.dcim import Interfaces from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.edge_port import EdgePortAEMemberBlock from gso.products.product_blocks.edge_port import EdgePortAEMemberBlock
...@@ -25,8 +27,8 @@ from gso.services.lso_client import LSOState, lso_interaction ...@@ -25,8 +27,8 @@ from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_id from gso.services.partners import get_partner_by_id
from gso.services.subscriptions import ( from gso.services.subscriptions import (
get_all_active_l2_core_services_on_top_of_edge_port, get_active_l2_circuit_services_linked_to_edge_port,
get_all_active_l3_core_services_on_top_of_edge_port, get_active_l3_services_linked_to_edge_port,
) )
from gso.tasks.start_process import start_process_task from gso.tasks.start_process import start_process_task
from gso.utils.helpers import ( from gso.utils.helpers import (
...@@ -50,6 +52,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -50,6 +52,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
tt_number: TTNumber tt_number: TTNumber
partner_name: ReadOnlyField(get_partner_by_id(subscription.customer_id).name, default_type=str) # type: ignore[valid-type] partner_name: ReadOnlyField(get_partner_by_id(subscription.customer_id).name, default_type=str) # type: ignore[valid-type]
divider: Divider = Field(None, exclude=True)
node: active_pe_router_selector(excludes=[subscription.edge_port.node.subscription.subscription_id]) # type: ignore[valid-type] node: active_pe_router_selector(excludes=[subscription.edge_port.node.subscription.subscription_id]) # type: ignore[valid-type]
initial_user_input = yield MigrateEdgePortForm initial_user_input = yield MigrateEdgePortForm
...@@ -93,7 +96,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -93,7 +96,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"edge_port_ae_members", "edge_port_ae_members",
] ]
yield from create_summary_form(summary_form_data, subscription.product.name, summary_fields) yield from create_summary_form(summary_form_data, subscription.product.name, summary_fields)
return input_forms_data return input_forms_data | {"subscription": subscription}
@step("Update the EdgePort references") @step("Update the EdgePort references")
...@@ -114,9 +117,7 @@ def update_subscription_model( ...@@ -114,9 +117,7 @@ def update_subscription_model(
) )
subscription.edge_port.edge_port_description = description subscription.edge_port.edge_port_description = description
edge_port_ae_members = [EdgePortAEMemberBlock.new(subscription_id=uuid4(), **member) for member in ae_members] edge_port_ae_members = [EdgePortAEMemberBlock.new(subscription_id=uuid4(), **member) for member in ae_members]
subscription.edge_port.edge_port_ae_members = ( subscription.edge_port.edge_port_ae_members = edge_port_ae_members
edge_port_ae_members # TODO: Clear the existing AE members and inactivate them and remove them from netbox?
)
return {"subscription": subscription, "subscription_id": subscription.subscription_id} return {"subscription": subscription, "subscription_id": subscription.subscription_id}
...@@ -210,7 +211,7 @@ def allocate_interfaces_in_netbox(subscription: EdgePort) -> None: ...@@ -210,7 +211,7 @@ def allocate_interfaces_in_netbox(subscription: EdgePort) -> None:
def confirm_continue_move_fiber() -> FormGenerator: def confirm_continue_move_fiber() -> FormGenerator:
"""Wait for confirmation from an operator that the physical fiber has been moved.""" """Wait for confirmation from an operator that the physical fiber has been moved."""
class ProvisioningResultPage(FormPage): class ProvisioningResultPage(SubmitFormPage):
model_config = ConfigDict(title="Please confirm before continuing") model_config = ConfigDict(title="Please confirm before continuing")
info_label: Label = "New EdgePort has been deployed, wait for the physical connection to be moved." info_label: Label = "New EdgePort has been deployed, wait for the physical connection to be moved."
...@@ -221,10 +222,10 @@ def confirm_continue_move_fiber() -> FormGenerator: ...@@ -221,10 +222,10 @@ def confirm_continue_move_fiber() -> FormGenerator:
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM) @inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_things_looks_good_in_moodi() -> FormGenerator: def confirm_graphs_looks_good_in_moodi() -> FormGenerator:
"""Wait for confirmation from an operator that the new Migration looks good so far.""" """Wait for confirmation from an operator that the new Migration looks good so far."""
class ProvisioningResultPage(FormPage): class ProvisioningResultPage(SubmitFormPage):
model_config = ConfigDict(title="Please confirm before continuing") model_config = ConfigDict(title="Please confirm before continuing")
info_label: Label = "Do you confirm that everything looks good in the Moodi before continuing?" info_label: Label = "Do you confirm that everything looks good in the Moodi before continuing?"
...@@ -234,6 +235,34 @@ def confirm_things_looks_good_in_moodi() -> FormGenerator: ...@@ -234,6 +235,34 @@ def confirm_things_looks_good_in_moodi() -> FormGenerator:
return {} return {}
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_l3_core_service_migrations_are_complete() -> FormGenerator:
"""Wait for confirmation from an operator that all L3 core services have been completed successfully."""
class ProvisioningResultPage(SubmitFormPage):
model_config = ConfigDict(title="Please confirm before continuing")
info_label: Label = "Do you confirm that all L3 core service migrations have been completed successfully?"
yield ProvisioningResultPage
return {}
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_l2_circuit_migrations_are_complete() -> FormGenerator:
"""Wait for confirmation from an operator that all L2 circuit migrations have been completed successfully."""
class ProvisioningResultPage(SubmitFormPage):
model_config = ConfigDict(title="Please confirm before continuing")
info_label: Label = "Do you confirm that all L2 circuit migrations have been completed successfully?"
yield ProvisioningResultPage
return {}
@step("Migrate L3 core services to new node") @step("Migrate L3 core services to new node")
def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TTNumber) -> State: def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TTNumber) -> State:
"""Migrate all L3 core services from the old EdgePort to the new EdgePort. """Migrate all L3 core services from the old EdgePort to the new EdgePort.
...@@ -241,9 +270,9 @@ def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TT ...@@ -241,9 +270,9 @@ def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TT
This sub migrations do not modify the L3 core services. This sub migrations do not modify the L3 core services.
The source and destination EdgePort remain the same for each service. The source and destination EdgePort remain the same for each service.
The migration playbook is executed once for each service to apply the configuration on the new node and as a result, The migration playbook is executed once for each service to apply the configuration on the new node and as a result,
the service bindings port and bgp sessions related to this edge port of each service will be moved to the new node. the service bindings port and BGP sessions related to this edge port of each service will be moved to the new node.
""" """
l3_core_services = get_all_active_l3_core_services_on_top_of_edge_port(subscription_id) l3_core_services = get_active_l3_services_linked_to_edge_port(subscription_id)
edge_port = EdgePort.from_subscription(subscription_id) edge_port = EdgePort.from_subscription(subscription_id)
for l3_core_service in l3_core_services: for l3_core_service in l3_core_services:
...@@ -255,7 +284,10 @@ def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TT ...@@ -255,7 +284,10 @@ def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TT
{ {
"tt_number": tt_number, "tt_number": tt_number,
"skip_moodi": True, "skip_moodi": True,
"is_human_initiated_wf": False,
"source_edge_port": str(edge_port.subscription_id), "source_edge_port": str(edge_port.subscription_id),
},
{
"destination_edge_port": str(edge_port.subscription_id), "destination_edge_port": str(edge_port.subscription_id),
}, },
], ],
...@@ -269,7 +301,7 @@ def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TT ...@@ -269,7 +301,7 @@ def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TT
@step("Migrate L2 circuits to new node") @step("Migrate L2 circuits to new node")
def migrate_l2_circuits_to_new_node(subscription_id: UUIDstr, tt_number: TTNumber) -> State: def migrate_l2_circuits_to_new_node(subscription_id: UUIDstr, tt_number: TTNumber) -> State:
"""Migrate Layer2 circuits from the old EdgePort to the new EdgePort.""" """Migrate Layer2 circuits from the old EdgePort to the new EdgePort."""
layer2_circuits = get_all_active_l2_core_services_on_top_of_edge_port(subscription_id) layer2_circuits = get_active_l2_circuit_services_linked_to_edge_port(subscription_id)
edge_port = EdgePort.from_subscription(subscription_id) edge_port = EdgePort.from_subscription(subscription_id)
for l2_core_service in layer2_circuits: for l2_core_service in layer2_circuits:
...@@ -294,16 +326,21 @@ def migrate_l2_circuits_to_new_node(subscription_id: UUIDstr, tt_number: TTNumbe ...@@ -294,16 +326,21 @@ def migrate_l2_circuits_to_new_node(subscription_id: UUIDstr, tt_number: TTNumbe
@step("[DRY RUN] Disable configuration on old router") @step("[DRY RUN] Disable configuration on old router")
def disable_old_config_dry( def disable_old_config_dry(
# subscription: EdgePort, TODO subscription: EdgePort,
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
) -> LSOState: ) -> LSOState:
"""Perform a dry run of disabling the old configuration on the routers.""" """Perform a dry run of disabling the old configuration on the routers."""
layer3_services = get_active_l3_services_linked_to_edge_port(str(subscription.subscription_id))
layer2_circuits = get_active_l2_circuit_services_linked_to_edge_port(str(subscription.subscription_id))
extra_vars = { extra_vars = {
"verb": "deactivate", "verb": "deactivate",
"config_object": "deactivate", "config_object": "deactivate",
"dry_run": True, "dry_run": True,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} " f"- Deploy config for #TODO", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} " f"- Deploy config for #TODO",
"l3_core_services": [json.loads(json_dumps(layer3_service)) for layer3_service in layer3_services],
"l2_circuits": [json.loads(json_dumps(layer2_circuit)) for layer2_circuit in layer2_circuits],
} }
return { return {
...@@ -346,6 +383,26 @@ def disable_old_config_real( ...@@ -346,6 +383,26 @@ def disable_old_config_real(
} }
@inputstep("Verify pre-check results", assignee=Assignee.SYSTEM)
def inform_operator_traffic_check() -> FormGenerator:
"""Wait for confirmation from an operator that the results from the pre-checks look OK.
In case the results look OK, the workflow can continue. If the results don't look OK, the workflow can still be
aborted at this time, without the subscription going out of sync. Moodi will also not start, and the subscription
model has not been updated yet. Effectively, this prevents any changes inside the orchestrator from occurring. The
one thing that must be rolled back manually, is the deactivated configuration that sits on the source device.
"""
class PreCheckPage(SubmitFormPage):
model_config = ConfigDict(title="Please confirm before continuing")
info_label_1: Label = "Please verify that traffic has moved as expected."
info_label_2: Label = "If traffic is misbehaving, this is your last chance to abort this workflow cleanly."
yield PreCheckPage
return {}
@workflow( @workflow(
"Migrate an EdgePort", "Migrate an EdgePort",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
...@@ -356,25 +413,25 @@ def migrate_edge_port() -> StepList: ...@@ -356,25 +413,25 @@ def migrate_edge_port() -> StepList:
return ( return (
begin begin
>> store_process_subscription(Target.MODIFY) >> store_process_subscription(Target.MODIFY)
>> unsync
>> lso_interaction(disable_old_config_dry) >> lso_interaction(disable_old_config_dry)
>> lso_interaction(disable_old_config_real) >> lso_interaction(disable_old_config_real)
>> inform_operator_traffic_check
>> unsync
>> update_subscription_model >> update_subscription_model
>> start_moodi() >> start_moodi()
# TODO: Neda mentioned if in the future we have nokia-to-nokia migration, then we need another step to # TODO: Add LAG de-allocation step for future Nokia-to-Nokia migration if needed.
# dealocate the lag-- but this is not needed for now right?
>> reserve_interfaces_in_netbox >> reserve_interfaces_in_netbox
>> lso_interaction(create_edge_port_dry) >> lso_interaction(create_edge_port_dry)
>> lso_interaction(create_edge_port_real) >> lso_interaction(create_edge_port_real)
>> confirm_continue_move_fiber >> confirm_continue_move_fiber
>> confirm_things_looks_good_in_moodi >> confirm_graphs_looks_good_in_moodi
>> resync >> resync
# TODO: Explain this step that at this point related subscriptions are unsynced and prevent
# any changes on l3/l2 services
>> migrate_l3_core_services_to_new_node >> migrate_l3_core_services_to_new_node
>> confirm_things_looks_good_in_moodi >> confirm_l3_core_service_migrations_are_complete
>> confirm_graphs_looks_good_in_moodi
>> migrate_l2_circuits_to_new_node >> migrate_l2_circuits_to_new_node
>> confirm_things_looks_good_in_moodi >> confirm_l2_circuit_migrations_are_complete
>> confirm_graphs_looks_good_in_moodi
>> stop_moodi() >> stop_moodi()
>> done >> done
) )
...@@ -11,7 +11,7 @@ even though the L2 Circuit Service is not changed. ...@@ -11,7 +11,7 @@ even though the L2 Circuit Service is not changed.
Operator-triggered migration: Operator-triggered migration:
When an operator initiates the workflow, they are required to specify both the source and destination EdgePorts. When an operator initiates the workflow, they are required to specify both the source and destination EdgePorts.
During the migration process, the system updates the related access_point.sbp.edge_port reference to replace the source During the migration process, the system updates the related edge_port reference to replace the source
EdgePort with the destination EdgePort and applies the necessary configuration changes to the router. EdgePort with the destination EdgePort and applies the necessary configuration changes to the router.
Important Note: Important Note:
......
...@@ -115,6 +115,3 @@ def modify_layer_2_circuit() -> StepList: ...@@ -115,6 +115,3 @@ def modify_layer_2_circuit() -> StepList:
>> resync >> resync
>> done >> done
) )
# TODO: where is ansible playbook execution steps to reflect this changes on the network?
...@@ -17,7 +17,7 @@ from orchestrator.forms import FormPage, SubmitFormPage ...@@ -17,7 +17,7 @@ from orchestrator.forms import FormPage, SubmitFormPage
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.utils.errors import ProcessFailureError from orchestrator.utils.errors import ProcessFailureError
from orchestrator.utils.json import json_dumps from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, begin, done, inputstep, step from orchestrator.workflow import StepList, begin, conditional, done, inputstep, step
from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import ConfigDict, Field from pydantic import ConfigDict, Field
...@@ -29,7 +29,7 @@ from gso.products.product_types.l3_core_service import L3CoreService ...@@ -29,7 +29,7 @@ from gso.products.product_types.l3_core_service import L3CoreService
from gso.services.lso_client import LSOState, lso_interaction from gso.services.lso_client import LSOState, lso_interaction
from gso.services.subscriptions import get_active_edge_port_subscriptions from gso.services.subscriptions import get_active_edge_port_subscriptions
from gso.utils.types.tt_number import TTNumber from gso.utils.types.tt_number import TTNumber
from gso.utils.workflow_steps import start_moodi, stop_moodi from gso.utils.workflow_steps import IS_HUMAN_INITIATED_WF_KEY, SKIP_MOODI_KEY, start_moodi, stop_moodi
from gso.workflows.shared import create_summary_form from gso.workflows.shared import create_summary_form
...@@ -53,6 +53,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -53,6 +53,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
tt_number: TTNumber tt_number: TTNumber
divider: Divider = Field(None, exclude=True) divider: Divider = Field(None, exclude=True)
skip_moodi: bool = False
is_human_initiated_wf: bool = True
source_edge_port: source_edge_port_selector | str # type: ignore[valid-type] source_edge_port: source_edge_port_selector | str # type: ignore[valid-type]
source_ep_user_input = yield L3CoreServiceSourceEdgePortSelectionForm source_ep_user_input = yield L3CoreServiceSourceEdgePortSelectionForm
...@@ -77,21 +79,27 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -77,21 +79,27 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
destination_ep_user_input = yield L3CoreServiceEdgePortSelectionForm destination_ep_user_input = yield L3CoreServiceEdgePortSelectionForm
summary_input = { if source_ep_user_input.is_human_initiated_wf:
"source_edge_port": EdgePort.from_subscription(source_ep_user_input.source_edge_port).description, summary_input = {
"destination_edge_port": EdgePort.from_subscription( "source_edge_port": EdgePort.from_subscription(source_ep_user_input.source_edge_port).description,
destination_ep_user_input.destination_edge_port "destination_edge_port": EdgePort.from_subscription(
).description, destination_ep_user_input.destination_edge_port
} ).description,
yield from create_summary_form(summary_input, subscription.l3_core_service_type, list(summary_input.keys())) }
user_input = ( yield from create_summary_form(
summary_input, subscription.l3_core_service_type.value, list(summary_input.keys())
)
return (
{"subscription_id": subscription_id, "subscription": subscription} {"subscription_id": subscription_id, "subscription": subscription}
| source_ep_user_input.model_dump() | source_ep_user_input.model_dump()
| destination_ep_user_input.model_dump() | destination_ep_user_input.model_dump()
| {
IS_HUMAN_INITIATED_WF_KEY: source_ep_user_input.is_human_initiated_wf,
SKIP_MOODI_KEY: source_ep_user_input.skip_moodi,
}
) )
return user_input
@step("Show BGP neighbors") @step("Show BGP neighbors")
def show_bgp_neighbors( def show_bgp_neighbors(
...@@ -320,15 +328,17 @@ def update_subscription_model( ...@@ -320,15 +328,17 @@ def update_subscription_model(
) )
def migrate_l3_core_service() -> StepList: def migrate_l3_core_service() -> StepList:
"""Migrate a L3 Core Service to a destination Edge Port.""" """Migrate a L3 Core Service to a destination Edge Port."""
is_human_initiated_wf = conditional(lambda state: bool(state.get(IS_HUMAN_INITIATED_WF_KEY)))
return ( return (
begin begin
>> store_process_subscription(Target.MODIFY) >> store_process_subscription(Target.MODIFY)
>> lso_interaction(show_bgp_neighbors) # TODO: send OTRS email with pre-check results >> is_human_initiated_wf(lso_interaction(show_bgp_neighbors)) # TODO: send OTRS email with pre-check results
>> lso_interaction(deactivate_bgp_dry) >> is_human_initiated_wf(lso_interaction(deactivate_bgp_dry))
>> lso_interaction(deactivate_bgp_real) >> is_human_initiated_wf(lso_interaction(deactivate_bgp_real))
>> lso_interaction(deactivate_sbp_dry) >> is_human_initiated_wf(lso_interaction(deactivate_sbp_dry))
>> lso_interaction(deactivate_sbp_real) >> is_human_initiated_wf(lso_interaction(deactivate_sbp_real))
>> inform_operator_traffic_check >> is_human_initiated_wf(inform_operator_traffic_check)
>> unsync >> unsync
>> start_moodi() # TODO: include results from first LSO run >> start_moodi() # TODO: include results from first LSO run
>> generate_scoped_subscription_model >> generate_scoped_subscription_model
......
...@@ -272,7 +272,7 @@ def resume_workflow( ...@@ -272,7 +272,7 @@ def resume_workflow(
return result, step_log return result, step_log
def resume_suspend_workflow( def resume_suspended_workflow(
result, result,
process: ProcessStat, process: ProcessStat,
step_log: list[tuple[Step, Process]], step_log: list[tuple[Step, Process]],
...@@ -318,6 +318,5 @@ def assert_lso_interaction_failure(result: Process, process_stat: ProcessStat, s ...@@ -318,6 +318,5 @@ def assert_lso_interaction_failure(result: Process, process_stat: ProcessStat, s
def assert_stop_moodi(result: Process, process_stat: ProcessStat, step_log: list): def assert_stop_moodi(result: Process, process_stat: ProcessStat, step_log: list):
"""Assert a successful LSO execution in a workflow.""" """Assert a successful LSO execution in a workflow."""
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
return assert_lso_success(result, process_stat, step_log) return assert_lso_success(result, process_stat, step_log)
...@@ -14,7 +14,7 @@ from test.workflows import ( ...@@ -14,7 +14,7 @@ from test.workflows import (
assert_lso_interaction_success, assert_lso_interaction_success,
assert_stop_moodi, assert_stop_moodi,
extract_state, extract_state,
resume_suspend_workflow, resume_suspended_workflow,
run_workflow, run_workflow,
) )
...@@ -104,19 +104,16 @@ def test_successful_edge_port_migration( ...@@ -104,19 +104,16 @@ def test_successful_edge_port_migration(
initial_data = [{"subscription_id": str(edge_port.subscription_id)}, *input_form_wizard_data] initial_data = [{"subscription_id": str(edge_port.subscription_id)}, *input_form_wizard_data]
result, process_stat, step_log = run_workflow("migrate_edge_port", initial_data) result, process_stat, step_log = run_workflow("migrate_edge_port", initial_data)
# confirm inform_operator_traffic_check
result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
# Dry and Real run for disabling config and creating a new edge port # Dry and Real run for disabling config and creating a new edge port
for _ in range(3): for _ in range(3):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
# confirm continue moved fiber # all the steps in the workflow that needs user confirmation
result, step_log = resume_suspend_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) for _ in range(6):
result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
# confirm things in moodi looks good and start Migrate all L3 Core Services
result, step_log = resume_suspend_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
# confirm things in moodi looks good and start migration of L2 core services
result, step_log = resume_suspend_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_suspend_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = assert_stop_moodi(result, process_stat, step_log) result, step_log = assert_stop_moodi(result, process_stat, step_log)
......
...@@ -16,9 +16,8 @@ from test.workflows import ( ...@@ -16,9 +16,8 @@ from test.workflows import (
assert_failed, assert_failed,
assert_lso_interaction_failure, assert_lso_interaction_failure,
assert_lso_interaction_success, assert_lso_interaction_success,
assert_suspended,
extract_state, extract_state,
resume_workflow, resume_suspended_workflow,
run_workflow, run_workflow,
) )
...@@ -134,8 +133,7 @@ def test_successful_iptrunk_creation_with_standard_lso_result( ...@@ -134,8 +133,7 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
for _ in range(6): for _ in range(6):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
assert_complete(result) assert_complete(result)
...@@ -227,8 +225,7 @@ def test_successful_iptrunk_creation_with_juniper_interface_names( ...@@ -227,8 +225,7 @@ def test_successful_iptrunk_creation_with_juniper_interface_names(
for _ in range(6): for _ in range(6):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
assert_complete(result) assert_complete(result)
assert mock_execute_playbook.call_count == 6 assert mock_execute_playbook.call_count == 6
......
...@@ -13,6 +13,7 @@ from test.workflows import ( ...@@ -13,6 +13,7 @@ from test.workflows import (
assert_lso_interaction_success, assert_lso_interaction_success,
assert_suspended, assert_suspended,
extract_state, extract_state,
resume_suspended_workflow,
resume_workflow, resume_workflow,
run_workflow, run_workflow,
) )
...@@ -118,7 +119,7 @@ def interface_lists_are_equal(list1, list2): ...@@ -118,7 +119,7 @@ def interface_lists_are_equal(list1, list2):
@patch("gso.services.netbox_client.NetboxClient.free_interface") @patch("gso.services.netbox_client.NetboxClient.free_interface")
@patch("gso.services.netbox_client.NetboxClient.delete_interface") @patch("gso.services.netbox_client.NetboxClient.delete_interface")
@patch("gso.workflows.iptrunk.migrate_iptrunk.SharePointClient") @patch("gso.workflows.iptrunk.migrate_iptrunk.SharePointClient")
def test_migrate_iptrunk_success( # noqa: PLR0915 def test_migrate_iptrunk_success(
mock_sharepoint_client, mock_sharepoint_client,
mocked_delete_interface, mocked_delete_interface,
mocked_free_interface, mocked_free_interface,
...@@ -152,8 +153,7 @@ def test_migrate_iptrunk_success( # noqa: PLR0915 ...@@ -152,8 +153,7 @@ def test_migrate_iptrunk_success( # noqa: PLR0915
for _ in range(8): for _ in range(8):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
for _ in range(8): for _ in range(8):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
...@@ -165,8 +165,7 @@ def test_migrate_iptrunk_success( # noqa: PLR0915 ...@@ -165,8 +165,7 @@ def test_migrate_iptrunk_success( # noqa: PLR0915
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
# Continue workflow after it has displayed a checklist URL. # Continue workflow after it has displayed a checklist URL.
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
assert_complete(result) assert_complete(result)
......
...@@ -13,9 +13,8 @@ from test.workflows import ( ...@@ -13,9 +13,8 @@ from test.workflows import (
assert_complete, assert_complete,
assert_lso_interaction_success, assert_lso_interaction_success,
assert_stop_moodi, assert_stop_moodi,
assert_suspended,
extract_state, extract_state,
resume_workflow, resume_suspended_workflow,
run_workflow, run_workflow,
) )
...@@ -93,8 +92,7 @@ def test_create_l3_core_service_success( ...@@ -93,8 +92,7 @@ def test_create_l3_core_service_success(
for _ in range(lso_interaction_count): for _ in range(lso_interaction_count):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = assert_stop_moodi(result, process_stat, step_log) result, step_log = assert_stop_moodi(result, process_stat, step_log)
......
...@@ -9,9 +9,9 @@ from test import USER_CONFIRM_EMPTY_FORM ...@@ -9,9 +9,9 @@ from test import USER_CONFIRM_EMPTY_FORM
from test.workflows import ( from test.workflows import (
assert_complete, assert_complete,
assert_lso_interaction_success, assert_lso_interaction_success,
assert_suspended, assert_stop_moodi,
extract_state, extract_state,
resume_workflow, resume_suspended_workflow,
run_workflow, run_workflow,
) )
...@@ -52,16 +52,17 @@ def test_migrate_l3_core_service_success( ...@@ -52,16 +52,17 @@ def test_migrate_l3_core_service_success(
for _ in range(5): for _ in range(5):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
for _ in range(4): for _ in range(5):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
result, step_log = assert_stop_moodi(result, process_stat, step_log)
assert_complete(result) assert_complete(result)
state = extract_state(result) state = extract_state(result)
subscription = L3CoreService.from_subscription(state["subscription_id"]) subscription = L3CoreService.from_subscription(state["subscription_id"])
assert mock_execute_playbook.call_count == 9 assert mock_execute_playbook.call_count == 11
assert subscription.insync assert subscription.insync
assert len(subscription.l3_core_service.ap_list) == 1 assert len(subscription.l3_core_service.ap_list) == 1
assert str(subscription.l3_core_service.ap_list[0].sbp.edge_port.owner_subscription_id) == destination_edge_port assert str(subscription.l3_core_service.ap_list[0].sbp.edge_port.owner_subscription_id) == destination_edge_port
...@@ -113,8 +114,7 @@ def test_migrate_l3_core_service_scoped_emission( ...@@ -113,8 +114,7 @@ def test_migrate_l3_core_service_scoped_emission(
for _ in range(4): for _ in range(4):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
...@@ -130,13 +130,15 @@ def test_migrate_l3_core_service_scoped_emission( ...@@ -130,13 +130,15 @@ def test_migrate_l3_core_service_scoped_emission(
state["subscription"]["l3_core_service"] == state["__old_subscriptions__"][subscription_id]["l3_core_service"] state["subscription"]["l3_core_service"] == state["__old_subscriptions__"][subscription_id]["l3_core_service"]
) # Subscription is unchanged for now ) # Subscription is unchanged for now
for _ in range(3): for _ in range(4):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
result, step_log = assert_stop_moodi(result, process_stat, step_log)
assert_complete(result) assert_complete(result)
state = extract_state(result) state = extract_state(result)
subscription = L3CoreService.from_subscription(state["subscription_id"]) subscription = L3CoreService.from_subscription(state["subscription_id"])
assert mock_execute_playbook.call_count == 9 assert mock_execute_playbook.call_count == 11
assert subscription.insync assert subscription.insync
assert len(subscription.l3_core_service.ap_list) == 5 assert len(subscription.l3_core_service.ap_list) == 5
assert str(subscription.l3_core_service.ap_list[3].sbp.edge_port.owner_subscription_id) == destination_edge_port assert str(subscription.l3_core_service.ap_list[3].sbp.edge_port.owner_subscription_id) == destination_edge_port
...@@ -17,6 +17,7 @@ from test.workflows import ( ...@@ -17,6 +17,7 @@ from test.workflows import (
assert_lso_interaction_success, assert_lso_interaction_success,
assert_suspended, assert_suspended,
extract_state, extract_state,
resume_suspended_workflow,
resume_workflow, resume_workflow,
run_workflow, run_workflow,
) )
...@@ -102,8 +103,7 @@ def test_create_nokia_router_success( ...@@ -102,8 +103,7 @@ def test_create_nokia_router_success(
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
assert_complete(result) assert_complete(result)
......
...@@ -13,6 +13,7 @@ from test.workflows import ( ...@@ -13,6 +13,7 @@ from test.workflows import (
assert_lso_interaction_success, assert_lso_interaction_success,
assert_suspended, assert_suspended,
extract_state, extract_state,
resume_suspended_workflow,
resume_workflow, resume_workflow,
run_workflow, run_workflow,
) )
...@@ -54,8 +55,7 @@ def test_create_switch_success( ...@@ -54,8 +55,7 @@ def test_create_switch_success(
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
# Sharepoint list created # Sharepoint list created
assert_suspended(result) result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
assert_complete(result) assert_complete(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment