Skip to content
Snippets Groups Projects
Commit e7cdda2c authored by Neda Moeini's avatar Neda Moeini
Browse files

Add IX Port creation workflow and input forms

parent 3a922478
Branches
Tags
No related merge requests found
Pipeline #94521 failed
......@@ -172,3 +172,6 @@ LazyWorkflowInstance("gso.workflows.vrf.create_vrf", "create_vrf")
LazyWorkflowInstance("gso.workflows.vrf.modify_vrf_router_list", "modify_vrf_router_list")
LazyWorkflowInstance("gso.workflows.vrf.redeploy_vrf", "redeploy_vrf")
LazyWorkflowInstance("gso.workflows.vrf.terminate_vrf", "terminate_vrf")
# IX Port workflows
LazyWorkflowInstance("gso.workflows.ix_port.create_ix_port", "create_ix_port")
\ No newline at end of file
"""Create IX Port Service Workflow"""
from uuid import uuid4
from orchestrator import step, workflow
from orchestrator.forms import FormPage, SubmitFormPage
from orchestrator.targets import Target
from orchestrator.types import SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import ConfigDict, Field
from pydantic_forms.types import FormGenerator, State, UUIDstr
from pydantic_forms.validators import Divider
from gso.products.product_blocks.service_binding_port import BFDSettings
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.ix_port import IXPort, IXPortInactive
from gso.services.partners import get_partner_by_name
from gso.services.subscriptions import generate_unique_id
from gso.utils.helpers import active_edge_port_selector, partner_choice
from gso.utils.types.ip_address import IPv4AddressType, IPv4Netmask, IPv6AddressType, IPv6Netmask
from gso.utils.types.tt_number import TTNumber
from gso.utils.types.virtual_identifiers import VLAN_ID
def initial_input_generator(product_name: str) -> FormGenerator:
"""Gather input from the operator about a new IX Port subscription."""
geant_partner_id = get_partner_by_name("GEANT").partner_id
class InitialIXPortForm(FormPage):
model_config = ConfigDict(title=f"{product_name}")
tt_number: TTNumber
partner: partner_choice()
initial_user_input = yield InitialIXPortForm
class ConfigureIXPortForm(SubmitFormPage):
model_config = ConfigDict(title=f"{product_name} - Configure IX Port")
gs_id: str = Field(default_factory=lambda: generate_unique_id("GS"))
edge_port: active_edge_port_selector(
partner_id=initial_user_input.partner if initial_user_input.partner != geant_partner_id else None
)
is_tagged: bool = True
vlan_id: VLAN_ID | None = None
divider1: Divider = Field(None, exclude=True)
ipv4_address: IPv4AddressType | None = None
ipv4_mask: IPv4Netmask | None = None
ipv6_address: IPv6AddressType | None = None
ipv6_mask: IPv6Netmask | None = None
custom_firewall_filters: bool = False
divider2: Divider = Field(None, exclude=True)
v4_bfd_settings: BFDSettings
v6_bfd_settings: BFDSettings
config_input = yield ConfigureIXPortForm
return {"product_name": product_name} | initial_user_input.model_dump() | config_input.model_dump()
@step("Create IXPort subscription")
def create_subscription(product: UUIDstr, partner: UUIDstr) -> State:
"""Create a new IXPort subscription object."""
subscription = IXPortInactive.from_product_id(product, partner)
return {"subscription": subscription, "subscription_id": subscription.subscription_id}
@step("Initialize IXPort subscription")
def initialize_subscription(
subscription: IXPortInactive,
gs_id: str,
edge_port: str,
is_tagged: bool,
vlan_id: VLAN_ID | None,
ipv4_address: IPv4AddressType | None,
ipv4_mask: IPv4Netmask | None,
ipv6_address: IPv6AddressType | None,
ipv6_mask: IPv6Netmask | None,
custom_firewall_filters: bool,
v4_bfd_settings: BFDSettings,
v6_bfd_settings: BFDSettings,
) -> State:
"""Initialize the IXPort subscription object."""
subscription.ix_port.l3_interface = subscription.ix_port.l3_interface.new(
uuid4(),
gs_id=gs_id,
edge_port=EdgePort.from_subscription(subscription_id=edge_port).edge_port,
is_tagged=is_tagged,
vlan_id=vlan_id,
ipv4_address=ipv4_address,
ipv4_mask=ipv4_mask,
ipv6_address=ipv6_address,
ipv6_mask=ipv6_mask,
custom_firewall_filters=custom_firewall_filters,
v4_bfd_settings=v4_bfd_settings,
v6_bfd_settings=v6_bfd_settings,
)
subscription = IXPort.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
subscription.description = f"{subscription.product.name} - {gs_id}"
return {"subscription": subscription}
@workflow(
"Create IX Port Service",
initial_input_form=wrap_create_initial_input_form(initial_input_generator),
target=Target.CREATE,
)
def create_ix_port() -> StepList:
"""Create a new IX Port service subscription."""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment