diff --git a/gso/__init__.py b/gso/__init__.py index ba8f1bdcc6b8790f24e77e4d03f9285a81530d7b..610b109831a99599c7d2551fb8d1d88634cd5f8a 100644 --- a/gso/__init__.py +++ b/gso/__init__.py @@ -7,7 +7,6 @@ from orchestrator import OrchestratorCore, app_settings from orchestrator.cli.main import app as cli_app from orchestrator.graphql import SCALAR_OVERRIDES from orchestrator.services.tasks import initialise_celery -from orchestrator.settings import ExecutorType # noinspection PyUnresolvedReferences import gso.products @@ -37,18 +36,19 @@ def init_gso_app() -> OrchestratorCore: app.register_graphql(subscription_interface=custom_subscription_interface) app.include_router(api_router, prefix="/api") - if app_settings.EXECUTOR == ExecutorType.WORKER: - config = load_oss_params() - celery = Celery( - "geant-service-orchestrator", - broker=config.CELERY.broker_url, - backend=config.CELERY.result_backend, - include=["orchestrator.services.tasks"], - ) - celery.conf.update( - result_expires=config.CELERY.result_expires, - ) - gso_initialise_celery(celery) + oss_params = load_oss_params() + celery = Celery( + "geant-service-orchestrator", + broker=oss_params.CELERY.broker_url, + backend=oss_params.CELERY.result_backend, + include=["orchestrator.services.tasks", "gso.tasks.start_process"], + ) + celery.conf.update( + result_expires=oss_params.CELERY.result_expires, + task_always_eager=app_settings.TESTING, + task_eager_propagates=app_settings.TESTING, + ) + gso_initialise_celery(celery) return app diff --git a/gso/migrations/versions/2025-02-06_efebcde91f2f_add_migration_workflow_for_an_edgeport.py b/gso/migrations/versions/2025-02-06_efebcde91f2f_add_migration_workflow_for_an_edgeport.py new file mode 100644 index 0000000000000000000000000000000000000000..6005b58bec48a258590128d9ec2be8729e35ef41 --- /dev/null +++ b/gso/migrations/versions/2025-02-06_efebcde91f2f_add_migration_workflow_for_an_edgeport.py @@ -0,0 +1,39 @@ +"""Add a migration workflow for an EdgePort. + +Revision ID: efebcde91f2f +Revises: 8a65d0ed588e +Create Date: 2025-01-09 17:17:24.972289 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'efebcde91f2f' +down_revision = '16eef776a258' +branch_labels = None +depends_on = None + + +from orchestrator.migrations.helpers import create_workflow, delete_workflow + +new_workflows = [ + { + "name": "migrate_edge_port", + "target": "MODIFY", + "description": "Migrate an Edge Port", + "product_type": "EdgePort" + }, +] + + +def upgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + create_workflow(conn, workflow) + + +def downgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + delete_workflow(conn, workflow["name"]) diff --git a/gso/oss-params-example.json b/gso/oss-params-example.json index ac0a29de1cd5204fc2443e2a3e8c4473f52a730b..1a1495e4d56813a02048fc1d0698e99f2ea1766e 100644 --- a/gso/oss-params-example.json +++ b/gso/oss-params-example.json @@ -133,6 +133,6 @@ }, "MOODI": { "host": "moodi.test.gap.geant.org", - "moodi_enabled": false + "moodi_enabled": true } } diff --git a/gso/products/__init__.py b/gso/products/__init__.py index 7af2244220aa1b321ba57db01e2bcb1504a155c0..a1f5c19360301a18c539c1472d6c8fb3c7f8b535 100644 --- a/gso/products/__init__.py +++ b/gso/products/__init__.py @@ -81,7 +81,7 @@ class ProductName(strEnum): L3_CORE_SERVICE_PRODUCT_TYPE = L3CoreService.__name__ -L2_CORE_SERVICE_PRODUCT_TYPE = Layer2Circuit.__name__ +L2_CIRCUIT_PRODUCT_TYPE = Layer2Circuit.__name__ class ProductType(strEnum): @@ -116,9 +116,9 @@ class ProductType(strEnum): IMPORTED_LHCONE = ImportedL3CoreService.__name__ COPERNICUS = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_COPERNICUS = ImportedL3CoreService.__name__ - GEANT_PLUS = L2_CORE_SERVICE_PRODUCT_TYPE + GEANT_PLUS = L2_CIRCUIT_PRODUCT_TYPE IMPORTED_GEANT_PLUS = ImportedLayer2Circuit.__name__ - EXPRESSROUTE = L2_CORE_SERVICE_PRODUCT_TYPE + EXPRESSROUTE = L2_CIRCUIT_PRODUCT_TYPE IMPORTED_EXPRESSROUTE = ImportedLayer2Circuit.__name__ VRF = VRF.__name__ diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py index e83f7831ccaf1ad5928f94a866c3e086c65bf033..8c378697e64d863beb30854c5d18ce5184bcd7bf 100644 --- a/gso/services/subscriptions.py +++ b/gso/services/subscriptions.py @@ -19,10 +19,10 @@ from orchestrator.db import ( from orchestrator.domain import SubscriptionModel from orchestrator.services.subscriptions import query_in_use_by_subscriptions from orchestrator.types import SubscriptionLifecycle, UUIDstr -from sqlalchemy import text +from sqlalchemy import and_, text from sqlalchemy.exc import SQLAlchemyError -from gso.products import ProductName, ProductType +from gso.products import L2_CIRCUIT_PRODUCT_TYPE, L3_CORE_SERVICE_PRODUCT_TYPE, ProductName, ProductType from gso.products.product_types.site import Site SubscriptionType = dict[str, Any] @@ -185,6 +185,43 @@ def get_trunks_that_terminate_on_router( ) +def get_active_l3_services_linked_to_edge_port(edge_port_id: UUIDstr) -> list[SubscriptionTable]: + """Retrieve all active l3 core services that are on top of the given edge port. + + Args: + edge_port_id: The ID of the edge port. + + Returns: + A list of active services that are on top of the edge port. + """ + return ( + query_in_use_by_subscriptions(UUID(edge_port_id)) + .join(ProductTable) + .filter( + and_( + ProductTable.product_type.in_([L3_CORE_SERVICE_PRODUCT_TYPE]), + SubscriptionTable.status == SubscriptionLifecycle.ACTIVE, + ) + ) + .all() + ) + + +def get_active_l2_circuit_services_linked_to_edge_port(edge_port_id: UUIDstr) -> list[SubscriptionTable]: + """Retrieve all active l2 circuit services that are on top of the given edge port.""" + return ( + query_in_use_by_subscriptions(UUID(edge_port_id)) + .join(ProductTable) + .filter( + and_( + ProductTable.product_type.in_([L2_CIRCUIT_PRODUCT_TYPE]), + SubscriptionTable.status == SubscriptionLifecycle.ACTIVE, + ) + ) + .all() + ) + + def get_product_id_by_name(product_name: ProductName) -> UUID: """Retrieve the UUID of a product by its name. diff --git a/gso/tasks/__init__.py b/gso/tasks/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..6d94542359db04786aa0a3035bbd1346057d02ed --- /dev/null +++ b/gso/tasks/__init__.py @@ -0,0 +1 @@ +"""celery tasks for GSO.""" diff --git a/gso/tasks/start_process.py b/gso/tasks/start_process.py new file mode 100644 index 0000000000000000000000000000000000000000..5760bffc3b35ee39b220bbc311660dc672e3ed4c --- /dev/null +++ b/gso/tasks/start_process.py @@ -0,0 +1,11 @@ +"""Celery task to start a process with the given workflow key and state.""" + +from celery import shared_task + + +@shared_task +def start_process_task(workflow_key: str, user_inputs: list[dict[str, str]]) -> None: + """Start a process with the given workflow key and state.""" + from orchestrator.services.processes import start_process # noqa: PLC0415 + + start_process(workflow_key, user_inputs=user_inputs) diff --git a/gso/translations/en-GB.json b/gso/translations/en-GB.json index 1c6561b2a2efe8576aa5d6ae6ec2c3dd1d6a6230..c5d17a43401cfe1b9e454990189a885b0c7d0db1 100644 --- a/gso/translations/en-GB.json +++ b/gso/translations/en-GB.json @@ -52,6 +52,7 @@ "create_site": "Create Site", "create_switch": "Create Switch", "create_edge_port": "Create Edge Port", + "migrate_edge_port": "Migrate Edge Port", "create_l3_core_service": "Create L3 Core Service", "create_lan_switch_interconnect": "Create LAN Switch Interconnect", "deploy_twamp": "Deploy TWAMP", diff --git a/gso/utils/workflow_steps.py b/gso/utils/workflow_steps.py index bfa88b491dd96302641e05fc430a44cb99073d4f..fd5eac9a335ec46a25fa6f44bacf9e2694b6862e 100644 --- a/gso/utils/workflow_steps.py +++ b/gso/utils/workflow_steps.py @@ -22,6 +22,9 @@ from gso.settings import load_oss_params from gso.utils.helpers import generate_inventory_for_routers from gso.utils.shared_enums import Vendor +SKIP_MOODI_KEY = "__skip_moodi" +IS_HUMAN_INITIATED_WF_KEY = "__is_human_initiated_wf" + def _deploy_base_config( subscription: dict[str, Any], @@ -394,7 +397,7 @@ def prompt_sharepoint_checklist_url(checklist_url: str) -> FormGenerator: return {} -_is_moodi_enabled = conditional(lambda _: load_oss_params().MOODI.moodi_enabled) +_is_moodi_enabled = conditional(lambda state: load_oss_params().MOODI.moodi_enabled and not state.get(SKIP_MOODI_KEY)) def start_moodi() -> StepList: diff --git a/gso/worker.py b/gso/worker.py index f7bfa08326cdac2df484520ebb009258d8219809..3a28ba925d057537fee60e1d0385b6b72ca617da 100644 --- a/gso/worker.py +++ b/gso/worker.py @@ -85,6 +85,7 @@ celery = OrchestratorWorker( "gso.schedules.send_email_notifications", "gso.schedules.clean_old_tasks", "orchestrator.services.tasks", + "gso.tasks.start_process", ], ) diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py index 92710b591759234a5239bf893b9909b84566448f..74f67c01f108350adb5ac7c8ce3667d762c21f66 100644 --- a/gso/workflows/__init__.py +++ b/gso/workflows/__init__.py @@ -116,6 +116,7 @@ LazyWorkflowInstance("gso.workflows.edge_port.terminate_edge_port", "terminate_e LazyWorkflowInstance("gso.workflows.edge_port.validate_edge_port", "validate_edge_port") LazyWorkflowInstance("gso.workflows.edge_port.create_imported_edge_port", "create_imported_edge_port") LazyWorkflowInstance("gso.workflows.edge_port.import_edge_port", "import_edge_port") +LazyWorkflowInstance("gso.workflows.edge_port.migrate_edge_port", "migrate_edge_port") # L3 Core Service workflows LazyWorkflowInstance("gso.workflows.l3_core_service.create_l3_core_service", "create_l3_core_service") diff --git a/gso/workflows/edge_port/migrate_edge_port.py b/gso/workflows/edge_port/migrate_edge_port.py new file mode 100644 index 0000000000000000000000000000000000000000..4602d72dd4dff19598d9932034b25f535d0f7271 --- /dev/null +++ b/gso/workflows/edge_port/migrate_edge_port.py @@ -0,0 +1,443 @@ +"""A modification workflow that migrates an EdgePort to a different endpoint.""" + +import json +import random +from typing import Annotated, Any +from uuid import uuid4 + +from annotated_types import Len +from orchestrator import step, workflow +from orchestrator.config.assignee import Assignee +from orchestrator.forms import FormPage, SubmitFormPage +from orchestrator.targets import Target +from orchestrator.types import FormGenerator, State, UUIDstr +from orchestrator.utils.errors import ProcessFailureError +from orchestrator.utils.json import json_dumps +from orchestrator.workflow import StepList, begin, done, inputstep +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form +from pydantic import AfterValidator, ConfigDict, Field +from pydantic_forms.validators import Divider, Label, ReadOnlyField, validate_unique_list +from pynetbox.models.dcim import Interfaces + +from gso.products.product_blocks.edge_port import EdgePortAEMemberBlock +from gso.products.product_types.edge_port import EdgePort +from gso.products.product_types.router import Router +from gso.services.lso_client import LSOState, lso_interaction +from gso.services.netbox_client import NetboxClient +from gso.services.partners import get_partner_by_id +from gso.services.subscriptions import ( + get_active_l2_circuit_services_linked_to_edge_port, + get_active_l3_services_linked_to_edge_port, +) +from gso.tasks.start_process import start_process_task +from gso.utils.helpers import ( + active_pe_router_selector, + available_interfaces_choices, + available_service_lags_choices, +) +from gso.utils.types.interfaces import LAGMember +from gso.utils.types.tt_number import TTNumber +from gso.utils.workflow_steps import start_moodi, stop_moodi +from gso.workflows.shared import create_summary_form + + +def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: + """Gather input from the operator on the new router that the EdgePort should connect to.""" + subscription = EdgePort.from_subscription(subscription_id) + form_title = f"Migrating {subscription.edge_port.edge_port_description} " + + class MigrateEdgePortForm(FormPage): + model_config = ConfigDict(title=form_title) + + tt_number: TTNumber + partner_name: ReadOnlyField(get_partner_by_id(subscription.customer_id).name, default_type=str) # type: ignore[valid-type] + divider: Divider = Field(None, exclude=True) + node: active_pe_router_selector(excludes=[subscription.edge_port.node.subscription.subscription_id]) # type: ignore[valid-type] + + initial_user_input = yield MigrateEdgePortForm + + class EdgePortLAGMember(LAGMember): + interface_name: available_interfaces_choices( # type: ignore[valid-type] + router_id=initial_user_input.node, speed=subscription.edge_port.member_speed + ) + + lag_ae_members = Annotated[ + list[EdgePortLAGMember], + AfterValidator(validate_unique_list), + Len( + min_length=len(subscription.edge_port.edge_port_ae_members), + max_length=len(subscription.edge_port.edge_port_ae_members), + ), + ] + + class SelectInterfaceForm(FormPage): + model_config = ConfigDict(title="Select Interfaces") + + name: available_service_lags_choices(router_id=initial_user_input.node) # type: ignore[valid-type] + description: str | None = None + ae_members: lag_ae_members + + interface_form_input_data = yield SelectInterfaceForm + + input_forms_data = initial_user_input.model_dump() | interface_form_input_data.model_dump() + summary_form_data = input_forms_data | { + "node": Router.from_subscription(initial_user_input.node).router.router_fqdn, + "partner_name": initial_user_input.partner_name, + "edge_port_name": input_forms_data["name"], + "edge_port_description": input_forms_data["description"], + "edge_port_ae_members": input_forms_data["ae_members"], + } + summary_fields = [ + "node", + "partner_name", + "edge_port_name", + "edge_port_description", + "edge_port_ae_members", + ] + yield from create_summary_form(summary_form_data, subscription.product.name, summary_fields) + return input_forms_data | {"subscription": subscription} + + +@step("Update the EdgePort references") +def update_subscription_model( + subscription: EdgePort, + node: UUIDstr, + name: str, + partner_name: str, + ae_members: list[dict[str, Any]], + description: str | None = None, +) -> State: + """Update the EdgePort subscription object in the service database with the new values.""" + router = Router.from_subscription(node).router + subscription.edge_port.node = router + subscription.edge_port.edge_port_name = name + subscription.description = ( + f"Edge Port {name} on {router.router_fqdn}, {partner_name}, {subscription.edge_port.ga_id or ""}" + ) + subscription.edge_port.edge_port_description = description + edge_port_ae_members = [EdgePortAEMemberBlock.new(subscription_id=uuid4(), **member) for member in ae_members] + subscription.edge_port.edge_port_ae_members = edge_port_ae_members + + return {"subscription": subscription, "subscription_id": subscription.subscription_id} + + +@step("Reserve interfaces in NetBox") +def reserve_interfaces_in_netbox(subscription: EdgePort) -> State: + """Create the LAG interfaces in NetBox and attach the LAG interfaces to the physical interfaces.""" + nbclient = NetboxClient() + edge_port = subscription.edge_port + # Create LAG interfaces + lag_interface: Interfaces = nbclient.create_interface( + iface_name=edge_port.edge_port_name, + interface_type="lag", + device_name=edge_port.node.router_fqdn, + description=str(subscription.subscription_id), + enabled=True, + ) + # Attach physical interfaces to LAG + # Update interface description to subscription ID + # Reserve interfaces + for interface in edge_port.edge_port_ae_members: + nbclient.attach_interface_to_lag( + device_name=edge_port.node.router_fqdn, + lag_name=lag_interface.name, + iface_name=interface.interface_name, + description=str(subscription.subscription_id), + ) + nbclient.reserve_interface( + device_name=edge_port.node.router_fqdn, + iface_name=interface.interface_name, + ) + return { + "subscription": subscription, + } + + +@step("[DRY RUN] Create edge port") +def create_edge_port_dry( + subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, partner_name: str +) -> LSOState: + """Create a new edge port in the network as a dry run.""" + extra_vars = { + "dry_run": True, + "subscription": subscription, + "partner_name": partner_name, + "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port", + "verb": "create", + } + + return { + "playbook_name": "gap_ansible/playbooks/edge_port.yaml", + "inventory": {"all": {"hosts": {subscription["edge_port"]["node"]["router_fqdn"]: None}}}, + "extra_vars": extra_vars, + } + + +@step("[FOR REAL] Create edge port") +def create_edge_port_real( + subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, partner_name: str +) -> LSOState: + """Create a new edge port in the network for real.""" + extra_vars = { + "dry_run": False, + "subscription": subscription, + "partner_name": partner_name, + "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port", + "verb": "create", + } + + return { + "playbook_name": "gap_ansible/playbooks/edge_port.yaml", + "inventory": {"all": {"hosts": {subscription["edge_port"]["node"]["router_fqdn"]: None}}}, + "extra_vars": extra_vars, + } + + +@step("Allocate interfaces in NetBox") +def allocate_interfaces_in_netbox(subscription: EdgePort) -> None: + """Allocate the interfaces in NetBox.""" + fqdn = subscription.edge_port.node.router_fqdn + for interface in subscription.edge_port.edge_port_ae_members: + iface_name = interface.interface_name + if not fqdn or not iface_name: + msg = "FQDN and/or interface name missing in subscription" + raise ProcessFailureError(msg, details={"fqdn": fqdn, "interface_name": iface_name}) + + NetboxClient().allocate_interface(device_name=fqdn, iface_name=iface_name) + + +@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM) +def confirm_continue_move_fiber() -> FormGenerator: + """Wait for confirmation from an operator that the physical fiber has been moved.""" + + class ProvisioningResultPage(SubmitFormPage): + model_config = ConfigDict(title="Please confirm before continuing") + + info_label: Label = "New EdgePort has been deployed, wait for the physical connection to be moved." + + yield ProvisioningResultPage + + return {} + + +@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM) +def confirm_graphs_looks_good_in_moodi() -> FormGenerator: + """Wait for confirmation from an operator that the new Migration looks good so far.""" + + class ProvisioningResultPage(SubmitFormPage): + model_config = ConfigDict(title="Please confirm before continuing") + + info_label: Label = "Do you confirm that everything looks good in Moodi before continuing the workflow?" + + yield ProvisioningResultPage + + return {} + + +@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM) +def confirm_l3_core_service_migrations_are_complete() -> FormGenerator: + """Wait for confirmation from an operator that all L3 core services have been completed successfully.""" + + class ProvisioningResultPage(SubmitFormPage): + model_config = ConfigDict(title="Please confirm before continuing") + + info_label: Label = "Do you confirm that all L3 core service migrations have been completed successfully?" + + yield ProvisioningResultPage + + return {} + + +@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM) +def confirm_l2_circuit_migrations_are_complete() -> FormGenerator: + """Wait for confirmation from an operator that all L2 circuit migrations have been completed successfully.""" + + class ProvisioningResultPage(SubmitFormPage): + model_config = ConfigDict(title="Please confirm before continuing") + + info_label: Label = "Do you confirm that all L2 circuit migrations have been completed successfully?" + + yield ProvisioningResultPage + + return {} + + +@step("Migrate L3 core services to new node") +def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TTNumber) -> State: + """Migrate all L3 core services from the old EdgePort to the new EdgePort. + + This sub migrations do not modify the L3 core services. + The source and destination EdgePort remain the same for each service. + The migration playbook is executed once for each service to apply the configuration on the new node and as a result, + the service bindings port and BGP sessions related to this edge port of each service will be moved to the new node. + """ + l3_core_services = get_active_l3_services_linked_to_edge_port(subscription_id) + edge_port = EdgePort.from_subscription(subscription_id) + + for l3_core_service in l3_core_services: + start_process_task.apply_async( # type: ignore[attr-defined] + args=[ + "migrate_l3_core_service", + [ + {"subscription_id": str(l3_core_service.subscription_id)}, + { + "tt_number": tt_number, + "skip_moodi": True, + "is_human_initiated_wf": False, + "source_edge_port": str(edge_port.subscription_id), + }, + { + "destination_edge_port": str(edge_port.subscription_id), + }, + ], + ], + countdown=random.choice([2, 3, 4, 5]), # noqa: S311 + ) + + return {"l3_core_services": l3_core_services} + + +@step("Migrate L2 circuits to new node") +def migrate_l2_circuits_to_new_node(subscription_id: UUIDstr, tt_number: TTNumber) -> State: + """Migrate Layer2 circuits from the old EdgePort to the new EdgePort.""" + layer2_circuits = get_active_l2_circuit_services_linked_to_edge_port(subscription_id) + edge_port = EdgePort.from_subscription(subscription_id) + + for l2_core_service in layer2_circuits: + start_process_task.apply_async( # type: ignore[attr-defined] + args=[ + "migrate_layer_2_circuit", + [ + {"subscription_id": str(l2_core_service.subscription_id)}, + { + "tt_number": tt_number, + "skip_moodi": True, + "is_human_initiated_wf": False, + "source_edge_port": str(edge_port.subscription_id), + "destination_edge_port": str(edge_port.subscription_id), + }, + ], + ], + countdown=random.choice([2, 3, 4, 5]), # noqa: S311 + ) + + return {"layer2_circuits": layer2_circuits} + + +@step("[DRY RUN] Disable configuration on old router") +def disable_old_config_dry( + subscription: EdgePort, + process_id: UUIDstr, + tt_number: str, +) -> LSOState: + """Perform a dry run of disabling the old configuration on the routers.""" + layer3_services = get_active_l3_services_linked_to_edge_port(str(subscription.subscription_id)) + layer2_circuits = get_active_l2_circuit_services_linked_to_edge_port(str(subscription.subscription_id)) + + extra_vars = { + "verb": "deactivate", + "config_object": "deactivate", + "dry_run": True, + "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} " f"- Deploy config for #TODO", + "l3_core_services": [json.loads(json_dumps(layer3_service)) for layer3_service in layer3_services], + "l2_circuits": [json.loads(json_dumps(layer2_circuit)) for layer2_circuit in layer2_circuits], + } + + return { + "playbook_name": "gap_ansible/playbooks/iptrunks_migration.yaml", + "inventory": { + "all": { + "hosts": { + # TODO + } + } + }, + "extra_vars": extra_vars, + } + + +@step("[FOR REAL] Disable configuration on old router") +def disable_old_config_real( + subscription: EdgePort, + process_id: UUIDstr, + tt_number: str, +) -> LSOState: + """Disable old configuration on the routers.""" + layer3_services = get_active_l3_services_linked_to_edge_port(str(subscription.subscription_id)) + layer2_circuits = get_active_l2_circuit_services_linked_to_edge_port(str(subscription.subscription_id)) + + extra_vars = { + "verb": "deactivate", + "config_object": "deactivate", + "dry_run": False, + "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} " f"- Deploy config for # TODO", + "l3_core_services": [json.loads(json_dumps(layer3_service)) for layer3_service in layer3_services], + "l2_circuits": [json.loads(json_dumps(layer2_circuit)) for layer2_circuit in layer2_circuits], + } + + return { + "playbook_name": "gap_ansible/playbooks/iptrunks_migration.yaml", + "inventory": { + "all": { + "hosts": { + # TODO + } + } + }, + "extra_vars": extra_vars, + } + + +@inputstep("Verify pre-check results", assignee=Assignee.SYSTEM) +def inform_operator_traffic_check() -> FormGenerator: + """Wait for confirmation from an operator that the results from the pre-checks look OK. + + In case the results look OK, the workflow can continue. If the results don't look OK, the workflow can still be + aborted at this time, without the subscription going out of sync. Moodi will also not start, and the subscription + model has not been updated yet. Effectively, this prevents any changes inside the orchestrator from occurring. The + one thing that must be rolled back manually, is the deactivated configuration that sits on the source device. + """ + + class PreCheckPage(SubmitFormPage): + model_config = ConfigDict(title="Please confirm before continuing") + + info_label_1: Label = "Please verify that traffic has moved as expected." + info_label_2: Label = "If traffic is misbehaving, this is your last chance to abort this workflow cleanly." + + yield PreCheckPage + return {} + + +@workflow( + "Migrate an Edge Port", + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), + target=Target.MODIFY, +) +def migrate_edge_port() -> StepList: + """Migrate an Edge Port.""" + return ( + begin + >> store_process_subscription(Target.MODIFY) + >> lso_interaction(disable_old_config_dry) + >> lso_interaction(disable_old_config_real) + >> inform_operator_traffic_check + >> unsync + >> update_subscription_model + >> start_moodi() + # TODO: Add LAG de-allocation step for future Nokia-to-Nokia migration if needed. + >> reserve_interfaces_in_netbox + >> lso_interaction(create_edge_port_dry) + >> lso_interaction(create_edge_port_real) + >> confirm_continue_move_fiber + >> confirm_graphs_looks_good_in_moodi + >> resync + >> migrate_l3_core_services_to_new_node + >> confirm_l3_core_service_migrations_are_complete + >> confirm_graphs_looks_good_in_moodi + >> migrate_l2_circuits_to_new_node + >> confirm_l2_circuit_migrations_are_complete + >> confirm_graphs_looks_good_in_moodi + >> stop_moodi() + >> done + ) diff --git a/gso/workflows/l2_circuit/migrate_layer2_circuit.py b/gso/workflows/l2_circuit/migrate_layer2_circuit.py new file mode 100644 index 0000000000000000000000000000000000000000..e7d4d9b294cce4bd0ab5d8b2b0926fa28567a967 --- /dev/null +++ b/gso/workflows/l2_circuit/migrate_layer2_circuit.py @@ -0,0 +1,20 @@ +"""This workflow migrates an L2 Core Service to a new Edge Port. + +It can be triggered by an operator or automatically by the system during Edge Port migration which is a separate +workflow. + +System-triggered migration: +When the system migrates an Edge Port, it runs the workflow automatically. The source and destination Edge Ports are +set to the same values. Then here migration only applies the configuration to the router and fill the drift between +core DB as source of truth and the actual network since the intent of network has changed in the previous workflow +even though the L2 Circuit Service is not changed. + +Operator-triggered migration: +When an operator initiates the workflow, they are required to specify both the source and destination EdgePorts. +During the migration process, the system updates the related edge_port reference to replace the source +EdgePort with the destination EdgePort and applies the necessary configuration changes to the router. + +Important Note: +Since an L2 Circuit Service has multiple side, the workflow must be run separately for each side to fully +migrate the service. +""" diff --git a/gso/workflows/l3_core_service/migrate_l3_core_service.py b/gso/workflows/l3_core_service/migrate_l3_core_service.py index e10675aae1191cb0b8932d4f5f579ca3564348bd..704190fbe9c761fa80afa0ef1a266fbdd58282af 100644 --- a/gso/workflows/l3_core_service/migrate_l3_core_service.py +++ b/gso/workflows/l3_core_service/migrate_l3_core_service.py @@ -17,7 +17,7 @@ from orchestrator.forms import FormPage, SubmitFormPage from orchestrator.targets import Target from orchestrator.utils.errors import ProcessFailureError from orchestrator.utils.json import json_dumps -from orchestrator.workflow import StepList, begin, done, inputstep, step +from orchestrator.workflow import StepList, begin, conditional, done, inputstep, step from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form from pydantic import ConfigDict, Field @@ -29,7 +29,7 @@ from gso.products.product_types.l3_core_service import L3CoreService from gso.services.lso_client import LSOState, lso_interaction from gso.services.subscriptions import get_active_edge_port_subscriptions from gso.utils.types.tt_number import TTNumber -from gso.utils.workflow_steps import start_moodi, stop_moodi +from gso.utils.workflow_steps import IS_HUMAN_INITIATED_WF_KEY, SKIP_MOODI_KEY, start_moodi, stop_moodi from gso.workflows.shared import create_summary_form @@ -53,6 +53,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: tt_number: TTNumber divider: Divider = Field(None, exclude=True) + skip_moodi: bool = False + is_human_initiated_wf: bool = True source_edge_port: source_edge_port_selector | str # type: ignore[valid-type] source_ep_user_input = yield L3CoreServiceSourceEdgePortSelectionForm @@ -77,21 +79,27 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: destination_ep_user_input = yield L3CoreServiceEdgePortSelectionForm - summary_input = { - "source_edge_port": EdgePort.from_subscription(source_ep_user_input.source_edge_port).description, - "destination_edge_port": EdgePort.from_subscription( - destination_ep_user_input.destination_edge_port - ).description, - } - yield from create_summary_form(summary_input, subscription.l3_core_service_type, list(summary_input.keys())) - user_input = ( + if source_ep_user_input.is_human_initiated_wf: + summary_input = { + "source_edge_port": EdgePort.from_subscription(source_ep_user_input.source_edge_port).description, + "destination_edge_port": EdgePort.from_subscription( + destination_ep_user_input.destination_edge_port + ).description, + } + yield from create_summary_form( + summary_input, subscription.l3_core_service_type.value, list(summary_input.keys()) + ) + + return ( {"subscription_id": subscription_id, "subscription": subscription} | source_ep_user_input.model_dump() | destination_ep_user_input.model_dump() + | { + IS_HUMAN_INITIATED_WF_KEY: source_ep_user_input.is_human_initiated_wf, + SKIP_MOODI_KEY: source_ep_user_input.skip_moodi, + } ) - return user_input - @step("Show BGP neighbors") def show_bgp_neighbors( @@ -320,15 +328,17 @@ def update_subscription_model( ) def migrate_l3_core_service() -> StepList: """Migrate a L3 Core Service to a destination Edge Port.""" + is_human_initiated_wf = conditional(lambda state: bool(state.get(IS_HUMAN_INITIATED_WF_KEY))) + return ( begin >> store_process_subscription(Target.MODIFY) - >> lso_interaction(show_bgp_neighbors) # TODO: send OTRS email with pre-check results - >> lso_interaction(deactivate_bgp_dry) - >> lso_interaction(deactivate_bgp_real) - >> lso_interaction(deactivate_sbp_dry) - >> lso_interaction(deactivate_sbp_real) - >> inform_operator_traffic_check + >> is_human_initiated_wf(lso_interaction(show_bgp_neighbors)) # TODO: send OTRS email with pre-check results + >> is_human_initiated_wf(lso_interaction(deactivate_bgp_dry)) + >> is_human_initiated_wf(lso_interaction(deactivate_bgp_real)) + >> is_human_initiated_wf(lso_interaction(deactivate_sbp_dry)) + >> is_human_initiated_wf(lso_interaction(deactivate_sbp_real)) + >> is_human_initiated_wf(inform_operator_traffic_check) >> unsync >> start_moodi() # TODO: include results from first LSO run >> generate_scoped_subscription_model diff --git a/test/workflows/__init__.py b/test/workflows/__init__.py index d01dde086f1d5f2d3d95d2b63c4540547c94b412..4edb847047b22ed4e4201836054a25606cb0f8cc 100644 --- a/test/workflows/__init__.py +++ b/test/workflows/__init__.py @@ -272,6 +272,16 @@ def resume_workflow( return result, step_log +def resume_suspended_workflow( + result, + process: ProcessStat, + step_log: list[tuple[Step, Process]], + input_data: State | list[State], +) -> tuple[Process, list]: + assert_suspended(result) + return resume_workflow(process, step_log, input_data) + + def assert_lso_success(result: Process, process_stat: ProcessStat, step_log: list): """Assert a successful LSO execution in a workflow.""" assert_awaiting_callback(result) @@ -304,3 +314,9 @@ def assert_lso_interaction_failure(result: Process, process_stat: ProcessStat, s assert_failed(result) return result, step_log + + +def assert_stop_moodi(result: Process, process_stat: ProcessStat, step_log: list): + """Assert a successful LSO execution in a workflow.""" + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + return assert_lso_success(result, process_stat, step_log) diff --git a/test/workflows/edge_port/test_create_edge_port.py b/test/workflows/edge_port/test_create_edge_port.py index 4a4e9770df9c8919f85a696df18f226dd8ef7d64..145dbbc092b4ecce7d42a0dadd68e2b128bf3264 100644 --- a/test/workflows/edge_port/test_create_edge_port.py +++ b/test/workflows/edge_port/test_create_edge_port.py @@ -14,6 +14,7 @@ from test.services.conftest import MockedNetboxClient from test.workflows import ( assert_complete, assert_lso_interaction_success, + assert_stop_moodi, extract_state, run_workflow, ) @@ -91,9 +92,11 @@ def test_successful_edge_port_creation( initial_data = [{"product": product_id}, *input_form_wizard_data] result, process_stat, step_log = run_workflow("create_edge_port", initial_data) - for _ in range(2): + for _ in range(3): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + result, step_log = assert_stop_moodi(result, process_stat, step_log) + assert_complete(result) state = extract_state(result) @@ -105,7 +108,7 @@ def test_successful_edge_port_creation( assert subscription.edge_port.ga_id == "GA-12345" assert subscription.description == f"Edge Port lag-21 on {router_fqdn}, GAAR, {subscription.edge_port.ga_id}" assert len(subscription.edge_port.edge_port_ae_members) == 2 - assert mock_execute_playbook.call_count == 2 + assert mock_execute_playbook.call_count == 4 @pytest.mark.workflow() @@ -123,9 +126,11 @@ def test_successful_edge_port_creation_with_auto_ga_id_creation( initial_data[1]["ga_id"] = None result, process_stat, step_log = run_workflow("create_edge_port", initial_data) - for _ in range(2): + for _ in range(3): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + result, step_log = assert_stop_moodi(result, process_stat, step_log) + assert_complete(result) state = extract_state(result) @@ -134,7 +139,7 @@ def test_successful_edge_port_creation_with_auto_ga_id_creation( assert subscription.status == "active" assert subscription.edge_port.ga_id.startswith("GA-5000") - assert mock_execute_playbook.call_count == 2 + assert mock_execute_playbook.call_count == 4 def test_edge_port_creation_with_invalid_input( diff --git a/test/workflows/edge_port/test_migrate_edge_port.py b/test/workflows/edge_port/test_migrate_edge_port.py new file mode 100644 index 0000000000000000000000000000000000000000..6c7c55d790bbbe6ef2b4a7c8743041c9c26389c4 --- /dev/null +++ b/test/workflows/edge_port/test_migrate_edge_port.py @@ -0,0 +1,130 @@ +from unittest.mock import patch + +import pytest + +from gso.products import Layer2CircuitServiceType +from gso.products.product_types.edge_port import EdgePort +from gso.products.product_types.l3_core_service import L3CoreServiceType +from gso.products.product_types.router import Router +from gso.utils.shared_enums import Vendor +from test import USER_CONFIRM_EMPTY_FORM +from test.services.conftest import MockedNetboxClient +from test.workflows import ( + assert_complete, + assert_lso_interaction_success, + assert_stop_moodi, + extract_state, + resume_suspended_workflow, + run_workflow, +) + + +@pytest.fixture() +def _netbox_client_mock(): + with ( + patch("gso.services.netbox_client.NetboxClient.get_device_by_name") as mock_get_device_by_name, + patch("gso.services.netbox_client.NetboxClient.get_available_interfaces") as mock_get_available_interfaces, + patch("gso.services.netbox_client.NetboxClient.get_available_services_lags") as mock_available_services_lags, + patch("gso.services.netbox_client.NetboxClient.create_interface") as mock_create_interface, + patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag") as mock_attach_interface_to_lag, + patch("gso.services.netbox_client.NetboxClient.reserve_interface") as mock_reserve_interface, + patch("gso.services.netbox_client.NetboxClient.allocate_interface") as mock_allocate_interface, + ): + mock_get_device_by_name.return_value = MockedNetboxClient().get_device_by_name() + mock_get_available_interfaces.return_value = MockedNetboxClient().get_available_interfaces() + mock_available_services_lags.return_value = MockedNetboxClient().get_available_services_lags() + mock_create_interface.return_value = MockedNetboxClient().create_interface() + mock_attach_interface_to_lag.return_value = MockedNetboxClient().attach_interface_to_lag() + mock_reserve_interface.return_value = MockedNetboxClient().reserve_interface() + mock_allocate_interface.return_value = MockedNetboxClient().allocate_interface() + + yield + + +@pytest.fixture() +def partner(partner_factory, faker): + return partner_factory(name="GAAR", email=faker.email()) + + +@pytest.fixture() +def input_form_wizard_data(request, router_subscription_factory, partner, faker): + create_edge_port_step = { + "tt_number": faker.tt_number(), + "partner_name": partner["name"], + "node": str(router_subscription_factory(vendor=Vendor.NOKIA).subscription_id), + } + create_edge_port_interface_step = { + "name": "lag-21", + "description": faker.sentence(), + "ae_members": [ + { + "interface_name": f"Interface{interface}", + "interface_description": faker.sentence(), + } + for interface in range(2) + ], + } + summary_view_step = {} + + return [ + create_edge_port_step, + create_edge_port_interface_step, + summary_view_step, + ] + + +@pytest.mark.workflow() +@patch("gso.tasks.start_process.start_process_task.apply_async") +@patch("gso.services.lso_client._send_request") +def test_successful_edge_port_migration( + mock_execute_playbook, + start_process_task_apply_async, + input_form_wizard_data, + faker, + _netbox_client_mock, # noqa: PT019 + test_client, + edge_port_subscription_factory, + partner, + l3_core_service_subscription_factory, + layer_2_circuit_subscription_factory, +): + edge_port = edge_port_subscription_factory(partner=partner) + for service_type in [service_type for service_type in L3CoreServiceType if not service_type.startswith("IMPORTED")]: + l3_core_service = l3_core_service_subscription_factory(partner=partner, l3_core_service_type=service_type) + l3_core_service.l3_core_service.ap_list[0].sbp.edge_port = edge_port.edge_port + l3_core_service.save() + + for service_type in [ + service_type for service_type in Layer2CircuitServiceType if not service_type.startswith("IMPORTED") + ]: + layer_2_circuit_subscription_factory( + partner=partner, layer_2_circuit_side_a_edgeport=edge_port, layer_2_circuit_service_type=service_type + ) + + initial_data = [{"subscription_id": str(edge_port.subscription_id)}, *input_form_wizard_data] + result, process_stat, step_log = run_workflow("migrate_edge_port", initial_data) + + # confirm inform_operator_traffic_check + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + + # Dry and Real run for disabling config and creating a new edge port + for _ in range(3): + result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + + # all the steps in the workflow that needs user confirmation + for _ in range(6): + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + + result, step_log = assert_stop_moodi(result, process_stat, step_log) + + assert_complete(result) + state = extract_state(result) + subscription_id = state["subscription_id"] + subscription = EdgePort.from_subscription(subscription_id) + + assert subscription.status == "active" + router_fqdn = Router.from_subscription(input_form_wizard_data[0]["node"]).router.router_fqdn + assert subscription.edge_port.ga_id is not None + assert subscription.description == f"Edge Port lag-21 on {router_fqdn}, GAAR, {subscription.edge_port.ga_id}" + assert len(subscription.edge_port.edge_port_ae_members) == 2 + assert mock_execute_playbook.call_count == 4 diff --git a/test/workflows/iptrunk/test_create_iptrunk.py b/test/workflows/iptrunk/test_create_iptrunk.py index c9e5abdc65aaa1092cc62783171a2c498aaad1a2..7e875e1917e6b9ed6f2c805dabb0fcb132fad70c 100644 --- a/test/workflows/iptrunk/test_create_iptrunk.py +++ b/test/workflows/iptrunk/test_create_iptrunk.py @@ -16,9 +16,8 @@ from test.workflows import ( assert_failed, assert_lso_interaction_failure, assert_lso_interaction_success, - assert_suspended, extract_state, - resume_workflow, + resume_suspended_workflow, run_workflow, ) @@ -134,8 +133,7 @@ def test_successful_iptrunk_creation_with_standard_lso_result( for _ in range(6): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) - assert_suspended(result) - result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) assert_complete(result) @@ -227,8 +225,7 @@ def test_successful_iptrunk_creation_with_juniper_interface_names( for _ in range(6): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) - assert_suspended(result) - result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) assert_complete(result) assert mock_execute_playbook.call_count == 6 diff --git a/test/workflows/iptrunk/test_migrate_iptrunk.py b/test/workflows/iptrunk/test_migrate_iptrunk.py index ce7d256e6fdd7660713ff83a0251fec6826d0824..e3c3ba5f6b2c61e9836222e5bb1e8581783afc61 100644 --- a/test/workflows/iptrunk/test_migrate_iptrunk.py +++ b/test/workflows/iptrunk/test_migrate_iptrunk.py @@ -13,6 +13,7 @@ from test.workflows import ( assert_lso_interaction_success, assert_suspended, extract_state, + resume_suspended_workflow, resume_workflow, run_workflow, ) @@ -118,7 +119,7 @@ def interface_lists_are_equal(list1, list2): @patch("gso.services.netbox_client.NetboxClient.free_interface") @patch("gso.services.netbox_client.NetboxClient.delete_interface") @patch("gso.workflows.iptrunk.migrate_iptrunk.SharePointClient") -def test_migrate_iptrunk_success( # noqa: PLR0915 +def test_migrate_iptrunk_success( mock_sharepoint_client, mocked_delete_interface, mocked_free_interface, @@ -152,8 +153,7 @@ def test_migrate_iptrunk_success( # noqa: PLR0915 for _ in range(8): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) - assert_suspended(result) - result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) for _ in range(8): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) @@ -165,8 +165,7 @@ def test_migrate_iptrunk_success( # noqa: PLR0915 result, step_log = assert_lso_interaction_success(result, process_stat, step_log) # Continue workflow after it has displayed a checklist URL. - assert_suspended(result) - result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) assert_complete(result) diff --git a/test/workflows/l3_core_service/test_create_l3_core_service.py b/test/workflows/l3_core_service/test_create_l3_core_service.py index 4cbb754fa5b91ada23707ba21f88f3a30c0f0939..b3c43e59715fa9dafd916a94700a074afbc722f2 100644 --- a/test/workflows/l3_core_service/test_create_l3_core_service.py +++ b/test/workflows/l3_core_service/test_create_l3_core_service.py @@ -12,9 +12,9 @@ from test.services.conftest import MockedSharePointClient from test.workflows import ( assert_complete, assert_lso_interaction_success, - assert_suspended, + assert_stop_moodi, extract_state, - resume_workflow, + resume_suspended_workflow, run_workflow, ) @@ -85,20 +85,21 @@ def test_create_l3_core_service_success( "v6_bgp_peer": base_bgp_peer_input() | {"add_v6_multicast": faker.boolean(), "peer_address": faker.ipv6()}, }, ] - lso_interaction_count = 6 + lso_interaction_count = 7 result, process_stat, step_log = run_workflow("create_l3_core_service", form_input_data) for _ in range(lso_interaction_count): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) - assert_suspended(result) - result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + result, step_log = resume_suspended_workflow(result, process_stat, step_log, USER_CONFIRM_EMPTY_FORM) + + result, step_log = assert_stop_moodi(result, process_stat, step_log) assert_complete(result) state = extract_state(result) subscription = L3CoreService.from_subscription(state["subscription_id"]) - assert mock_lso_client.call_count == lso_interaction_count + assert mock_lso_client.call_count == lso_interaction_count + 1 assert subscription.status == SubscriptionLifecycle.ACTIVE assert len(subscription.l3_core_service.ap_list) == 1 assert ( diff --git a/test/workflows/l3_core_service/test_migrate_l3_core_service.py b/test/workflows/l3_core_service/test_migrate_l3_core_service.py index eef3f281e12ed329b5901ef303930d14c998d0ff..04a69df97d99cfb3ca4fac7d98bc59f63550687e 100644 --- a/test/workflows/l3_core_service/test_migrate_l3_core_service.py +++ b/test/workflows/l3_core_service/test_migrate_l3_core_service.py @@ -9,9 +9,9 @@ from test import USER_CONFIRM_EMPTY_FORM from test.workflows import ( assert_complete, assert_lso_interaction_success, - assert_suspended, + assert_stop_moodi, extract_state, - resume_workflow, + resume_suspended_workflow, run_workflow, ) @@ -52,16 +52,17 @@ def test_migrate_l3_core_service_success( for _ in range(5): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) - assert_suspended(result) - result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) - for _ in range(4): + for _ in range(5): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + result, step_log = assert_stop_moodi(result, process_stat, step_log) + assert_complete(result) state = extract_state(result) subscription = L3CoreService.from_subscription(state["subscription_id"]) - assert mock_execute_playbook.call_count == 9 + assert mock_execute_playbook.call_count == 11 assert subscription.insync assert len(subscription.l3_core_service.ap_list) == 1 assert str(subscription.l3_core_service.ap_list[0].sbp.edge_port.owner_subscription_id) == destination_edge_port @@ -113,8 +114,7 @@ def test_migrate_l3_core_service_scoped_emission( for _ in range(4): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) - assert_suspended(result) - result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) result, step_log = assert_lso_interaction_success(result, process_stat, step_log) @@ -130,13 +130,15 @@ def test_migrate_l3_core_service_scoped_emission( state["subscription"]["l3_core_service"] == state["__old_subscriptions__"][subscription_id]["l3_core_service"] ) # Subscription is unchanged for now - for _ in range(3): + for _ in range(4): result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + result, step_log = assert_stop_moodi(result, process_stat, step_log) + assert_complete(result) state = extract_state(result) subscription = L3CoreService.from_subscription(state["subscription_id"]) - assert mock_execute_playbook.call_count == 9 + assert mock_execute_playbook.call_count == 11 assert subscription.insync assert len(subscription.l3_core_service.ap_list) == 5 assert str(subscription.l3_core_service.ap_list[3].sbp.edge_port.owner_subscription_id) == destination_edge_port diff --git a/test/workflows/router/test_create_router.py b/test/workflows/router/test_create_router.py index 249b268dbf48af606cf9068acad0ae505f17fdd3..67d317b1e7632111dc5c897ea2da08cd803e9bc9 100644 --- a/test/workflows/router/test_create_router.py +++ b/test/workflows/router/test_create_router.py @@ -17,6 +17,7 @@ from test.workflows import ( assert_lso_interaction_success, assert_suspended, extract_state, + resume_suspended_workflow, resume_workflow, run_workflow, ) @@ -102,8 +103,7 @@ def test_create_nokia_router_success( result, step_log = assert_lso_interaction_success(result, process_stat, step_log) - assert_suspended(result) - result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) assert_complete(result) diff --git a/test/workflows/switch/test_create_switch.py b/test/workflows/switch/test_create_switch.py index 1209b48a16a1352e3b658fc51d0d0e6e92ac90e4..a6e02d83cbc2f816a48c9b494ae4d64cbaf41a99 100644 --- a/test/workflows/switch/test_create_switch.py +++ b/test/workflows/switch/test_create_switch.py @@ -13,6 +13,7 @@ from test.workflows import ( assert_lso_interaction_success, assert_suspended, extract_state, + resume_suspended_workflow, resume_workflow, run_workflow, ) @@ -54,8 +55,7 @@ def test_create_switch_success( result, step_log = assert_lso_interaction_success(result, process_stat, step_log) # Sharepoint list created - assert_suspended(result) - result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) + result, step_log = resume_suspended_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) assert_complete(result)