Skip to content
Snippets Groups Projects
create_iptrunk.py 21.33 KiB
"""A creation workflow that deploys a new IP trunk service."""

import json
from uuid import uuid4

from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, Label, UniqueConstrainedList
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, conditional, done, init, inputstep, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator
from pydantic_forms.core import ReadOnlyField
from pynetbox.models.dcim import Interfaces

from gso.products.product_blocks.iptrunk import (
    IptrunkInterfaceBlockInactive,
    IptrunkSideBlockInactive,
    IptrunkType,
    PhysicalPortCapacity,
)
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.router import Router
from gso.services import infoblox, subscriptions
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_name
from gso.settings import load_oss_params
from gso.utils.helpers import (
    LAGMember,
    available_interfaces_choices,
    available_lags_choices,
    get_router_vendor,
    validate_interface_name_list,
    validate_iptrunk_unique_interface,
    validate_router_in_netbox,
    validate_tt_number,
)
from gso.utils.shared_enums import Vendor


def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Gather input from the user in three steps. General information, and information on both sides of the trunk."""
    routers = {}
    for router in subscriptions.get_active_router_subscriptions(
        includes=["subscription_id", "description"]
    ) + subscriptions.get_provisioning_router_subscriptions(includes=["subscription_id", "description"]):
        #  Add both provisioning and active routers, since trunks are required for promoting a router to active.
        routers[str(router["subscription_id"])] = router["description"]

    class CreateIptrunkForm(FormPage):
        class Config:
            title = product_name

        tt_number: str
        partner: str = ReadOnlyField("GEANT")
        geant_s_sid: str | None
        iptrunk_description: str
        iptrunk_type: IptrunkType
        iptrunk_speed: PhysicalPortCapacity
        iptrunk_minimum_links: int

        @validator("tt_number", allow_reuse=True)
        def validate_tt_number(cls, tt_number: str) -> str:
            return validate_tt_number(tt_number)

    initial_user_input = yield CreateIptrunkForm

    router_enum_a = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]

    class SelectRouterSideA(FormPage):
        class Config:
            title = "Select a router for side A of the trunk."

        side_a_node_id: router_enum_a  # type: ignore[valid-type]

        @validator("side_a_node_id", allow_reuse=True)
        def validate_device_exists_in_netbox(cls, side_a_node_id: UUIDstr) -> str | None:
            return validate_router_in_netbox(side_a_node_id)

    user_input_router_side_a = yield SelectRouterSideA
    router_a = user_input_router_side_a.side_a_node_id.name
    router_a_fqdn = Router.from_subscription(router_a).router.router_fqdn

    class JuniperAeMembers(UniqueConstrainedList[LAGMember]):
        min_items = initial_user_input.iptrunk_minimum_links

    if get_router_vendor(router_a) == Vendor.NOKIA:

        class NokiaLAGMemberA(LAGMember):
            interface_name: available_interfaces_choices(  # type: ignore[valid-type]
                router_a,
                initial_user_input.iptrunk_speed,
            )

        class NokiaAeMembersA(UniqueConstrainedList[NokiaLAGMemberA]):
            min_items = initial_user_input.iptrunk_minimum_links

        ae_members_side_a = NokiaAeMembersA
    else:
        ae_members_side_a = JuniperAeMembers  # type: ignore[assignment]

    class CreateIptrunkSideAForm(FormPage):
        class Config:
            title = f"Provide subscription details for side A of the trunk.({router_a_fqdn})"

        side_a_ae_iface: available_lags_choices(router_a) or str  # type: ignore[valid-type]
        side_a_ae_geant_a_sid: str | None
        side_a_ae_members: ae_members_side_a  # type: ignore[valid-type]

        @validator("side_a_ae_members", allow_reuse=True)
        def validate_side_a_ae_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
            validate_iptrunk_unique_interface(side_a_ae_members)
            vendor = get_router_vendor(router_a)
            validate_interface_name_list(side_a_ae_members, vendor)
            return side_a_ae_members

    user_input_side_a = yield CreateIptrunkSideAForm
    # Remove the selected router for side A, to prevent any loops
    routers.pop(str(router_a))
    router_enum_b = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]

    class SelectRouterSideB(FormPage):
        class Config:
            title = "Select a router for side B of the trunk."

        side_b_node_id: router_enum_b  # type: ignore[valid-type]

        @validator("side_b_node_id", allow_reuse=True)
        def validate_device_exists_in_netbox(cls, side_b_node_id: UUIDstr) -> str | None:
            return validate_router_in_netbox(side_b_node_id)

    user_input_router_side_b = yield SelectRouterSideB
    router_b = user_input_router_side_b.side_b_node_id.name
    router_b_fqdn = Router.from_subscription(router_b).router.router_fqdn

    if get_router_vendor(router_b) == Vendor.NOKIA:

        class NokiaLAGMemberB(LAGMember):
            interface_name: available_interfaces_choices(  # type: ignore[valid-type]
                router_b,
                initial_user_input.iptrunk_speed,
            )

        class NokiaAeMembersB(UniqueConstrainedList):
            min_items = len(user_input_side_a.side_a_ae_members)
            max_items = len(user_input_side_a.side_a_ae_members)
            item_type = NokiaLAGMemberB

        ae_members_side_b = NokiaAeMembersB
    else:
        ae_members_side_b = JuniperAeMembers  # type: ignore[assignment]

    class CreateIptrunkSideBForm(FormPage):
        class Config:
            title = f"Provide subscription details for side B of the trunk.({router_b_fqdn})"

        side_b_ae_iface: available_lags_choices(router_b) or str  # type: ignore[valid-type]
        side_b_ae_geant_a_sid: str | None
        side_b_ae_members: ae_members_side_b  # type: ignore[valid-type]

        @validator("side_b_ae_members", allow_reuse=True)
        def validate_side_b_ae_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
            validate_iptrunk_unique_interface(side_b_ae_members)
            vendor = get_router_vendor(router_b)
            validate_interface_name_list(side_b_ae_members, vendor)
            return side_b_ae_members

    user_input_side_b = yield CreateIptrunkSideBForm

    return (
        initial_user_input.dict()
        | user_input_router_side_a.dict()
        | user_input_side_a.dict()
        | user_input_router_side_b.dict()
        | user_input_side_b.dict()
    )


@step("Create subscription")
def create_subscription(product: UUIDstr, partner: str) -> State:
    """Create a new subscription object in the database."""
    subscription = IptrunkInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"])

    return {
        "subscription": subscription,
        "subscription_id": subscription.subscription_id,
    }


@step("Get information from IPAM")
def get_info_from_ipam(subscription: IptrunkInactive) -> State:
    """Allocate IP resources in :term:`IPAM`."""
    subscription.iptrunk.iptrunk_ipv4_network = infoblox.allocate_v4_network(
        "TRUNK",
        subscription.iptrunk.iptrunk_description,
    )
    subscription.iptrunk.iptrunk_ipv6_network = infoblox.allocate_v6_network(
        "TRUNK",
        subscription.iptrunk.iptrunk_description,
    )

    return {"subscription": subscription}


@step("Initialize subscription")
def initialize_subscription(
    subscription: IptrunkInactive,
    geant_s_sid: str | None,
    iptrunk_type: IptrunkType,
    iptrunk_description: str,
    iptrunk_speed: PhysicalPortCapacity,
    iptrunk_minimum_links: int,
    side_a_node_id: str,
    side_a_ae_iface: str,
    side_a_ae_geant_a_sid: str | None,
    side_a_ae_members: list[dict],
    side_b_node_id: str,
    side_b_ae_iface: str,
    side_b_ae_geant_a_sid: str | None,
    side_b_ae_members: list[dict],
) -> State:
    """Take all input from the user, and store it in the database."""
    oss_params = load_oss_params()
    side_a = Router.from_subscription(side_a_node_id).router
    side_b = Router.from_subscription(side_b_node_id).router
    subscription.iptrunk.geant_s_sid = geant_s_sid
    subscription.iptrunk.iptrunk_description = iptrunk_description
    subscription.iptrunk.iptrunk_type = iptrunk_type
    subscription.iptrunk.iptrunk_speed = iptrunk_speed
    subscription.iptrunk.iptrunk_isis_metric = oss_params.GENERAL.isis_high_metric
    subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links

    subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node = side_a
    subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface = side_a_ae_iface
    subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_geant_a_sid = side_a_ae_geant_a_sid
    for member in side_a_ae_members:
        subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members.append(
            IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member),
        )

    subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node = side_b
    subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface = side_b_ae_iface
    subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_geant_a_sid = side_b_ae_geant_a_sid
    for member in side_b_ae_members:
        subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members.append(
            IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member),
        )
    side_names = sorted([side_a.router_site.site_name, side_b.router_site.site_name])
    subscription.description = f"IP trunk {side_names[0]} {side_names[1]}, geant_s_sid:{geant_s_sid}"

    return {"subscription": subscription}


@step("[DRY RUN] Provision IP trunk interface")
def provision_ip_trunk_iface_dry(
    subscription: IptrunkInactive,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Perform a dry run of deploying configuration on both sides of the trunk."""
    extra_vars = {
        "wfo_trunk_json": json.loads(json_dumps(subscription)),
        "dry_run": True,
        "verb": "deploy",
        "config_object": "trunk_interface",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for "
        f"{subscription.iptrunk.geant_s_sid}",
    }

    execute_playbook(
        playbook_name="iptrunks.yaml",
        callback_route=callback_route,
        inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n"
        f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n",
        extra_vars=extra_vars,
    )

    return {"subscription": subscription}


@step("[FOR REAL] Provision IP trunk interface")
def provision_ip_trunk_iface_real(
    subscription: IptrunkInactive,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Deploy IP trunk configuration on both sides."""
    extra_vars = {
        "wfo_trunk_json": json.loads(json_dumps(subscription)),
        "dry_run": False,
        "verb": "deploy",
        "config_object": "trunk_interface",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for "
        f"{subscription.iptrunk.geant_s_sid}",
    }

    execute_playbook(
        playbook_name="iptrunks.yaml",
        callback_route=callback_route,
        inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n"
        f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n",
        extra_vars=extra_vars,
    )

    return {"subscription": subscription}


@step("Check IP connectivity of the trunk")
def check_ip_trunk_connectivity(
    subscription: IptrunkInactive,
    callback_route: str,
) -> State:
    """Check successful connectivity across the new trunk."""
    extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "ping"}

    execute_playbook(
        playbook_name="iptrunks_checks.yaml",
        callback_route=callback_route,
        inventory=subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn,
        extra_vars=extra_vars,
    )

    return {"subscription": subscription}


@step("[DRY RUN] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_dry(
    subscription: IptrunkInactive,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Perform a dry run of deploying :term:`ISIS` configuration."""
    extra_vars = {
        "wfo_trunk_json": json.loads(json_dumps(subscription)),
        "dry_run": True,
        "verb": "deploy",
        "config_object": "isis_interface",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for "
        f"{subscription.iptrunk.geant_s_sid}",
    }

    execute_playbook(
        playbook_name="iptrunks.yaml",
        callback_route=callback_route,
        inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n"
        f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n",
        extra_vars=extra_vars,
    )

    return {"subscription": subscription}


@step("[FOR REAL] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_real(
    subscription: IptrunkInactive,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Deploy :term:`ISIS` configuration on both sides."""
    extra_vars = {
        "wfo_trunk_json": json.loads(json_dumps(subscription)),
        "dry_run": False,
        "verb": "deploy",
        "config_object": "isis_interface",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for "
        f"{subscription.iptrunk.geant_s_sid}",
    }

    execute_playbook(
        playbook_name="iptrunks.yaml",
        callback_route=callback_route,
        inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n"
        f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n",
        extra_vars=extra_vars,
    )

    return {"subscription": subscription}


@step("Check ISIS adjacency")
def check_ip_trunk_isis(
    subscription: IptrunkInactive,
    callback_route: str,
) -> State:
    """Run an Ansible playbook to confirm :term:`ISIS` adjacency."""
    extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"}

    execute_playbook(
        playbook_name="iptrunks_checks.yaml",
        callback_route=callback_route,
        inventory=subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn,
        extra_vars=extra_vars,
    )

    return {"subscription": subscription}


@step("Register DNS records for both sides of the trunk")
def register_dns_records(subscription: IptrunkInactive) -> State:
    """Register :term:`DNS` records for both sides of the newly created IPtrunk."""
    for index, side in enumerate(subscription.iptrunk.iptrunk_sides):
        fqdn = f"{side.iptrunk_side_ae_iface}-0.{side.iptrunk_side_node.router_fqdn}"
        if not (subscription.iptrunk.iptrunk_ipv4_network and subscription.iptrunk.iptrunk_ipv6_network):
            msg = f"Missing IP resources in trunk, cannot allocate DNS record for side {fqdn}!"
            raise ValueError(msg)
        ipv4_addr = subscription.iptrunk.iptrunk_ipv4_network[index]
        ipv6_addr = subscription.iptrunk.iptrunk_ipv6_network[index + 1]

        infoblox.create_host_by_ip(fqdn, ipv4_addr, ipv6_addr, "TRUNK", str(subscription.subscription_id))

    return {"subscription": subscription}


@step("NextBox integration")
def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State:
    """Create the :term:`LAG` interfaces in NetBox and attach the lag interfaces to the physical interfaces."""
    nbclient = NetboxClient()
    for trunk_side in subscription.iptrunk.iptrunk_sides:
        if get_router_vendor(trunk_side.iptrunk_side_node.owner_subscription_id) == Vendor.NOKIA:
            # Create :term:`LAG` interfaces
            lag_interface: Interfaces = nbclient.create_interface(
                iface_name=trunk_side.iptrunk_side_ae_iface,
                interface_type="lag",
                device_name=trunk_side.iptrunk_side_node.router_fqdn,
                description=str(subscription.subscription_id),
                enabled=True,
            )
            # Attach physical interfaces to :term:`LAG`
            # Update interface description to subscription ID
            # Reserve interfaces
            for interface in trunk_side.iptrunk_side_ae_members:
                nbclient.attach_interface_to_lag(
                    device_name=trunk_side.iptrunk_side_node.router_fqdn,
                    lag_name=lag_interface.name,
                    iface_name=interface.interface_name,
                    description=str(subscription.subscription_id),
                )
                nbclient.reserve_interface(
                    device_name=trunk_side.iptrunk_side_node.router_fqdn,
                    iface_name=interface.interface_name,
                )
    return {
        "subscription": subscription,
    }


def _allocate_interfaces_in_netbox(iptrunk_side: IptrunkSideBlockInactive) -> None:
    for interface in iptrunk_side.iptrunk_side_ae_members:
        fqdn = iptrunk_side.iptrunk_side_node.router_fqdn
        iface_name = interface.interface_name
        if not fqdn or not iface_name:
            msg = f"FQDN and/or interface name missing in subscription {interface.owner_subscription_id}"
            raise ValueError(msg)

        NetboxClient().allocate_interface(device_name=fqdn, iface_name=iface_name)


@step("Allocate interfaces in Netbox for side A")
def netbox_allocate_side_a_interfaces(subscription: IptrunkInactive) -> None:
    """Allocate the :term:`LAG` interfaces for the Nokia router on side A."""
    _allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[0])


@step("Allocate interfaces in Netbox for side B")
def netbox_allocate_side_b_interfaces(subscription: IptrunkInactive) -> None:
    """Allocate the :term:`LAG` interfaces for the Nokia router on side B."""
    _allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[1])


@inputstep("Prompt for new Sharepoint checklist", assignee=Assignee.SYSTEM)
def prompt_start_new_checklist(subscription: IptrunkProvisioning) -> FormGenerator:
    """Prompt the operator to start a new checklist in Sharepoint for approving this new IP trunk."""
    oss_params = load_oss_params()

    class SharepointPrompt(FormPage):
        class Config:
            title = "Start new checklist"

        info_label_1: Label = (
            f"Visit {oss_params.SHAREPOINT.checklist_site_url} and start a new Sharepoint checklist for an IPtrunk "  # type: ignore[assignment]
            f"from {subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to "
            f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}."
        )
        info_label_2: Label = "Once this is done, click proceed to finish the workflow."  # type: ignore[assignment]

    yield SharepointPrompt

    return {}


@workflow(
    "Create IP trunk",
    initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
    target=Target.CREATE,
)
def create_iptrunk() -> StepList:
    """Create a new IP trunk.

    * Create the subscription object in the database
    * Gather relevant information from Infoblox
    * Reserve interfaces in Netbox
    * Deploy configuration on the two sides of the trunk, first as a dry run
    * Check connectivity on the new trunk
    * Deploy the new :term:`ISIS` metric on the trunk, first as a dry run
    * Verify :term:`ISIS` adjacency
    * Allocate the interfaces in Netbox
    * Set the subscription to active in the database
    """
    side_a_is_nokia = conditional(lambda state: get_router_vendor(state["side_a_node_id"]) == Vendor.NOKIA)
    side_b_is_nokia = conditional(lambda state: get_router_vendor(state["side_b_node_id"]) == Vendor.NOKIA)

    return (
        init
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> get_info_from_ipam
        >> reserve_interfaces_in_netbox
        >> lso_interaction(provision_ip_trunk_iface_dry)
        >> lso_interaction(provision_ip_trunk_iface_real)
        >> lso_interaction(check_ip_trunk_connectivity)
        >> lso_interaction(provision_ip_trunk_isis_iface_dry)
        >> lso_interaction(provision_ip_trunk_isis_iface_real)
        >> lso_interaction(check_ip_trunk_isis)
        >> register_dns_records
        >> side_a_is_nokia(netbox_allocate_side_a_interfaces)
        >> side_b_is_nokia(netbox_allocate_side_b_interfaces)
        >> set_status(SubscriptionLifecycle.PROVISIONING)
        >> prompt_start_new_checklist
        >> resync
        >> done
    )