Skip to content
Snippets Groups Projects
Commit 906939de authored by Neda Moeini's avatar Neda Moeini
Browse files

Update modification WF for L2Circuits.

parent 83cec5b9
No related branches found
No related tags found
1 merge request!307Feature/l2circuits
......@@ -7,11 +7,12 @@ from orchestrator.types import FormGenerator, UUIDstr
from orchestrator.workflow import StepList, step
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import BaseModel, ConfigDict, Field
from pydantic import ConfigDict, Field
from pydantic_forms.validators import Divider, Label, ReadOnlyField
from gso.products import Layer2Circuit
from gso.products.product_blocks.layer_2_circuit import Layer2CircuitType
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.layer_2_circuit import Layer2Circuit
from gso.services.partners import get_partner_by_id
from gso.utils.types.interfaces import BandwidthString
from gso.utils.types.tt_number import TTNumber
......@@ -34,47 +35,37 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
layer_2_circuit_input = yield ModifyL2CircuitForm
class ModifyLayer2CircuitSideA(BaseModel):
edge_port: ReadOnlyField(subscription.layer_2_circuit.layer_2_circuit_sides[0].sbp.edge_port.edge_port_name,
default_type=str)
geant_sid: ReadOnlyField(subscription.layer_2_circuit.layer_2_circuit_sides[0].sbp.geant_sid, default_type=str)
is_tagged: ReadOnlyField(subscription.layer_2_circuit.layer_2_circuit_sides[0].sbp.is_tagged, default_type=bool)
vlan_id: ReadOnlyField(subscription.layer_2_circuit.layer_2_circuit_sides[0].sbp.vlan_id, default_type=int)
custom_firewall_filters: ReadOnlyField(
subscription.layer_2_circuit.layer_2_circuit_sides[0].sbp.custom_firewall_filters, default_type=bool)
class ModifyLayer2CircuitSideB(BaseModel):
edge_port: ReadOnlyField(subscription.layer_2_circuit.layer_2_circuit_sides[1].sbp.edge_port.edge_port_name,
default_type=str)
geant_sid: ReadOnlyField(subscription.layer_2_circuit.layer_2_circuit_sides[1].sbp.geant_sid, default_type=str)
is_tagged: ReadOnlyField(subscription.layer_2_circuit.layer_2_circuit_sides[1].sbp.is_tagged, default_type=bool)
vlan_id: ReadOnlyField(subscription.layer_2_circuit.layer_2_circuit_sides[1].sbp.vlan_id, default_type=int)
custom_firewall_filters: ReadOnlyField(
subscription.layer_2_circuit.layer_2_circuit_sides[1].sbp.custom_firewall_filters, default_type=bool)
class ModifyLayer2CircuitServiceSidesPage(FormPage):
model_config = ConfigDict(title=f"{product_name} - Configure Edge Ports")
vlan_range_label: Label = Field("Please set a VLAN range, bounds including.", exclude=True)
if layer_2_circuit_input.layer_2_circuit_type == Layer2CircuitType.TAGGED:
vlan_range_label: Label = Field("Please set a VLAN range, bounds including.", exclude=True)
vlan_range_lower_bound: VLAN_ID
vlan_range_upper_bound: VLAN_ID
vlan_range_lower_bound: VLAN_ID = subscription.layer_2_circuit.vlan_range_lower_bound
vlan_range_upper_bound: VLAN_ID = subscription.layer_2_circuit.vlan_range_upper_bound
else:
vlan_range_lower_bound: ReadOnlyField(None, default_type=int)
vlan_range_upper_bound: ReadOnlyField(None, default_type=int)
vlan_divider: Divider = Field(None, exclude=True)
if layer_2_circuit_input.policer_enabled:
policer_bandwidth: BandwidthString
policer_bandwidth: BandwidthString = subscription.layer_2_circuit.bandwidth
else:
policer_bandwidth: ReadOnlyField(None, default_type=str)
policer_divider: Divider = Field(None, exclude=True)
layer_2_circuit_side_a: ModifyLayer2CircuitSideA
layer_2_circuit_side_a: ReadOnlyField(
EdgePort.from_subscription(
subscription.layer_2_circuit.layer_2_circuit_sides[0].sbp.edge_port.owner_subscription_id
).description,
default_type=str,
)
side_divider: Divider = Field(None, exclude=True)
layer_2_circuit_side_b: ModifyLayer2CircuitSideB
layer_2_circuit_side_b: ReadOnlyField(
EdgePort.from_subscription(
subscription.layer_2_circuit.layer_2_circuit_sides[1].sbp.edge_port.owner_subscription_id
).description,
default_type=str,
)
layer_2_circuit_sides = yield ModifyLayer2CircuitServiceSidesPage
......@@ -83,12 +74,12 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
@step("Update Layer 2 Circuit subscription")
def modify_layer_2_circuit_subscription(
subscription: Layer2Circuit,
layer_2_circuit_type: Layer2CircuitType,
vlan_range_lower_bound: VLAN_ID | None,
vlan_range_upper_bound: VLAN_ID | None,
policer_enabled: bool, # noqa: FBT001
policer_bandwidth: BandwidthString | None,
subscription: Layer2Circuit,
layer_2_circuit_type: Layer2CircuitType,
vlan_range_lower_bound: VLAN_ID | None,
vlan_range_upper_bound: VLAN_ID | None,
policer_enabled: bool, # noqa: FBT001
policer_bandwidth: BandwidthString | None,
):
"""Update the Layer 2 Circuit subscription with the new values."""
subscription.layer_2_circuit.layer_2_circuit_type = layer_2_circuit_type
......@@ -108,10 +99,10 @@ def modify_layer_2_circuit_subscription(
def modify_layer_2_circuit() -> StepList:
"""Modify an existing Layer 2 Circuit service subscription."""
return (
begin
>> store_process_subscription(Target.MODIFY)
>> unsync
>> modify_layer_2_circuit_subscription
>> resync
>> done
begin
>> store_process_subscription(Target.MODIFY)
>> unsync
>> modify_layer_2_circuit_subscription
>> resync
>> done
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment