Skip to content
Snippets Groups Projects
Commit bb30510e authored by Neda Moeini's avatar Neda Moeini
Browse files

Update import CLI for L2Circuits.

parent 0bb5f9ea
No related branches found
No related tags found
1 merge request!307Feature/l2circuits
......@@ -17,13 +17,14 @@ from pydantic import BaseModel, ValidationError, field_validator, model_validato
from sqlalchemy.exc import SQLAlchemyError
from gso.db.models import PartnerTable
from gso.products import ProductType
from gso.products import Layer2CircuitServiceType, ProductType
from gso.products.product_blocks.bgp_session import IPFamily
from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.layer_2_circuit import Layer2CircuitType
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.switch import SwitchModel
from gso.products.product_types.nren_l3_core_service import NRENL3CoreServiceType
from gso.products.product_types.edge_port import EdgePort
from gso.services.partners import (
PartnerEmail,
PartnerName,
......@@ -38,7 +39,7 @@ from gso.services.subscriptions import (
)
from gso.utils.shared_enums import SBPType, Vendor
from gso.utils.types.base_site import BaseSiteValidatorModel
from gso.utils.types.interfaces import LAGMember, LAGMemberList, PhysicalPortCapacity
from gso.utils.types.interfaces import BandwidthString, LAGMember, LAGMemberList, PhysicalPortCapacity
from gso.utils.types.ip_address import (
AddressSpace,
IPAddress,
......@@ -49,7 +50,7 @@ from gso.utils.types.ip_address import (
IPV6Netmask,
PortNumber,
)
from gso.utils.types.virtual_identifiers import VLAN_ID
from gso.utils.types.virtual_identifiers import VC_ID, VLAN_ID
app: typer.Typer = typer.Typer()
......@@ -327,6 +328,49 @@ class LanSwitchInterconnectImportModel(BaseModel):
switch_side: LanSwitchInterconnectSwitchSideImportModel
class Layer2CircuitServiceImportModel(BaseModel):
"""Import Layer 2 Circuit Service model."""
class ServiceBindingPortInput(BaseModel):
"""Service Binding Port model."""
edge_port: UUIDstr
vlan_id: VLAN_ID
service_type: Layer2CircuitServiceType
partner: str
geant_sid: str
vc_id: VC_ID
layer_2_circuit_side_a: ServiceBindingPortInput
layer_2_circuit_side_b: ServiceBindingPortInput
layer_2_circuit_type: Layer2CircuitType
vlan_range_lower_bound: VLAN_ID | None = None
vlan_range_upper_bound: VLAN_ID | None = None
policer_enabled: bool = False
policer_bandwidth: BandwidthString | None = None
policer_burst_rate: BandwidthString | None = None
@field_validator("partner")
def check_if_partner_exists(cls, value: str) -> str:
"""Validate that the partner exists."""
try:
get_partner_by_name(value)
except PartnerNotFoundError as e:
msg = f"Partner {value} not found"
raise ValueError(msg) from e
return value
@model_validator(mode="after")
def check_if_edge_ports_exist(self) -> Self:
"""Check if the edge ports exist."""
for side in [self.layer_2_circuit_side_a, self.layer_2_circuit_side_b]:
if not EdgePort.from_subscription(side.edge_port):
msg = f"Edge Port {side.edge_port} not found"
raise ValueError(msg)
return self
T = TypeVar(
"T",
SiteImportModel,
......@@ -339,6 +383,7 @@ T = TypeVar(
EdgePortImportModel,
NRENL3CoreServiceImportModel,
LanSwitchInterconnectImportModel,
Layer2CircuitServiceImportModel,
)
common_filepath_option = typer.Option(
......@@ -608,7 +653,7 @@ def import_nren_l3_core_service(filepath: str = common_filepath_option) -> None:
for nren_l3_core_service in nren_l3_core_service_list:
partner = nren_l3_core_service["partner"]
service_type = NRENL3CoreServiceType(nren_l3_core_service["service_type"])
service_type = nren_l3_core_service["service_type"]
typer.echo(f"Creating imported {service_type} for {partner}")
try:
......@@ -650,3 +695,44 @@ def import_lan_switch_interconnect(filepath: str = common_filepath_option) -> No
"lan_switch_interconnect_description",
LanSwitchInterconnectImportModel,
)
@app.command()
def import_layer_2_circuit_service(filepath: str = common_filepath_option) -> None:
"""Import Layer 2 Circuit services into GSO."""
successfully_imported_data = []
layer_2_circuit_service_list = _read_data(Path(filepath))
for layer_2_circuit_service in layer_2_circuit_service_list:
partner = layer_2_circuit_service["partner"]
service_type = Layer2CircuitServiceType(layer_2_circuit_service["service_type"])
typer.echo(f"Creating imported {service_type} for {partner}")
try:
initial_data = Layer2CircuitServiceImportModel(**layer_2_circuit_service)
start_process("create_imported_layer_2_circuit", [initial_data.model_dump()])
successfully_imported_data.append(initial_data.vc_id)
typer.echo(
f"Successfully created imported {service_type} with virtual circuit ID {initial_data.vc_id}"
f" for {partner}"
)
except ValidationError as e:
typer.echo(f"Validation error: {e}")
typer.echo("Waiting for the dust to settle before importing new products...")
time.sleep(1)
# Migrate new products from imported to "full" counterpart.
imported_products = get_subscriptions(
product_types=[ProductType.IMPORTED_EXPRESSROUTE, ProductType.IMPORTED_GEANT_PLUS],
lifecycles=[SubscriptionLifecycle.ACTIVE],
includes=["subscription_id"],
)
for subscription_id in imported_products:
typer.echo(f"Importing {subscription_id}")
start_process("import_layer_2_circuit", [subscription_id])
if successfully_imported_data:
typer.echo("Successfully created imported Layer 2 Circuit services:")
for item in successfully_imported_data:
typer.echo(f"- {item}")
......@@ -18,7 +18,9 @@ def import_layer_2_circuit_subscription(subscription_id: UUIDstr) -> State:
old_layer_2_circuit_subscription = ImportedLayer2Circuit.from_subscription(subscription_id)
if old_layer_2_circuit_subscription.layer_2_circuit_service_type == Layer2CircuitServiceType.IMPORTED_GEANT_PLUS:
new_subscription_id = get_product_id_by_name(ProductName.GEANT_PLUS)
elif old_layer_2_circuit_subscription.layer_2_circuit_service_type == Layer2CircuitServiceType.IMPORTED_EXPRESSROUTE:
elif (
old_layer_2_circuit_subscription.layer_2_circuit_service_type == Layer2CircuitServiceType.IMPORTED_EXPRESSROUTE
):
new_subscription_id = get_product_id_by_name(ProductName.EXPRESSROUTE)
else:
msg = f"This {old_layer_2_circuit_subscription.layer_2_circuit_service_type} is already imported."
......@@ -28,9 +30,7 @@ def import_layer_2_circuit_subscription(subscription_id: UUIDstr) -> State:
return {"subscription": new_subscription}
@workflow(
"Import Layer 2 Circuit", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None)
)
@workflow("Import Layer 2 Circuit", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None))
def import_layer_2_circuit() -> StepList:
"""Modify an imported subscription into a layer 2 circuit subscription to complete the import."""
return (
......
......@@ -8,6 +8,7 @@ from gso.cli.imports import (
import_edge_port,
import_iptrunks,
import_lan_switch_interconnect,
import_layer_2_circuit_service,
import_nren_l3_core_service,
import_office_routers,
import_opengear,
......@@ -19,12 +20,14 @@ from gso.cli.imports import (
from gso.products.product_blocks.bgp_session import IPFamily
from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.layer_2_circuit import Layer2CircuitType
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.products.product_blocks.switch import SwitchModel
from gso.products.product_types.layer_2_circuit import Layer2CircuitServiceType
from gso.products.product_types.router import Router
from gso.products.product_types.site import Site
from gso.utils.helpers import iso_from_ipv4
from gso.utils.helpers import generate_unique_vc_id, iso_from_ipv4
from gso.utils.shared_enums import Vendor
from gso.utils.types.interfaces import PhysicalPortCapacity
from gso.utils.types.ip_address import AddressSpace
......@@ -389,6 +392,36 @@ def nren_l3_core_service_data(temp_file, faker, partner_factory, edge_port_subsc
return _nren_l3_core_service_data
@pytest.fixture()
def layer_2_circuit_data(temp_file, faker, partner_factory, edge_port_subscription_factory):
def _layer_2_circuit_data(**kwargs):
layer_2_circuit_input_data = {
"partner": partner_factory()["name"],
"service_type": Layer2CircuitServiceType.GEANT_PLUS,
"geant_sid": faker.geant_sid(),
"vc_id": generate_unique_vc_id(),
"layer_2_circuit_side_a": {
"edge_port": edge_port_subscription_factory(),
"vlan_id": faker.vlan_id(),
},
"layer_2_circuit_side_b": {
"edge_port": edge_port_subscription_factory(),
"vlan_id": faker.vlan_id(),
},
"layer_2_circuit_type": Layer2CircuitType.TAGGED,
"vlan_range_lower_bound": faker.vlan_id(),
"vlan_range_upper_bound": faker.vlan_id(),
"policer_enabled": False,
}
layer_2_circuit_input_data.update(**kwargs)
temp_file.write_text(json.dumps([layer_2_circuit_input_data]))
return {"path": str(temp_file), "data": layer_2_circuit_input_data}
return _layer_2_circuit_data
###########
# TESTS #
###########
......@@ -710,3 +743,24 @@ def test_import_nren_l3_core_service_with_invalid_edge_port(
captured_output, _ = capfd.readouterr()
assert f"Edge Port {fake_uuid} not found" in captured_output
assert mock_start_process.call_count == 0
@patch("gso.cli.imports.time.sleep")
@patch("gso.cli.imports.start_process")
def test_import_layer_2_circuit_success(mock_start_process, mock_sleep, layer_2_circuit_data):
import_layer_2_circuit_service(layer_2_circuit_data()["path"])
assert mock_start_process.call_count == 1
assert mock_sleep.call_count == 1
@patch("gso.cli.imports.time.sleep")
@patch("gso.cli.imports.start_process")
def test_import_layer_2_circuit_with_invalid_partner(
mock_start_process, mock_sleep, layer_2_circuit_data, edge_port_subscription_factory, capfd, faker
):
broken_data = layer_2_circuit_data(partner="INVALID")
import_layer_2_circuit_service(broken_data["path"])
captured_output, _ = capfd.readouterr()
assert "Partner INVALID not found" in captured_output
assert mock_start_process.call_count == 0
......@@ -14,9 +14,7 @@ def test_import_layer_2_circuit_success(layer_2_circuit_service_type, layer_2_ci
imported_layer_2_circuit = layer_2_circuit_subscription_factory(
layer_2_circuit_service_type=layer_2_circuit_service_type
)
result, _, _ = run_workflow(
"import_layer_2_circuit", [{"subscription_id": imported_layer_2_circuit}]
)
result, _, _ = run_workflow("import_layer_2_circuit", [{"subscription_id": imported_layer_2_circuit}])
subscription = Layer2Circuit.from_subscription(imported_layer_2_circuit)
assert_complete(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment