Skip to content
Snippets Groups Projects
Select Git revision
  • 1f2df23e2b08b4a1da11fd9de9fa40fb591af8cf
  • develop default protected
  • master protected
  • authorship-fix-from-develop
  • 1048-service-config-backfilling
  • feature/nat-1211-edgeport-lacp-xmit
  • fix/nat-1120-sdp-validation
  • NAT-1154-import-edge-port-update
  • fix/l3-imports
  • feature/10GGBS-NAT-980
  • fix/NAT-1009/fix-redeploy-base-config-if-there-is-a-vprn
  • 4.24
  • 4.23
  • 4.22
  • 4.21
  • 4.20
  • 4.19
  • 4.18
  • 4.17
  • 4.16
  • 4.15
  • 4.14
  • 4.13
  • 4.12
  • 4.11
  • 4.10
  • 4.8
  • 4.5
  • 4.4
  • 4.3
  • 4.2
31 results

create_iptrunk.py

Blame
  • create_iptrunk.py 27.90 KiB
    """A creation workflow that deploys a new IP trunk service.
    
    This the workflow that brings the subscription from `INACTIVE` to `PROVISIONING`. The deployment of a new IP trunk
    consist in the following steps:
    
    - Fill the form with the necessary fields:
        - SID
        - Type
        - Speed
        - Nodes
        - LAG interfaces with description
        - LAG members with description
    - WFO will query IPAM to retrieve the IPv4/IPv6 Networks necessary for the trunk. The container to use is specified in
    `oss-params.json`
    - The configuration necessary to deploy the LAG is generated and applied to the destination nodes using the Ansible
    playbook `iptrunks.yaml` This is done first in a dry mode (without committing) and then in a real mode committing the
    configuration. The commit message has the `subscription_id` and the `process_id`. Included in this, is the configuration
    necessary to enable LLDP on the physical interfaces.
    - Once the LAG interface is deployed, another Ansible playbook is called to verify that IP traffic can actually flow
    over the trunk ( `iptrunk_checks.yaml`)
    - Once the check is passed, the ISIS configuration will take place using the same `iptrunks.yaml`. Also in this case
    first there is a dry run and then a commit.
    - After this step the ISIS adjacency gets checked using `iptrunks_checks.yaml`
    
    The trunk is deployed with an initial ISIS metric of 90.000 to prevent traffic to pass.
    """
    
    import json
    from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
    from typing import Annotated
    from uuid import uuid4
    
    from annotated_types import Len
    from orchestrator.forms import FormPage
    from orchestrator.forms.validators import Choice, Label
    from orchestrator.targets import Target
    from orchestrator.types import SubscriptionLifecycle
    from orchestrator.utils.errors import ProcessFailureError
    from orchestrator.utils.json import json_dumps
    from orchestrator.workflow import StepList, begin, conditional, done, step, step_group, workflow
    from orchestrator.workflows.steps import resync, set_status, store_process_subscription
    from orchestrator.workflows.utils import wrap_create_initial_input_form
    from ping3 import ping
    from pydantic import ConfigDict
    from pydantic_forms.types import FormGenerator, State, UUIDstr
    from pydantic_forms.validators import ReadOnlyField
    from pynetbox.models.dcim import Interfaces
    
    from gso.products.product_blocks.iptrunk import (
        IptrunkInterfaceBlockInactive,
        IptrunkSideBlockInactive,
        IptrunkType,
    )
    from gso.products.product_types.iptrunk import Iptrunk, IptrunkInactive, IptrunkProvisioning
    from gso.products.product_types.router import Router
    from gso.services import infoblox, subscriptions
    from gso.services.lso_client import LSOState, lso_interaction
    from gso.services.netbox_client import NetboxClient
    from gso.services.partners import get_partner_by_name
    from gso.services.sharepoint import SharePointClient
    from gso.services.subscriptions import (
        generate_unique_id,
        get_non_terminated_iptrunk_subscriptions,
    )
    from gso.settings import load_gso_service_config, load_oss_params
    from gso.utils.helpers import (
        available_interfaces_choices,
        available_lags_choices,
        calculate_recommended_minimum_links,
        get_router_vendor,
    )
    from gso.utils.shared_enums import Vendor
    from gso.utils.types.interfaces import JuniperLAGMember, LAGMember, LAGMemberList, PhysicalPortCapacity
    from gso.utils.types.netbox_router import NetboxEnabledRouter
    from gso.utils.types.tt_number import TTNumber
    from gso.utils.workflow_steps import prompt_sharepoint_checklist_url
    from gso.workflows.shared import create_summary_form
    
    
    def initial_input_form_generator(product_name: str) -> FormGenerator:
        """Gather input from the user in three steps. General information, and information on both sides of the trunk."""
        #  Add both provisioning and active routers, since trunks are required for promoting a router to active.
        active_and_provisioning_routers = subscriptions.get_active_router_subscriptions(
            includes=["subscription_id", "description"]
        ) + subscriptions.get_provisioning_router_subscriptions(includes=["subscription_id", "description"])
        routers = {str(router["subscription_id"]): router["description"] for router in active_and_provisioning_routers}
    
        # Get version choices from config
        iptrunk_versions = list(load_gso_service_config().IP_TRUNK.version.keys())
        iptrunk_version_choices = Choice("Select version", [(v, v) for v in iptrunk_versions])  # type: ignore[arg-type]
    
        class CreateIptrunkForm(FormPage):
            model_config = ConfigDict(title=product_name)
    
            tt_number: TTNumber
            partner: ReadOnlyField("GEANT", default_type=str)  # type: ignore[valid-type]
            iptrunk_description: str | None = None
            iptrunk_type: IptrunkType
            iptrunk_speed: PhysicalPortCapacity
            iptrunk_number_of_members: int
            iptrunk_description_suffix: str | None = None
            iptrunk_config_version: iptrunk_version_choices  # type: ignore[valid-type]
    
        initial_user_input = yield CreateIptrunkForm
        recommended_minimum_links = calculate_recommended_minimum_links(
            initial_user_input.iptrunk_number_of_members, initial_user_input.iptrunk_speed
        )
    
        class VerifyMinimumLinksForm(FormPage):
            info_label: Label = f"This is the calculated minimum-links for this LAG: {recommended_minimum_links}"
            iptrunk_minimum_links: int = recommended_minimum_links
            info_label2: Label = "Please confirm or modify."
    
        verify_minimum_links = yield VerifyMinimumLinksForm
        router_enum_a = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]
    
        class SelectRouterSideA(FormPage):
            model_config = ConfigDict(title="Select a router for side A of the trunk.")
    
            side_a_node_id: NetboxEnabledRouter[router_enum_a]  # type: ignore[valid-type]
    
        user_input_router_side_a = yield SelectRouterSideA
        router_a = user_input_router_side_a.side_a_node_id.name
        router_a_fqdn = Router.from_subscription(router_a).router.router_fqdn
    
        juniper_ae_members = Annotated[
            LAGMemberList[JuniperLAGMember],
            Len(
                min_length=initial_user_input.iptrunk_number_of_members,
                max_length=initial_user_input.iptrunk_number_of_members,
            ),
        ]
    
        if get_router_vendor(router_a) == Vendor.NOKIA:
    
            class NokiaLAGMemberA(LAGMember):
                interface_name: available_interfaces_choices(  # type: ignore[valid-type]
                    router_a,
                    initial_user_input.iptrunk_speed,
                )
    
            ae_members_side_a_type = Annotated[
                LAGMemberList[NokiaLAGMemberA],
                Len(
                    min_length=initial_user_input.iptrunk_number_of_members,
                    max_length=initial_user_input.iptrunk_number_of_members,
                ),
            ]
        else:
            ae_members_side_a_type = juniper_ae_members  # type: ignore[assignment, misc]
    
        class CreateIptrunkSideAForm(FormPage):
            model_config = ConfigDict(title=f"Provide subscription details for side A of the trunk. ({router_a_fqdn})")
    
            side_a_ae_iface: available_lags_choices(router_a) or str  # type: ignore[valid-type]
            side_a_ae_members: ae_members_side_a_type
    
        user_input_side_a = yield CreateIptrunkSideAForm
        # Remove the selected router for side A, to prevent any loops
        routers.pop(str(router_a))
        router_enum_b = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]
    
        class SelectRouterSideB(FormPage):
            model_config = ConfigDict(title="Select a router for side B of the trunk.")
    
            side_b_node_id: NetboxEnabledRouter[router_enum_b]  # type: ignore[valid-type]
    
        user_input_router_side_b = yield SelectRouterSideB
        router_b = user_input_router_side_b.side_b_node_id.name
        router_b_fqdn = Router.from_subscription(router_b).router.router_fqdn
    
        if get_router_vendor(router_b) == Vendor.NOKIA:
    
            class NokiaLAGMemberB(LAGMember):
                interface_name: available_interfaces_choices(  # type: ignore[valid-type]
                    router_b,
                    initial_user_input.iptrunk_speed,
                )
    
            ae_members_side_b = Annotated[
                LAGMemberList[NokiaLAGMemberB],
                Len(
                    min_length=len(user_input_side_a.side_a_ae_members), max_length=len(user_input_side_a.side_a_ae_members)
                ),
            ]
        else:
            ae_members_side_b = juniper_ae_members  # type: ignore[assignment, misc]
    
        class CreateIptrunkSideBForm(FormPage):
            model_config = ConfigDict(title=f"Provide subscription details for side B of the trunk. ({router_b_fqdn})")
    
            side_b_ae_iface: available_lags_choices(router_b) or str  # type: ignore[valid-type]
            side_b_ae_members: ae_members_side_b
    
        user_input_side_b = yield CreateIptrunkSideBForm
        input_forms_data = (
            initial_user_input.model_dump()
            | verify_minimum_links.model_dump()
            | user_input_router_side_a.model_dump()
            | user_input_side_a.model_dump()
            | user_input_router_side_b.model_dump()
            | user_input_side_b.model_dump()
        )
        summary_form_data = input_forms_data | {"side_a_node": router_a_fqdn, "side_b_node": router_b_fqdn}
        summary_fields = [
            "iptrunk_type",
            "iptrunk_speed",
            "iptrunk_description",
            "iptrunk_minimum_links",
            "iptrunk_description_suffix",
            "iptrunk_config_version",
            "side_a_node",
            "side_a_ae_iface",
            "side_a_ae_members",
            "side_b_node",
            "side_b_ae_iface",
            "side_b_ae_members",
        ]
        yield from create_summary_form(summary_form_data, product_name, summary_fields)
    
        return input_forms_data
    
    
    @step("Create subscription")
    def create_subscription(product: UUIDstr, partner: str) -> State:
        """Create a new subscription object in the database."""
        subscription = IptrunkInactive.from_product_id(product, get_partner_by_name(partner).partner_id)
    
        return {
            "subscription": subscription,
            "subscription_id": subscription.subscription_id,
        }
    
    
    @step("Get information from IPAM")
    def get_info_from_ipam(subscription: IptrunkInactive) -> State:
        """Allocate IP resources in IPAM."""
        new_ipv4_network = infoblox.allocate_v4_network(
            "TRUNK",
            subscription.iptrunk.iptrunk_description,
        )
        new_ipv6_network = infoblox.allocate_v6_network(
            "TRUNK",
            subscription.iptrunk.iptrunk_description,
        )
        subscription.iptrunk.iptrunk_ipv4_network = new_ipv4_network
        subscription.iptrunk.iptrunk_ipv6_network = new_ipv6_network
    
        return {
            "subscription": subscription,
            "new_ipv4_network": str(new_ipv4_network),
            "new_ipv6_network": str(new_ipv6_network),
        }
    
    
    @step("Check if assigned networks are already taken by other trunk subscription")
    def check_existing_trunk_allocations(subscription: IptrunkInactive) -> None:
        """Check if there already is a trunk with the same network resources assigned to it."""
        if not subscription.iptrunk.iptrunk_ipv4_network or not subscription.iptrunk.iptrunk_ipv6_network:
            msg = "Missing IP resources in subscription object."
            raise ProcessFailureError(
                msg, details=[subscription.iptrunk.iptrunk_ipv4_network, subscription.iptrunk.iptrunk_ipv6_network]
            )
    
        all_trunks = [
            Iptrunk.from_subscription(trunk["subscription_id"])
            for trunk in get_non_terminated_iptrunk_subscriptions()
            if trunk["subscription_id"] != subscription.subscription_id
        ]
        overlapping_ipv4_networks = [
            (trunk.description, trunk.iptrunk.iptrunk_ipv4_network)
            for trunk in all_trunks
            if trunk.iptrunk.iptrunk_ipv4_network.overlaps(subscription.iptrunk.iptrunk_ipv4_network)
        ]
        overlapping_ipv6_networks = [
            (trunk.description, trunk.iptrunk.iptrunk_ipv6_network)
            for trunk in all_trunks
            if trunk.iptrunk.iptrunk_ipv6_network.overlaps(subscription.iptrunk.iptrunk_ipv6_network)
        ]
    
        if overlapping_ipv4_networks or overlapping_ipv6_networks:
            msg = "Newly assigned IP networks overlap with existing IP trunk subscriptions, please investigate."
            raise ProcessFailureError(msg, details=[overlapping_ipv4_networks, overlapping_ipv6_networks])
    
    
    @step("Check for existing DNS records in the assigned IPv4 network")
    def dig_all_hosts_v4(new_ipv4_network: str) -> None:
        """Check if any hosts have already been assigned inside the IPv4 network in Netbox."""
        registered_hosts = [host for host in IPv4Network(new_ipv4_network) if infoblox.find_host_by_ip(IPv4Address(host))]
    
        if registered_hosts:
            msg = "One or more hosts in the assigned IPv4 network are already registered, please investigate."
            raise ProcessFailureError(msg, details=registered_hosts)
    
    
    @step("Check for existing DNS records in the assigned IPv6 network")
    def dig_all_hosts_v6(new_ipv6_network: str) -> None:
        """Check if any hosts have already been assigned inside the IPv6 network in Netbox."""
        registered_hosts = [host for host in IPv6Network(new_ipv6_network) if infoblox.find_host_by_ip(IPv6Address(host))]
    
        if registered_hosts:
            msg = "One or more hosts in the assigned IPv6 network are already registered, please investigate."
            raise ProcessFailureError(msg, details=registered_hosts)
    
    
    @step("Ping all hosts in the assigned IPv4 network")
    def ping_all_hosts_v4(new_ipv4_network: str) -> None:
        """Ping all hosts in the IPv4 network to verify they are not in use."""
        unavailable_hosts = [host for host in IPv4Network(new_ipv4_network) if ping(str(host), timeout=1)]
    
        if unavailable_hosts:
            msg = "One or more hosts in the assigned IPv4 network are responding to ping, please investigate."
            raise ProcessFailureError(msg, details=unavailable_hosts)
    
    
    @step("Ping all hosts in the assigned IPv6 network")
    def ping_all_hosts_v6(new_ipv6_network: str) -> State:
        """Ping all hosts in the IPv6 network to verify they are not in use."""
        unavailable_hosts = [host for host in IPv6Network(new_ipv6_network) if ping(str(host), timeout=1)]
    
        if unavailable_hosts:
            msg = "One or more hosts in the assigned IPv6 network are responding to ping, please investigate."
            raise ProcessFailureError(msg, details=unavailable_hosts)
    
        return {"__remove_keys": ["new_ipv4_network", "new_ipv6_network"]}
    
    
    @step("Initialize subscription")
    def initialize_subscription(
        subscription: IptrunkInactive,
        iptrunk_type: IptrunkType,
        iptrunk_description: str | None,
        iptrunk_speed: PhysicalPortCapacity,
        iptrunk_minimum_links: int,
        iptrunk_description_suffix: str | None,
        iptrunk_config_version: str,
        side_a_node_id: str,
        side_a_ae_iface: str,
        side_a_ae_members: list[dict],
        side_b_node_id: str,
        side_b_ae_iface: str,
        side_b_ae_members: list[dict],
    ) -> State:
        """Take all input from the user, and store it in the database."""
        oss_params = load_oss_params()
        side_a = Router.from_subscription(side_a_node_id).router
        side_b = Router.from_subscription(side_b_node_id).router
        gs_id = generate_unique_id(prefix="GS")
        subscription.iptrunk.gs_id = gs_id
        subscription.iptrunk.iptrunk_description = iptrunk_description
        subscription.iptrunk.iptrunk_type = iptrunk_type
        subscription.iptrunk.iptrunk_speed = iptrunk_speed
        subscription.iptrunk.iptrunk_isis_metric = oss_params.GENERAL.isis_high_metric
        subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links
        subscription.iptrunk.iptrunk_description_suffix = iptrunk_description_suffix
        subscription.iptrunk.iptrunk_config_version = iptrunk_config_version
    
        subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node = side_a
        subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface = side_a_ae_iface
        side_a_ga_id = generate_unique_id(prefix="GA")
        subscription.iptrunk.iptrunk_sides[0].ga_id = side_a_ga_id
        for member in side_a_ae_members:
            subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members.append(
                IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member),
            )
    
        subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node = side_b
        subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface = side_b_ae_iface
        side_b_ga_id = generate_unique_id(prefix="GA")
        subscription.iptrunk.iptrunk_sides[1].ga_id = side_b_ga_id
        for member in side_b_ae_members:
            subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members.append(
                IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member),
            )
        side_names = sorted([side_a.router_site.site_name, side_b.router_site.site_name])
        description = f"IP trunk {side_names[0]} {side_names[1]}"
        if iptrunk_description_suffix:
            description += f" {iptrunk_description_suffix}"
        if gs_id:
            description += f", {gs_id}"
        subscription.description = description
    
        return {"subscription": subscription}
    
    
    @step("[DRY RUN] Provision IP trunk interface")
    def provision_ip_trunk_iface_dry(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
        """Perform a dry run of deploying configuration on both sides of the trunk."""
        extra_vars = {
            "wfo_trunk_json": json.loads(json_dumps(subscription)),
            "dry_run": True,
            "verb": "deploy",
            "config_object": "trunk_interface",
            "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for "
            f"{subscription.iptrunk.gs_id}",
        }
    
        return {
            "playbook_name": "gap_ansible/playbooks/iptrunks.yaml",
            "inventory": {
                "all": {
                    "hosts": {
                        subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn: None,
                        subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn: None,
                    }
                }
            },
            "extra_vars": extra_vars,
        }
    
    
    @step("[FOR REAL] Provision IP trunk interface")
    def provision_ip_trunk_iface_real(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
        """Deploy IP trunk configuration on both sides."""
        extra_vars = {
            "wfo_trunk_json": json.loads(json_dumps(subscription)),
            "dry_run": False,
            "verb": "deploy",
            "config_object": "trunk_interface",
            "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for "
            f"{subscription.iptrunk.gs_id}",
        }
    
        return {
            "playbook_name": "gap_ansible/playbooks/iptrunks.yaml",
            "inventory": {
                "all": {
                    "hosts": {
                        subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn: None,
                        subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn: None,
                    }
                }
            },
            "extra_vars": extra_vars,
        }
    
    
    @step("Check IP connectivity of the trunk")
    def check_ip_trunk_connectivity(subscription: IptrunkInactive) -> LSOState:
        """Check successful connectivity across the new trunk."""
        extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "ping"}
    
        return {
            "playbook_name": "gap_ansible/playbooks/iptrunks_checks.yaml",
            "inventory": {"all": {"hosts": {subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn: None}}},
            "extra_vars": extra_vars,
        }
    
    
    @step("[DRY RUN] Provision IP trunk ISIS interface")
    def provision_ip_trunk_isis_iface_dry(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
        """Perform a dry run of deploying ISIS configuration."""
        extra_vars = {
            "wfo_trunk_json": json.loads(json_dumps(subscription)),
            "dry_run": True,
            "verb": "deploy",
            "config_object": "isis_interface",
            "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for "
            f"{subscription.iptrunk.gs_id}",
        }
    
        return {
            "playbook_name": "gap_ansible/playbooks/iptrunks.yaml",
            "inventory": {
                "all": {
                    "hosts": {
                        subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn: None,
                        subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn: None,
                    }
                }
            },
            "extra_vars": extra_vars,
        }
    
    
    @step("[FOR REAL] Provision IP trunk ISIS interface")
    def provision_ip_trunk_isis_iface_real(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
        """Deploy ISIS configuration on both sides."""
        extra_vars = {
            "wfo_trunk_json": json.loads(json_dumps(subscription)),
            "dry_run": False,
            "verb": "deploy",
            "config_object": "isis_interface",
            "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for "
            f"{subscription.iptrunk.gs_id}",
        }
    
        return {
            "playbook_name": "gap_ansible/playbooks/iptrunks.yaml",
            "inventory": {
                "all": {
                    "hosts": {
                        subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn: None,
                        subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn: None,
                    }
                }
            },
            "extra_vars": extra_vars,
        }
    
    
    @step("Check ISIS adjacency")
    def check_ip_trunk_isis(subscription: IptrunkInactive) -> LSOState:
        """Run an Ansible playbook to confirm ISIS adjacency."""
        extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"}
    
        return {
            "playbook_name": "gap_ansible/playbooks/iptrunks_checks.yaml",
            "inventory": {"all": {"hosts": {subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn: None}}},
            "extra_vars": extra_vars,
        }
    
    
    @step("Register DNS records for both sides of the trunk")
    def register_dns_records(subscription: IptrunkInactive) -> State:
        """Register DNS records for both sides of the newly created IP trunk."""
        for index, side in enumerate(subscription.iptrunk.iptrunk_sides):
            fqdn = f"{side.iptrunk_side_ae_iface}-0.{side.iptrunk_side_node.router_fqdn}"
            if not (subscription.iptrunk.iptrunk_ipv4_network and subscription.iptrunk.iptrunk_ipv6_network):
                msg = f"Missing IP resources in trunk, cannot allocate DNS record for side {fqdn}!"
                raise ValueError(msg)
            ipv4_addr = subscription.iptrunk.iptrunk_ipv4_network[index]
            ipv6_addr = subscription.iptrunk.iptrunk_ipv6_network[index + 1]
    
            infoblox.create_host_by_ip(
                fqdn, "TRUNK", str(subscription.subscription_id), ipv4_address=ipv4_addr, ipv6_address=ipv6_addr
            )
    
        return {"subscription": subscription}
    
    
    @step("NextBox integration")
    def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State:
        """Create the LAG interfaces in NetBox and attach the LAG interfaces to the physical interfaces."""
        nbclient = NetboxClient()
        for trunk_side in subscription.iptrunk.iptrunk_sides:
            if get_router_vendor(trunk_side.iptrunk_side_node.owner_subscription_id) == Vendor.NOKIA:
                # Create LAG interfaces
                lag_interface: Interfaces = nbclient.create_interface(
                    iface_name=trunk_side.iptrunk_side_ae_iface,  # type: ignore[arg-type]
                    interface_type="lag",
                    device_name=trunk_side.iptrunk_side_node.router_fqdn,  # type: ignore[arg-type]
                    description=str(subscription.subscription_id),
                    enabled=True,
                )
                # Attach physical interfaces to LAG
                # Update interface description to subscription ID
                # Reserve interfaces
                for interface in trunk_side.iptrunk_side_ae_members:
                    nbclient.attach_interface_to_lag(
                        device_name=trunk_side.iptrunk_side_node.router_fqdn,  # type: ignore[arg-type]
                        lag_name=lag_interface.name,
                        iface_name=interface.interface_name,  # type: ignore[arg-type]
                        description=str(subscription.subscription_id),
                    )
                    nbclient.reserve_interface(
                        device_name=trunk_side.iptrunk_side_node.router_fqdn,  # type: ignore[arg-type]
                        iface_name=interface.interface_name,  # type: ignore[arg-type]
                    )
        return {
            "subscription": subscription,
        }
    
    
    def _allocate_interfaces_in_netbox(iptrunk_side: IptrunkSideBlockInactive) -> None:
        for interface in iptrunk_side.iptrunk_side_ae_members:
            fqdn = iptrunk_side.iptrunk_side_node.router_fqdn
            iface_name = interface.interface_name
            if not fqdn or not iface_name:
                msg = f"FQDN and/or interface name missing in subscription {interface.owner_subscription_id}"
                raise ValueError(msg)
    
            NetboxClient().allocate_interface(device_name=fqdn, iface_name=iface_name)
    
    
    @step("Allocate interfaces in Netbox for side A")
    def netbox_allocate_side_a_interfaces(subscription: IptrunkInactive) -> None:
        """Allocate the LAG interfaces for the Nokia router on side A."""
        _allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[0])
    
    
    @step("Allocate interfaces in Netbox for side B")
    def netbox_allocate_side_b_interfaces(subscription: IptrunkInactive) -> None:
        """Allocate the LAG interfaces for the Nokia router on side B."""
        _allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[1])
    
    
    @step("Create a new SharePoint checklist item")
    def create_new_sharepoint_checklist(subscription: IptrunkProvisioning, tt_number: str, process_id: UUIDstr) -> State:
        """Create a new checklist item in SharePoint for approving this IP trunk."""
        new_list_item_url = SharePointClient().add_list_item(
            list_name="ip_trunk",
            fields={
                "Title": f"{subscription.description} - {subscription.iptrunk.gs_id}",
                "TT_NUMBER": tt_number,
                "GAP_PROCESS_URL": f"{load_oss_params().GENERAL.public_hostname}/workflows/{process_id}",
                "ACTIVITY_TYPE": "Creation",
            },
        )
    
        return {"checklist_url": new_list_item_url}
    
    
    @workflow(
        "Create IP trunk",
        initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
        target=Target.CREATE,
    )
    def create_iptrunk() -> StepList:
        """Create a new IP trunk.
    
        * Create the subscription object in the database
        * Gather relevant information from Infoblox
        * Reserve interfaces in Netbox
        * Deploy configuration on the two sides of the trunk, first as a dry run
        * Check connectivity on the new trunk
        * Deploy the new ISIS metric on the trunk, first as a dry run
        * Verify ISIS adjacency
        * Allocate the interfaces in Netbox
        * Set the subscription to active in the database
        """
        side_a_is_nokia = conditional(lambda state: get_router_vendor(state["side_a_node_id"]) == Vendor.NOKIA)
        side_b_is_nokia = conditional(lambda state: get_router_vendor(state["side_b_node_id"]) == Vendor.NOKIA)
    
        assign_ip_networks = step_group(
            name="Assign IP networks",
            steps=(
                begin
                >> get_info_from_ipam
                >> check_existing_trunk_allocations
                >> dig_all_hosts_v4
                >> dig_all_hosts_v6
                >> ping_all_hosts_v4
                >> ping_all_hosts_v6
            ),
        )
    
        return (
            begin
            >> create_subscription
            >> store_process_subscription(Target.CREATE)
            >> initialize_subscription
            >> assign_ip_networks
            >> reserve_interfaces_in_netbox
            >> lso_interaction(provision_ip_trunk_iface_dry)
            >> lso_interaction(provision_ip_trunk_iface_real)
            >> lso_interaction(check_ip_trunk_connectivity)
            >> lso_interaction(provision_ip_trunk_isis_iface_dry)
            >> lso_interaction(provision_ip_trunk_isis_iface_real)
            >> lso_interaction(check_ip_trunk_isis)
            >> register_dns_records
            >> side_a_is_nokia(netbox_allocate_side_a_interfaces)
            >> side_b_is_nokia(netbox_allocate_side_b_interfaces)
            >> set_status(SubscriptionLifecycle.PROVISIONING)
            >> create_new_sharepoint_checklist
            >> prompt_sharepoint_checklist_url
            >> resync
            >> done
        )