diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py index 1458f000439fa8bcf936a08b864eb117296caf37..b2c2561d8cb4a213b172aea7ed89cc12aed8bd1c 100644 --- a/gso/services/subscriptions.py +++ b/gso/services/subscriptions.py @@ -287,3 +287,25 @@ def get_all_active_sites() -> list[dict[str, Any]]: } for subscription in get_active_site_subscriptions(includes=["subscription_id"]) ] + + +def is_virtual_circuit_id_available(virtual_circuit_id: str) -> bool: + """Check if the given virtual circuit ID is unique in the database. + + This function verifies if the specified virtual circuit ID is not already + present in the core database. + + :param virtual_circuit_id: The virtual circuit ID to check. + :type virtual_circuit_id: str + :return: True if the virtual circuit ID is unique (not found), False if it exists. + :rtype: bool + """ + exists = ( + ResourceTypeTable.query.join(SubscriptionInstanceValueTable) + .filter( + ResourceTypeTable.resource_type == "virtual_circuit_id", + SubscriptionInstanceValueTable.value == virtual_circuit_id, + ) + .scalar() + ) + return exists is None diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index c05517a23beed1b47a5aaf37786af5fe8b8d1eee..c4a5a3d1ca5c8587da3c230b5355d703f85c9e45 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -1,5 +1,6 @@ """Helper methods that are used across :term:`GSO`.""" +import random import re from typing import TYPE_CHECKING from uuid import UUID @@ -13,9 +14,11 @@ from gso.products.product_types.router import Router from gso.services import subscriptions from gso.services.netbox_client import NetboxClient from gso.services.partners import get_all_partners +from gso.services.subscriptions import is_virtual_circuit_id_available from gso.utils.shared_enums import Vendor from gso.utils.types.interfaces import PhysicalPortCapacity from gso.utils.types.ip_address import IPv4AddressType +from gso.utils.types.virtual_identifiers import VC_ID if TYPE_CHECKING: from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock @@ -248,3 +251,28 @@ def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int if number_of_members > 1 and not enable_lacp: err_msg = "Number of members must be 1 if LACP is disabled." raise ValueError(err_msg) + + +def generate_unique_vc_id(max_attempts: int = 100) -> VC_ID | None: + """Generate a unique 8-digit VC_ID starting with '11'. + + This function attempts to generate an 8-digit VC_ID beginning with '11', + checking its uniqueness before returning it. A maximum attempt limit is + set to prevent infinite loops in case the ID space is saturated. + + :param max_attempts: The maximum number of attempts to generate a unique ID. + :type max_attempts: int + :return: A unique VC_ID instance if successful, None if no unique ID is found. + :rtype: Optional[VC_ID] + """ + + def create_vc_id() -> str: + """Generate an 8-digit VC_ID starting with "11".""" + return f"11{random.randint(100000, 999999)}" # noqa: S311 + + for _ in range(max_attempts): + vc_id = create_vc_id() + if is_virtual_circuit_id_available(vc_id): + return VC_ID(vc_id) + + return None diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py index e9e612573b4e60aba0a52f20b421c0f2f47a580f..c2444925c1025d5cbab3b352c8aa3f592e70f15f 100644 --- a/gso/workflows/__init__.py +++ b/gso/workflows/__init__.py @@ -128,4 +128,3 @@ LazyWorkflowInstance("gso.workflows.nren_l3_core_service.migrate_nren_l3_core_se # Layer 2 Circuit workflows LazyWorkflowInstance("gso.workflows.l2_circuit.create_layer_2_circuit", "create_layer_2_circuit") LazyWorkflowInstance("gso.workflows.l2_circuit.modify_layer_2_circuit", "modify_layer_2_circuit") - diff --git a/gso/workflows/l2_circuit/create_layer_2_circuit.py b/gso/workflows/l2_circuit/create_layer_2_circuit.py index aa2726020a4922aa98b4b2d94d7236f71be54bd3..f30633cf918ffc0cfa95558e87ba318926137917 100644 --- a/gso/workflows/l2_circuit/create_layer_2_circuit.py +++ b/gso/workflows/l2_circuit/create_layer_2_circuit.py @@ -16,12 +16,13 @@ from pydantic_forms.validators import Divider, Label, ReadOnlyField from gso.products.product_blocks.layer_2_circuit import Layer2CircuitSideBlockInactive, Layer2CircuitType from gso.products.product_blocks.service_binding_port import ServiceBindingPortInactive +from gso.products.product_types.edge_port import EdgePort from gso.products.product_types.layer_2_circuit import Layer2CircuitInactive -from gso.utils.helpers import active_edge_port_selector, partner_choice +from gso.utils.helpers import active_edge_port_selector, generate_unique_vc_id, partner_choice from gso.utils.shared_enums import SBPType from gso.utils.types.interfaces import BandwidthString from gso.utils.types.tt_number import TTNumber -from gso.utils.types.virtual_identifiers import VC_ID, VLAN_ID +from gso.utils.types.virtual_identifiers import VLAN_ID def initial_input_generator(product_name: str) -> FormGenerator: @@ -51,18 +52,26 @@ def initial_input_generator(product_name: str) -> FormGenerator: class Layer2CircuitServiceSidesPage(FormPage): model_config = ConfigDict(title=f"{product_name} - Configure Edge Ports") + vlan_range_label: Label | None = Field(None, exclude=True) + vlan_range_lower_bound: VLAN_ID | None = None + vlan_range_upper_bound: VLAN_ID | None = None + + policer_bandwidth: BandwidthString | None = None + if initial_user_input.layer_2_circuit_type == Layer2CircuitType.TAGGED: vlan_range_label: Label = Field("Please set a VLAN range, bounds including.", exclude=True) vlan_range_lower_bound: VLAN_ID vlan_range_upper_bound: VLAN_ID - vlan_range_lower_bound: ReadOnlyField(None, default_type=int) - vlan_range_upper_bound: ReadOnlyField(None, default_type=int) + else: + vlan_range_lower_bound: ReadOnlyField(None, default_type=int) + vlan_range_upper_bound: ReadOnlyField(None, default_type=int) vlan_divider: Divider = Field(None, exclude=True) if initial_user_input.policer_enabled: policer_bandwidth: BandwidthString - policer_bandwidth: ReadOnlyField(None, default_type=str) + else: + policer_bandwidth: ReadOnlyField(None, default_type=str) policer_divider: Divider = Field(None, exclude=True) @@ -85,24 +94,26 @@ def create_subscription(product: UUIDstr, partner: str) -> State: @step("Initialize subscription") def initialize_subscription( - subscription: Layer2CircuitInactive, - layer_2_circuit_side_a: dict[str, Any], - layer_2_circuit_side_b: dict[str, Any], - virtual_circuit_id: VC_ID | None, - layer_2_circuit_type: Layer2CircuitType, - vlan_range_lower_bound: VLAN_ID | None, - vlan_range_upper_bound: VLAN_ID | None, - policer_enabled: bool, # noqa: FBT001 - policer_bandwidth: BandwidthString | None, + subscription: Layer2CircuitInactive, + layer_2_circuit_side_a: dict[str, Any], + layer_2_circuit_side_b: dict[str, Any], + layer_2_circuit_type: Layer2CircuitType, + vlan_range_lower_bound: VLAN_ID | None, + vlan_range_upper_bound: VLAN_ID | None, + policer_enabled: bool, # noqa: FBT001 + policer_bandwidth: BandwidthString | None, ) -> State: """Build a subscription object from all user input.""" - subscription.layer_2_circuit.layer_2_circuit_sides = [ - Layer2CircuitSideBlockInactive.new( - uuid4(), sbp=ServiceBindingPortInactive.new(uuid4(), **circuit_side, sbp_type=SBPType.L2) + layer_2_circuit_sides = [] + for circuit_side_data in [layer_2_circuit_side_a, layer_2_circuit_side_b]: + edge_port_subscription = EdgePort.from_subscription(subscription_id=circuit_side_data.pop("edge_port")) + sbp = ServiceBindingPortInactive.new( + uuid4(), edge_port=edge_port_subscription.edge_port, sbp_type=SBPType.L2, **circuit_side_data ) - for circuit_side in [layer_2_circuit_side_a, layer_2_circuit_side_b] - ] - subscription.layer_2_circuit.virtual_circuit_id = virtual_circuit_id + layer2_circuit_side = Layer2CircuitSideBlockInactive.new(uuid4(), sbp=sbp) + layer_2_circuit_sides.append(layer2_circuit_side) + subscription.layer_2_circuit.layer_2_circuit_sides = layer_2_circuit_sides + subscription.layer_2_circuit.virtual_circuit_id = generate_unique_vc_id() subscription.layer_2_circuit.layer_2_circuit_type = layer_2_circuit_type subscription.layer_2_circuit.vlan_range_lower_bound = vlan_range_lower_bound subscription.layer_2_circuit.vlan_range_upper_bound = vlan_range_upper_bound