Skip to content
Snippets Groups Projects
Commit 3dfe7933 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

add bulk migration for an edge port

parent f82caa3c
No related branches found
No related tags found
2 merge requests!356Fix/nat 1009/fix redeploy base config if there is a vprn,!355add bulk migration for an edge port
Showing
with 698 additions and 26 deletions
...@@ -7,7 +7,6 @@ from orchestrator import OrchestratorCore, app_settings ...@@ -7,7 +7,6 @@ from orchestrator import OrchestratorCore, app_settings
from orchestrator.cli.main import app as cli_app from orchestrator.cli.main import app as cli_app
from orchestrator.graphql import SCALAR_OVERRIDES from orchestrator.graphql import SCALAR_OVERRIDES
from orchestrator.services.tasks import initialise_celery from orchestrator.services.tasks import initialise_celery
from orchestrator.settings import ExecutorType
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
import gso.products import gso.products
...@@ -37,18 +36,19 @@ def init_gso_app() -> OrchestratorCore: ...@@ -37,18 +36,19 @@ def init_gso_app() -> OrchestratorCore:
app.register_graphql(subscription_interface=custom_subscription_interface) app.register_graphql(subscription_interface=custom_subscription_interface)
app.include_router(api_router, prefix="/api") app.include_router(api_router, prefix="/api")
if app_settings.EXECUTOR == ExecutorType.WORKER: oss_params = load_oss_params()
config = load_oss_params() celery = Celery(
celery = Celery( "geant-service-orchestrator",
"geant-service-orchestrator", broker=oss_params.CELERY.broker_url,
broker=config.CELERY.broker_url, backend=oss_params.CELERY.result_backend,
backend=config.CELERY.result_backend, include=["orchestrator.services.tasks", "gso.tasks.start_process"],
include=["orchestrator.services.tasks"], )
) celery.conf.update(
celery.conf.update( result_expires=oss_params.CELERY.result_expires,
result_expires=config.CELERY.result_expires, task_always_eager=app_settings.TESTING,
) task_eager_propagates=app_settings.TESTING,
gso_initialise_celery(celery) )
gso_initialise_celery(celery)
return app return app
......
"""Add a migration workflow for an EdgePort.
Revision ID: efebcde91f2f
Revises: 8a65d0ed588e
Create Date: 2025-01-09 17:17:24.972289
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = 'efebcde91f2f'
down_revision = '16eef776a258'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "migrate_edge_port",
"target": "MODIFY",
"description": "Migrate an EdgePort",
"product_type": "EdgePort"
},
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
...@@ -133,6 +133,6 @@ ...@@ -133,6 +133,6 @@
}, },
"MOODI": { "MOODI": {
"host": "moodi.test.gap.geant.org", "host": "moodi.test.gap.geant.org",
"moodi_enabled": false "moodi_enabled": true
} }
} }
...@@ -17,12 +17,12 @@ from orchestrator.db import ( ...@@ -17,12 +17,12 @@ from orchestrator.db import (
db, db,
) )
from orchestrator.domain import SubscriptionModel from orchestrator.domain import SubscriptionModel
from orchestrator.services.subscriptions import query_in_use_by_subscriptions from orchestrator.services.subscriptions import query_depends_on_subscriptions, query_in_use_by_subscriptions
from orchestrator.types import SubscriptionLifecycle, UUIDstr from orchestrator.types import SubscriptionLifecycle, UUIDstr
from sqlalchemy import text from sqlalchemy import and_, text
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from gso.products import ProductName, ProductType from gso.products import L2_CORE_SERVICE_PRODUCT_TYPE, L3_CORE_SERVICE_PRODUCT_TYPE, ProductName, ProductType
from gso.products.product_types.site import Site from gso.products.product_types.site import Site
SubscriptionType = dict[str, Any] SubscriptionType = dict[str, Any]
...@@ -185,6 +185,59 @@ def get_trunks_that_terminate_on_router( ...@@ -185,6 +185,59 @@ def get_trunks_that_terminate_on_router(
) )
def get_all_active_l3_core_services_on_top_of_edge_port(edge_port_id: UUIDstr) -> list[SubscriptionTable]:
"""Retrieve all active l3 core services that are on top of the given edge port.
Args:
edge_port_id: The ID of the edge port.
Returns:
A list of active services that are on top of the edge port.
"""
return (
query_in_use_by_subscriptions(UUID(edge_port_id))
.join(ProductTable)
.filter(
and_(
ProductTable.product_type.in_([L3_CORE_SERVICE_PRODUCT_TYPE]),
SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
)
)
.all()
)
def get_all_active_l2_core_services_on_top_of_edge_port(edge_port_id: UUIDstr) -> list[SubscriptionTable]:
"""Retrieve all active l2 core services that are on top of the given edge port."""
return (
query_in_use_by_subscriptions(UUID(edge_port_id))
.join(ProductTable)
.filter(
and_(
ProductTable.product_type.in_([L2_CORE_SERVICE_PRODUCT_TYPE]),
SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
)
)
.all()
)
def get_edge_port_subscription_of_l3_core_service(edge_port_subscription_id: UUIDstr) -> list[SubscriptionModel]:
"""Retrieve all active Edge Port subscriptions that are used by the given L3 Core Service."""
query = (
query_depends_on_subscriptions(
UUID(edge_port_subscription_id), filter_statuses=[SubscriptionLifecycle.ACTIVE.value]
)
.join(ProductTable)
.filter(
ProductTable.product_type.in_([ProductType.EDGE_PORT]),
SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
)
)
return [SubscriptionModel.from_subscription(subscription.subscription_id) for subscription in query.all()]
def get_product_id_by_name(product_name: ProductName) -> UUID: def get_product_id_by_name(product_name: ProductName) -> UUID:
"""Retrieve the UUID of a product by its name. """Retrieve the UUID of a product by its name.
......
"""celery tasks for gso."""
"""Celery task to start a process with the given workflow key and state."""
from celery import shared_task
@shared_task
def start_process_task(workflow_key: str, user_inputs: list[dict[str, str]]) -> None:
"""Start a process with the given workflow key and state."""
from orchestrator.services.processes import start_process # noqa: PLC0415
start_process(workflow_key, user_inputs=user_inputs)
...@@ -52,6 +52,7 @@ ...@@ -52,6 +52,7 @@
"create_site": "Create Site", "create_site": "Create Site",
"create_switch": "Create Switch", "create_switch": "Create Switch",
"create_edge_port": "Create Edge Port", "create_edge_port": "Create Edge Port",
"migrate_edge_port": "Migrate Edge Port",
"create_l3_core_service": "Create L3 Core Service", "create_l3_core_service": "Create L3 Core Service",
"create_lan_switch_interconnect": "Create LAN Switch Interconnect", "create_lan_switch_interconnect": "Create LAN Switch Interconnect",
"deploy_twamp": "Deploy TWAMP", "deploy_twamp": "Deploy TWAMP",
......
...@@ -22,6 +22,8 @@ from gso.settings import load_oss_params ...@@ -22,6 +22,8 @@ from gso.settings import load_oss_params
from gso.utils.helpers import generate_inventory_for_routers from gso.utils.helpers import generate_inventory_for_routers
from gso.utils.shared_enums import Vendor from gso.utils.shared_enums import Vendor
SKIP_MOODI_KEY = "__skip_moodi__"
def _deploy_base_config( def _deploy_base_config(
subscription: dict[str, Any], subscription: dict[str, Any],
...@@ -394,7 +396,7 @@ def prompt_sharepoint_checklist_url(checklist_url: str) -> FormGenerator: ...@@ -394,7 +396,7 @@ def prompt_sharepoint_checklist_url(checklist_url: str) -> FormGenerator:
return {} return {}
_is_moodi_enabled = conditional(lambda _: load_oss_params().MOODI.moodi_enabled) _is_moodi_enabled = conditional(lambda state: load_oss_params().MOODI.moodi_enabled and not state.get(SKIP_MOODI_KEY))
def start_moodi() -> StepList: def start_moodi() -> StepList:
......
...@@ -85,6 +85,7 @@ celery = OrchestratorWorker( ...@@ -85,6 +85,7 @@ celery = OrchestratorWorker(
"gso.schedules.send_email_notifications", "gso.schedules.send_email_notifications",
"gso.schedules.clean_old_tasks", "gso.schedules.clean_old_tasks",
"orchestrator.services.tasks", "orchestrator.services.tasks",
"gso.tasks.start_process",
], ],
) )
......
...@@ -116,6 +116,7 @@ LazyWorkflowInstance("gso.workflows.edge_port.terminate_edge_port", "terminate_e ...@@ -116,6 +116,7 @@ LazyWorkflowInstance("gso.workflows.edge_port.terminate_edge_port", "terminate_e
LazyWorkflowInstance("gso.workflows.edge_port.validate_edge_port", "validate_edge_port") LazyWorkflowInstance("gso.workflows.edge_port.validate_edge_port", "validate_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.create_imported_edge_port", "create_imported_edge_port") LazyWorkflowInstance("gso.workflows.edge_port.create_imported_edge_port", "create_imported_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.import_edge_port", "import_edge_port") LazyWorkflowInstance("gso.workflows.edge_port.import_edge_port", "import_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.migrate_edge_port", "migrate_edge_port")
# L3 Core Service workflows # L3 Core Service workflows
LazyWorkflowInstance("gso.workflows.l3_core_service.create_l3_core_service", "create_l3_core_service") LazyWorkflowInstance("gso.workflows.l3_core_service.create_l3_core_service", "create_l3_core_service")
......
"""A modification workflow that migrates an EdgePort to a different endpoint."""
import random
from typing import Annotated, Any
from uuid import uuid4
from annotated_types import Len
from orchestrator import step, workflow
from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done, inputstep
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, ConfigDict
from pydantic_forms.validators import Label, ReadOnlyField, validate_unique_list
from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.edge_port import EdgePortAEMemberBlock
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.router import Router
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_id
from gso.services.subscriptions import (
get_all_active_l2_core_services_on_top_of_edge_port,
get_all_active_l3_core_services_on_top_of_edge_port,
)
from gso.tasks.start_process import start_process_task
from gso.utils.helpers import (
active_pe_router_selector,
available_interfaces_choices,
available_service_lags_choices,
)
from gso.utils.types.interfaces import LAGMember
from gso.utils.types.tt_number import TTNumber
from gso.utils.workflow_steps import start_moodi, stop_moodi
from gso.workflows.shared import create_summary_form
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Gather input from the operator on the new router that the EdgePort should connect to."""
subscription = EdgePort.from_subscription(subscription_id)
form_title = f"Migrating {subscription.edge_port.edge_port_description} "
class MigrateEdgePortForm(FormPage):
model_config = ConfigDict(title=form_title)
tt_number: TTNumber
partner_name: ReadOnlyField(get_partner_by_id(subscription.customer_id).name, default_type=str) # type: ignore[valid-type]
node: active_pe_router_selector(excludes=[subscription.edge_port.node.subscription.subscription_id]) # type: ignore[valid-type]
initial_user_input = yield MigrateEdgePortForm
class EdgePortLAGMember(LAGMember):
interface_name: available_interfaces_choices( # type: ignore[valid-type]
router_id=initial_user_input.node, speed=subscription.edge_port.member_speed
)
lag_ae_members = Annotated[
list[EdgePortLAGMember],
AfterValidator(validate_unique_list),
Len(
min_length=len(subscription.edge_port.edge_port_ae_members),
max_length=len(subscription.edge_port.edge_port_ae_members),
),
]
class SelectInterfaceForm(FormPage):
model_config = ConfigDict(title="Select Interfaces")
name: available_service_lags_choices(router_id=initial_user_input.node) # type: ignore[valid-type]
description: str | None = None
ae_members: lag_ae_members
interface_form_input_data = yield SelectInterfaceForm
input_forms_data = initial_user_input.model_dump() | interface_form_input_data.model_dump()
summary_form_data = input_forms_data | {
"node": Router.from_subscription(initial_user_input.node).router.router_fqdn,
"partner_name": initial_user_input.partner_name,
"edge_port_name": input_forms_data["name"],
"edge_port_description": input_forms_data["description"],
"edge_port_ae_members": input_forms_data["ae_members"],
}
summary_fields = [
"node",
"partner_name",
"edge_port_name",
"edge_port_description",
"edge_port_ae_members",
]
yield from create_summary_form(summary_form_data, subscription.product.name, summary_fields)
return input_forms_data
@step("Update the EdgePort references")
def update_subscription_model(
subscription: EdgePort,
node: UUIDstr,
name: str,
partner_name: str,
ae_members: list[dict[str, Any]],
description: str | None = None,
) -> State:
"""Update the EdgePort subscription object in the service database with the new values."""
router = Router.from_subscription(node).router
subscription.edge_port.node = router
subscription.edge_port.edge_port_name = name
subscription.description = (
f"Edge Port {name} on {router.router_fqdn}, {partner_name}, {subscription.edge_port.ga_id or ""}"
)
subscription.edge_port.edge_port_description = description
edge_port_ae_members = [EdgePortAEMemberBlock.new(subscription_id=uuid4(), **member) for member in ae_members]
subscription.edge_port.edge_port_ae_members = (
edge_port_ae_members # TODO: Clear the existing AE members and inactivate them and remove them from netbox?
)
return {"subscription": subscription, "subscription_id": subscription.subscription_id}
@step("Reserve interfaces in NetBox")
def reserve_interfaces_in_netbox(subscription: EdgePort) -> State:
"""Create the LAG interfaces in NetBox and attach the LAG interfaces to the physical interfaces."""
nbclient = NetboxClient()
edge_port = subscription.edge_port
# Create LAG interfaces
lag_interface: Interfaces = nbclient.create_interface(
iface_name=edge_port.edge_port_name,
interface_type="lag",
device_name=edge_port.node.router_fqdn,
description=str(subscription.subscription_id),
enabled=True,
)
# Attach physical interfaces to LAG
# Update interface description to subscription ID
# Reserve interfaces
for interface in edge_port.edge_port_ae_members:
nbclient.attach_interface_to_lag(
device_name=edge_port.node.router_fqdn,
lag_name=lag_interface.name,
iface_name=interface.interface_name,
description=str(subscription.subscription_id),
)
nbclient.reserve_interface(
device_name=edge_port.node.router_fqdn,
iface_name=interface.interface_name,
)
return {
"subscription": subscription,
}
@step("[DRY RUN] Create edge port")
def create_edge_port_dry(
subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, partner_name: str
) -> LSOState:
"""Create a new edge port in the network as a dry run."""
extra_vars = {
"dry_run": True,
"subscription": subscription,
"partner_name": partner_name,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port",
"verb": "create",
}
return {
"playbook_name": "gap_ansible/playbooks/edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Create edge port")
def create_edge_port_real(
subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, partner_name: str
) -> LSOState:
"""Create a new edge port in the network for real."""
extra_vars = {
"dry_run": False,
"subscription": subscription,
"partner_name": partner_name,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port",
"verb": "create",
}
return {
"playbook_name": "gap_ansible/playbooks/edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("Allocate interfaces in NetBox")
def allocate_interfaces_in_netbox(subscription: EdgePort) -> None:
"""Allocate the interfaces in NetBox."""
for interface in subscription.edge_port.edge_port_ae_members:
fqdn = subscription.edge_port.node.router_fqdn
iface_name = interface.interface_name
if not fqdn or not iface_name:
msg = "FQDN and/or interface name missing in subscription"
raise ProcessFailureError(msg, details=subscription.subscription_id)
NetboxClient().allocate_interface(device_name=fqdn, iface_name=iface_name)
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_continue_move_fiber() -> FormGenerator:
"""Wait for confirmation from an operator that the physical fiber has been moved."""
class ProvisioningResultPage(FormPage):
model_config = ConfigDict(title="Please confirm before continuing")
info_label: Label = "New EdgePort has been deployed, wait for the physical connection to be moved."
yield ProvisioningResultPage
return {}
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_things_looks_good_in_moodi() -> FormGenerator:
"""Wait for confirmation from an operator that the new Migration looks good so far."""
class ProvisioningResultPage(FormPage):
model_config = ConfigDict(title="Please confirm before continuing")
info_label: Label = "Do you confirm that everything looks good in the Moodi before continuing?"
yield ProvisioningResultPage
return {}
@step("Migrate L3 core services to new node")
def migrate_l3_core_services_to_new_node(subscription_id: UUIDstr, tt_number: TTNumber) -> State:
"""Migrate all L3 core services from the old EdgePort to the new EdgePort.
This sub migrations do not modify the L3 core services.
The source and destination EdgePort remain the same for each service.
The migration playbook is executed once for each service to apply the configuration on the new node and as a result,
the service bindings port and bgp sessions related to this edge port of each service will be moved to the new node.
"""
l3_core_services = get_all_active_l3_core_services_on_top_of_edge_port(subscription_id)
edge_port = EdgePort.from_subscription(subscription_id)
for l3_core_service in l3_core_services:
start_process_task.apply_async( # type: ignore[attr-defined]
args=[
"migrate_l3_core_service",
[
{"subscription_id": str(l3_core_service.subscription_id)},
{
"tt_number": tt_number,
"skip_moodi": True,
"source_edge_port": str(edge_port.subscription_id),
"destination_edge_port": str(edge_port.subscription_id),
},
],
],
countdown=random.choice([2, 3, 4, 5]), # noqa: S311
)
return {"l3_core_services": l3_core_services}
@step("Migrate L2 circuits to new node")
def migrate_l2_circuits_to_new_node(subscription_id: UUIDstr, tt_number: TTNumber) -> State:
"""Migrate Layer2 circuits from the old EdgePort to the new EdgePort."""
layer2_circuits = get_all_active_l2_core_services_on_top_of_edge_port(subscription_id)
edge_port = EdgePort.from_subscription(subscription_id)
for l2_core_service in layer2_circuits:
start_process_task.apply_async( # type: ignore[attr-defined]
args=[
"migrate_layer_2_circuit",
[
{"subscription_id": str(l2_core_service.subscription_id)},
{
"tt_number": tt_number,
"skip_moodi": True,
"source_edge_port": str(edge_port.subscription_id),
"destination_edge_port": str(edge_port.subscription_id),
},
],
],
countdown=random.choice([2, 3, 4, 5]), # noqa: S311
)
return {"layer2_circuits": layer2_circuits}
@step("[DRY RUN] Disable configuration on old router")
def disable_old_config_dry(
# subscription: EdgePort, TODO
process_id: UUIDstr,
tt_number: str,
) -> LSOState:
"""Perform a dry run of disabling the old configuration on the routers."""
extra_vars = {
"verb": "deactivate",
"config_object": "deactivate",
"dry_run": True,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} " f"- Deploy config for #TODO",
}
return {
"playbook_name": "gap_ansible/playbooks/iptrunks_migration.yaml",
"inventory": {
"all": {
"hosts": {
# TODO
}
}
},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Disable configuration on old router")
def disable_old_config_real(
# subscription: EdgePort, TODO
process_id: UUIDstr,
tt_number: str,
) -> LSOState:
"""Disable old configuration on the routers."""
extra_vars = {
"verb": "deactivate",
"config_object": "deactivate",
"dry_run": False,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} " f"- Deploy config for # TODO",
}
return {
"playbook_name": "gap_ansible/playbooks/iptrunks_migration.yaml",
"inventory": {
"all": {
"hosts": {
# TODO
}
}
},
"extra_vars": extra_vars,
}
@workflow(
"Migrate an EdgePort",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.MODIFY,
)
def migrate_edge_port() -> StepList:
"""Migrate an EdgePort."""
return (
begin
>> store_process_subscription(Target.MODIFY)
>> unsync
>> lso_interaction(disable_old_config_dry)
>> lso_interaction(disable_old_config_real)
>> update_subscription_model
>> start_moodi()
# TODO: Neda mentioned if in the future we have nokia-to-nokia migration, then we need another step to
# dealocate the lag-- but this is not needed for now right?
>> reserve_interfaces_in_netbox
>> lso_interaction(create_edge_port_dry)
>> lso_interaction(create_edge_port_real)
>> confirm_continue_move_fiber
>> confirm_things_looks_good_in_moodi
>> resync
# TODO: Explain this step that at this point related subscriptions are unsynced and prevent
# any changes on l3/l2 services
>> migrate_l3_core_services_to_new_node
>> confirm_things_looks_good_in_moodi
>> migrate_l2_circuits_to_new_node
>> confirm_things_looks_good_in_moodi
>> stop_moodi()
>> done
)
"""This workflow migrates an L2 Core Service to a new Edge Port.
It can be triggered by an operator or automatically by the system during Edge Port migration which is a separate
workflow.
System-triggered migration:
When the system migrates an Edge Port, it runs the workflow automatically. The source and destination Edge Ports are
set to the same values. Then here migration only applies the configuration to the router and fill the drift between
core DB as source of truth and the actual network since the intent of network has changed in the previous workflow
even though the L2 Circuit Service is not changed.
Operator-triggered migration:
When an operator initiates the workflow, they are required to specify both the source and destination EdgePorts.
During the migration process, the system updates the related access_point.sbp.edge_port reference to replace the source
EdgePort with the destination EdgePort and applies the necessary configuration changes to the router.
Important Note:
Since an L2 Circuit Service has multiple side, the workflow must be run separately for each side to fully
migrate the service.
"""
...@@ -115,3 +115,6 @@ def modify_layer_2_circuit() -> StepList: ...@@ -115,3 +115,6 @@ def modify_layer_2_circuit() -> StepList:
>> resync >> resync
>> done >> done
) )
# TODO: where is ansible playbook execution steps to reflect this changes on the network?
...@@ -201,14 +201,16 @@ def initialize_subscription( ...@@ -201,14 +201,16 @@ def initialize_subscription(
gs_id=sbp_gs_id, gs_id=sbp_gs_id,
) )
subscription.l3_core_service.ap_list.append( subscription.l3_core_service.ap_list.append(
AccessPortInactive.new( AccessPortInactive.new( # TODO: Ask Simone why we don't active them somewhere?
subscription_id=uuid4(), subscription_id=uuid4(),
ap_type=edge_port["ap_type"], ap_type=edge_port["ap_type"],
sbp=service_binding_port, sbp=service_binding_port,
custom_service_name=edge_port.get("custom_service_name"), custom_service_name=edge_port.get("custom_service_name"),
) )
) )
edge_port_fqdn_list.append(edge_port_subscription.edge_port.node.router_fqdn) edge_port_fqdn_list.append(
edge_port_subscription.edge_port.node.router_fqdn
) # TODO: why this is only one EP? what if we have multiple EP for an L3 Core Service
partner_name = get_partner_by_id(subscription.customer_id).name partner_name = get_partner_by_id(subscription.customer_id).name
subscription.description = f"{product_name} service for {partner_name}" subscription.description = f"{product_name} service for {partner_name}"
......
...@@ -272,6 +272,16 @@ def resume_workflow( ...@@ -272,6 +272,16 @@ def resume_workflow(
return result, step_log return result, step_log
def resume_suspend_workflow(
result,
process: ProcessStat,
step_log: list[tuple[Step, Process]],
input_data: State | list[State],
) -> tuple[Process, list]:
assert_suspended(result)
return resume_workflow(process, step_log, input_data)
def assert_lso_success(result: Process, process_stat: ProcessStat, step_log: list): def assert_lso_success(result: Process, process_stat: ProcessStat, step_log: list):
"""Assert a successful LSO execution in a workflow.""" """Assert a successful LSO execution in a workflow."""
assert_awaiting_callback(result) assert_awaiting_callback(result)
...@@ -304,3 +314,10 @@ def assert_lso_interaction_failure(result: Process, process_stat: ProcessStat, s ...@@ -304,3 +314,10 @@ def assert_lso_interaction_failure(result: Process, process_stat: ProcessStat, s
assert_failed(result) assert_failed(result)
return result, step_log return result, step_log
def assert_stop_moodi(result: Process, process_stat: ProcessStat, step_log: list):
"""Assert a successful LSO execution in a workflow."""
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
return assert_lso_success(result, process_stat, step_log)
...@@ -14,6 +14,7 @@ from test.services.conftest import MockedNetboxClient ...@@ -14,6 +14,7 @@ from test.services.conftest import MockedNetboxClient
from test.workflows import ( from test.workflows import (
assert_complete, assert_complete,
assert_lso_interaction_success, assert_lso_interaction_success,
assert_stop_moodi,
extract_state, extract_state,
run_workflow, run_workflow,
) )
...@@ -91,9 +92,11 @@ def test_successful_edge_port_creation( ...@@ -91,9 +92,11 @@ def test_successful_edge_port_creation(
initial_data = [{"product": product_id}, *input_form_wizard_data] initial_data = [{"product": product_id}, *input_form_wizard_data]
result, process_stat, step_log = run_workflow("create_edge_port", initial_data) result, process_stat, step_log = run_workflow("create_edge_port", initial_data)
for _ in range(2): for _ in range(3):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
result, step_log = assert_stop_moodi(result, process_stat, step_log)
assert_complete(result) assert_complete(result)
state = extract_state(result) state = extract_state(result)
...@@ -105,7 +108,7 @@ def test_successful_edge_port_creation( ...@@ -105,7 +108,7 @@ def test_successful_edge_port_creation(
assert subscription.edge_port.ga_id == "GA-12345" assert subscription.edge_port.ga_id == "GA-12345"
assert subscription.description == f"Edge Port lag-21 on {router_fqdn}, GAAR, {subscription.edge_port.ga_id}" assert subscription.description == f"Edge Port lag-21 on {router_fqdn}, GAAR, {subscription.edge_port.ga_id}"
assert len(subscription.edge_port.edge_port_ae_members) == 2 assert len(subscription.edge_port.edge_port_ae_members) == 2
assert mock_execute_playbook.call_count == 2 assert mock_execute_playbook.call_count == 4
@pytest.mark.workflow() @pytest.mark.workflow()
...@@ -123,9 +126,11 @@ def test_successful_edge_port_creation_with_auto_ga_id_creation( ...@@ -123,9 +126,11 @@ def test_successful_edge_port_creation_with_auto_ga_id_creation(
initial_data[1]["ga_id"] = None initial_data[1]["ga_id"] = None
result, process_stat, step_log = run_workflow("create_edge_port", initial_data) result, process_stat, step_log = run_workflow("create_edge_port", initial_data)
for _ in range(2): for _ in range(3):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log) result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
result, step_log = assert_stop_moodi(result, process_stat, step_log)
assert_complete(result) assert_complete(result)
state = extract_state(result) state = extract_state(result)
...@@ -134,7 +139,7 @@ def test_successful_edge_port_creation_with_auto_ga_id_creation( ...@@ -134,7 +139,7 @@ def test_successful_edge_port_creation_with_auto_ga_id_creation(
assert subscription.status == "active" assert subscription.status == "active"
assert subscription.edge_port.ga_id.startswith("GA-5000") assert subscription.edge_port.ga_id.startswith("GA-5000")
assert mock_execute_playbook.call_count == 2 assert mock_execute_playbook.call_count == 4
def test_edge_port_creation_with_invalid_input( def test_edge_port_creation_with_invalid_input(
......
from unittest.mock import patch
import pytest
from gso.products import Layer2CircuitServiceType
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.l3_core_service import L3CoreServiceType
from gso.products.product_types.router import Router
from gso.utils.shared_enums import Vendor
from test import USER_CONFIRM_EMPTY_FORM
from test.services.conftest import MockedNetboxClient
from test.workflows import (
assert_complete,
assert_lso_interaction_success,
assert_stop_moodi,
extract_state,
resume_suspend_workflow,
run_workflow,
)
@pytest.fixture()
def _netbox_client_mock():
with (
patch("gso.services.netbox_client.NetboxClient.get_device_by_name") as mock_get_device_by_name,
patch("gso.services.netbox_client.NetboxClient.get_available_interfaces") as mock_get_available_interfaces,
patch("gso.services.netbox_client.NetboxClient.get_available_services_lags") as mock_available_services_lags,
patch("gso.services.netbox_client.NetboxClient.create_interface") as mock_create_interface,
patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag") as mock_attach_interface_to_lag,
patch("gso.services.netbox_client.NetboxClient.reserve_interface") as mock_reserve_interface,
patch("gso.services.netbox_client.NetboxClient.allocate_interface") as mock_allocate_interface,
):
mock_get_device_by_name.return_value = MockedNetboxClient().get_device_by_name()
mock_get_available_interfaces.return_value = MockedNetboxClient().get_available_interfaces()
mock_available_services_lags.return_value = MockedNetboxClient().get_available_services_lags()
mock_create_interface.return_value = MockedNetboxClient().create_interface()
mock_attach_interface_to_lag.return_value = MockedNetboxClient().attach_interface_to_lag()
mock_reserve_interface.return_value = MockedNetboxClient().reserve_interface()
mock_allocate_interface.return_value = MockedNetboxClient().allocate_interface()
yield
@pytest.fixture()
def partner(partner_factory, faker):
return partner_factory(name="GAAR", email=faker.email())
@pytest.fixture()
def input_form_wizard_data(request, router_subscription_factory, partner, faker):
create_edge_port_step = {
"tt_number": faker.tt_number(),
"partner_name": partner["name"],
"node": str(router_subscription_factory(vendor=Vendor.NOKIA).subscription_id),
}
create_edge_port_interface_step = {
"name": "lag-21",
"description": faker.sentence(),
"ae_members": [
{
"interface_name": f"Interface{interface}",
"interface_description": faker.sentence(),
}
for interface in range(2)
],
}
summary_view_step = {}
return [
create_edge_port_step,
create_edge_port_interface_step,
summary_view_step,
]
@pytest.mark.workflow()
@patch("gso.tasks.start_process.start_process_task.apply_async")
@patch("gso.services.lso_client._send_request")
def test_successful_edge_port_migration(
mock_execute_playbook,
start_process_task_apply_async,
input_form_wizard_data,
faker,
_netbox_client_mock, # noqa: PT019
test_client,
edge_port_subscription_factory,
partner,
l3_core_service_subscription_factory,
layer_2_circuit_subscription_factory,
):
edge_port = edge_port_subscription_factory(partner=partner)
for service_type in [service_type for service_type in L3CoreServiceType if not service_type.startswith("IMPORTED")]:
l3_core_service = l3_core_service_subscription_factory(partner=partner, l3_core_service_type=service_type)
l3_core_service.l3_core_service.ap_list[0].sbp.edge_port = edge_port.edge_port
l3_core_service.save()
for service_type in [
service_type for service_type in Layer2CircuitServiceType if not service_type.startswith("IMPORTED")
]:
layer_2_circuit_subscription_factory(
partner=partner, layer_2_circuit_side_a_edgeport=edge_port, layer_2_circuit_service_type=service_type
)
initial_data = [{"subscription_id": str(edge_port.subscription_id)}, *input_form_wizard_data]
result, process_stat, step_log = run_workflow("migrate_edge_port", initial_data)
# Dry and Real run for disabling config and creating a new edge port
for _ in range(3):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
# confirm continue moved fiber
result, step_log = resume_suspend_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
# confirm things in moodi looks good and start Migrate all L3 Core Services
result, step_log = resume_suspend_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
# confirm things in moodi looks good and start migration of L2 core services
result, step_log = resume_suspend_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = resume_suspend_workflow(result, process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = assert_stop_moodi(result, process_stat, step_log)
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = EdgePort.from_subscription(subscription_id)
assert subscription.status == "active"
router_fqdn = Router.from_subscription(input_form_wizard_data[0]["node"]).router.router_fqdn
assert subscription.edge_port.ga_id is not None
assert subscription.description == f"Edge Port lag-21 on {router_fqdn}, GAAR, {subscription.edge_port.ga_id}"
assert len(subscription.edge_port.edge_port_ae_members) == 2
assert mock_execute_playbook.call_count == 4
...@@ -12,6 +12,7 @@ from test.services.conftest import MockedSharePointClient ...@@ -12,6 +12,7 @@ from test.services.conftest import MockedSharePointClient
from test.workflows import ( from test.workflows import (
assert_complete, assert_complete,
assert_lso_interaction_success, assert_lso_interaction_success,
assert_stop_moodi,
assert_suspended, assert_suspended,
extract_state, extract_state,
resume_workflow, resume_workflow,
...@@ -85,7 +86,7 @@ def test_create_l3_core_service_success( ...@@ -85,7 +86,7 @@ def test_create_l3_core_service_success(
"v6_bgp_peer": base_bgp_peer_input() | {"add_v6_multicast": faker.boolean(), "peer_address": faker.ipv6()}, "v6_bgp_peer": base_bgp_peer_input() | {"add_v6_multicast": faker.boolean(), "peer_address": faker.ipv6()},
}, },
] ]
lso_interaction_count = 6 lso_interaction_count = 7
result, process_stat, step_log = run_workflow("create_l3_core_service", form_input_data) result, process_stat, step_log = run_workflow("create_l3_core_service", form_input_data)
...@@ -95,10 +96,12 @@ def test_create_l3_core_service_success( ...@@ -95,10 +96,12 @@ def test_create_l3_core_service_success(
assert_suspended(result) assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM) result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
result, step_log = assert_stop_moodi(result, process_stat, step_log)
assert_complete(result) assert_complete(result)
state = extract_state(result) state = extract_state(result)
subscription = L3CoreService.from_subscription(state["subscription_id"]) subscription = L3CoreService.from_subscription(state["subscription_id"])
assert mock_lso_client.call_count == lso_interaction_count assert mock_lso_client.call_count == lso_interaction_count + 1
assert subscription.status == SubscriptionLifecycle.ACTIVE assert subscription.status == SubscriptionLifecycle.ACTIVE
assert len(subscription.l3_core_service.ap_list) == 1 assert len(subscription.l3_core_service.ap_list) == 1
assert ( assert (
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment