# create_iptrunk.py 15.88 KiB
"""A creation workflow that deploys a new IP trunk service."""
from uuid import uuid4
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, UniqueConstrainedList
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator
from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.iptrunk import (
IptrunkInterfaceBlockInactive,
IptrunkType,
PhyPortCapacity,
)
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.router import Router
from gso.services import infoblox, provisioning_proxy, subscriptions
from gso.services.crm import customer_selector
from gso.services.netbox_client import NetboxClient
from gso.services.provisioning_proxy import pp_interaction
from gso.utils.helpers import (
LAGMember,
available_interfaces_choices,
available_lags_choices,
get_router_vendor,
validate_interface_name_list,
validate_iptrunk_unique_interface,
validate_router_in_netbox,
)
def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Gather input from the user in five form pages.

    The pages are yielded in this order: general trunk information, router selection for side A, :term:`LAG`
    details for side A, router selection for side B, and :term:`LAG` details for side B.

    :param product_name: Used as the title of the first form page.
    :return: The ``dict()`` outputs of all five submitted pages, merged with ``|`` in the order they were yielded.
    """
    # TODO: implement more strict validation:
    # * interface names must be validated

    # Subscription ID (as a string) -> description, for every router subscription returned by the lookup.
    routers = {
        str(router["subscription_id"]): router["description"]
        for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"])
    }

    class CreateIptrunkForm(FormPage):
        class Config:
            title = product_name

        tt_number: str
        customer: customer_selector()  # type: ignore[valid-type]
        geant_s_sid: str
        iptrunk_description: str
        iptrunk_type: IptrunkType
        iptrunk_speed: PhyPortCapacity
        iptrunk_minimum_links: int

    initial_user_input = yield CreateIptrunkForm

    router_enum_a = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]

    class SelectRouterSideA(FormPage):
        class Config:
            title = "Select a router for side A of the trunk."

        side_a_node_id: router_enum_a  # type: ignore[valid-type]

        @validator("side_a_node_id", allow_reuse=True)
        def validate_device_exists_in_netbox(cls, side_a_node_id: UUIDstr) -> str | None:
            return validate_router_in_netbox(side_a_node_id)

    user_input_router_side_a = yield SelectRouterSideA
    # The name of the selected choice is used as the router identifier; it is also a key of ``routers`` (popped below).
    router_a = user_input_router_side_a.side_a_node_id.name

    # Member list used for every router that is not a Nokia; shared by side A and side B.
    class JuniperAeMembers(UniqueConstrainedList[LAGMember]):
        min_items = initial_user_input.iptrunk_minimum_links

    if get_router_vendor(router_a) == RouterVendor.NOKIA:

        class NokiaLAGMemberA(LAGMember):
            interface_name: available_interfaces_choices(  # type: ignore[valid-type]
                router_a,
                initial_user_input.iptrunk_speed,
            )

        class NokiaAeMembersA(UniqueConstrainedList[NokiaLAGMemberA]):
            min_items = initial_user_input.iptrunk_minimum_links

        ae_members_side_a = NokiaAeMembersA
    else:
        ae_members_side_a = JuniperAeMembers  # type: ignore[assignment]

    class CreateIptrunkSideAForm(FormPage):
        class Config:
            title = "Provide subscription details for side A of the trunk."

        # Falls back to a plain string when ``available_lags_choices`` returns something falsy.
        side_a_ae_iface: available_lags_choices(router_a) or str  # type: ignore[valid-type]
        side_a_ae_geant_a_sid: str
        side_a_ae_members: ae_members_side_a  # type: ignore[valid-type]

        @validator("side_a_ae_members", allow_reuse=True)
        def validate_iptrunk_unique_interface_side_a(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
            return validate_iptrunk_unique_interface(side_a_ae_members)

        @validator("side_a_ae_members", allow_reuse=True)
        def validate_interface_name_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
            # Nokia member names are typed by ``available_interfaces_choices``; only other vendors get the name check.
            if get_router_vendor(router_a) == RouterVendor.NOKIA:
                return side_a_ae_members
            return validate_interface_name_list(side_a_ae_members)

    user_input_side_a = yield CreateIptrunkSideAForm

    # Remove the selected router for side A, to prevent any loops
    routers.pop(str(router_a))
    router_enum_b = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]

    class SelectRouterSideB(FormPage):
        class Config:
            title = "Select a router for side B of the trunk."

        side_b_node_id: router_enum_b  # type: ignore[valid-type]

        @validator("side_b_node_id", allow_reuse=True)
        def validate_device_exists_in_netbox(cls, side_b_node_id: UUIDstr) -> str | None:
            return validate_router_in_netbox(side_b_node_id)

    user_input_router_side_b = yield SelectRouterSideB
    router_b = user_input_router_side_b.side_b_node_id.name

    if get_router_vendor(router_b) == RouterVendor.NOKIA:

        class NokiaLAGMemberB(LAGMember):
            interface_name: available_interfaces_choices(  # type: ignore[valid-type]
                router_b,
                initial_user_input.iptrunk_speed,
            )

        # Side B must list exactly as many members as were entered for side A.
        class NokiaAeMembersB(UniqueConstrainedList):
            min_items = len(user_input_side_a.side_a_ae_members)
            max_items = len(user_input_side_a.side_a_ae_members)
            item_type = NokiaLAGMemberB

        ae_members_side_b = NokiaAeMembersB
    else:
        ae_members_side_b = JuniperAeMembers  # type: ignore[assignment]

    class CreateIptrunkSideBForm(FormPage):
        class Config:
            title = "Provide subscription details for side B of the trunk."

        # Falls back to a plain string when ``available_lags_choices`` returns something falsy.
        side_b_ae_iface: available_lags_choices(router_b) or str  # type: ignore[valid-type]
        side_b_ae_geant_a_sid: str
        side_b_ae_members: ae_members_side_b  # type: ignore[valid-type]

        @validator("side_b_ae_members", allow_reuse=True)
        def validate_iptrunk_unique_interface_side_b(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
            return validate_iptrunk_unique_interface(side_b_ae_members)

        @validator("side_b_ae_members", allow_reuse=True)
        def validate_interface_name_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
            # Same interface name check as on side A, so that both ends of the trunk are validated alike.
            if get_router_vendor(router_b) == RouterVendor.NOKIA:
                return side_b_ae_members
            return validate_interface_name_list(side_b_ae_members)

    user_input_side_b = yield CreateIptrunkSideBForm

    return (
        initial_user_input.dict()
        | user_input_router_side_a.dict()
        | user_input_side_a.dict()
        | user_input_router_side_b.dict()
        | user_input_side_b.dict()
    )
@step("Create subscription")
def create_subscription(product: UUIDstr, customer: UUIDstr) -> State:
    """Instantiate an inactive IP trunk subscription for the given product and customer.

    :param product: ID of the product the subscription is created from.
    :param customer: ID of the customer the subscription is created for.
    :return: State containing the new subscription object and its subscription ID.
    """
    new_subscription = IptrunkInactive.from_product_id(product, customer)
    new_subscription_id = new_subscription.subscription_id

    return {"subscription": new_subscription, "subscription_id": new_subscription_id}
@step("Get information from IPAM")
def get_info_from_ipam(subscription: IptrunkProvisioning) -> State:
    """Allocate an IPv4 and an IPv6 network for the trunk in :term:`IPAM`.

    Both allocations are requested from ``infoblox`` with ``"TRUNK"`` and the trunk description as arguments, and the
    results are stored on the subscription.

    :param subscription: The IP trunk subscription that receives the allocated networks.
    :return: State containing the updated subscription.
    """
    trunk = subscription.iptrunk
    description = trunk.iptrunk_description

    trunk.iptrunk_ipv4_network = infoblox.allocate_v4_network("TRUNK", description)
    trunk.iptrunk_ipv6_network = infoblox.allocate_v6_network("TRUNK", description)

    return {"subscription": subscription}
@step("Initialize subscription")
def initialize_subscription(
    subscription: IptrunkInactive,
    geant_s_sid: str,
    iptrunk_type: IptrunkType,
    iptrunk_description: str,
    iptrunk_speed: PhyPortCapacity,
    iptrunk_minimum_links: int,
    side_a_node_id: str,
    side_a_ae_iface: str,
    side_a_ae_geant_a_sid: str,
    side_a_ae_members: list[dict],
    side_b_node_id: str,
    side_b_ae_iface: str,
    side_b_ae_geant_a_sid: str,
    side_b_ae_members: list[dict],
) -> State:
    """Copy all user input onto the subscription and move it to the provisioning lifecycle.

    The ``side_a_*`` values fill ``iptrunk_sides[0]`` and the ``side_b_*`` values fill ``iptrunk_sides[1]``. Every
    entry of the member lists is expanded as keyword arguments into a new inactive interface block.

    :return: State containing the subscription converted to :class:`IptrunkProvisioning`.
    """
    trunk = subscription.iptrunk

    trunk.geant_s_sid = geant_s_sid
    trunk.iptrunk_description = iptrunk_description
    trunk.iptrunk_type = iptrunk_type
    trunk.iptrunk_speed = iptrunk_speed
    trunk.iptrunk_isis_metric = 90000
    trunk.iptrunk_minimum_links = iptrunk_minimum_links

    # One tuple of user input per trunk side, in the order the sides are stored: A first, then B.
    per_side_input = (
        (side_a_node_id, side_a_ae_iface, side_a_ae_geant_a_sid, side_a_ae_members),
        (side_b_node_id, side_b_ae_iface, side_b_ae_geant_a_sid, side_b_ae_members),
    )
    for side_index, (node_id, ae_iface, ae_geant_a_sid, ae_members) in enumerate(per_side_input):
        side = trunk.iptrunk_sides[side_index]
        side.iptrunk_side_node = Router.from_subscription(node_id).router
        side.iptrunk_side_ae_iface = ae_iface
        side.iptrunk_side_ae_geant_a_sid = ae_geant_a_sid
        for member_fields in ae_members:
            side.iptrunk_side_ae_members.append(
                IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member_fields),
            )

    subscription.description = f"IP trunk, geant_s_sid:{geant_s_sid}"
    provisioning_subscription = IptrunkProvisioning.from_other_lifecycle(
        subscription,
        SubscriptionLifecycle.PROVISIONING,
    )

    return {"subscription": provisioning_subscription}
@step("Provision IP trunk interface [DRY RUN]")
def provision_ip_trunk_iface_dry(
    subscription: IptrunkProvisioning,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Request a dry run of the ``trunk_interface`` configuration for the trunk.

    The request is handed to the provisioning proxy with ``dry_run=True``.

    :return: State containing the unchanged subscription.
    """
    proxy_args = (subscription, process_id, callback_route, tt_number, "trunk_interface")
    provisioning_proxy.provision_ip_trunk(*proxy_args, dry_run=True)

    return {"subscription": subscription}
@step("Provision IP trunk interface [FOR REAL]")
def provision_ip_trunk_iface_real(
    subscription: IptrunkProvisioning,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Request the actual deployment of the ``trunk_interface`` configuration for the trunk.

    The request is handed to the provisioning proxy with ``dry_run=False``.

    :return: State containing the unchanged subscription.
    """
    proxy_args = (subscription, process_id, callback_route, tt_number, "trunk_interface")
    provisioning_proxy.provision_ip_trunk(*proxy_args, dry_run=False)

    return {"subscription": subscription}
@step("Check IP connectivity of the trunk")
def check_ip_trunk_connectivity(
    subscription: IptrunkProvisioning,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Request the ``ping`` check for the new trunk from the provisioning proxy.

    :return: State containing the unchanged subscription.
    """
    provisioning_proxy.check_ip_trunk(
        subscription,
        process_id,
        callback_route,
        tt_number,
        "ping",
    )

    return {"subscription": subscription}
@step("Provision IP trunk ISIS interface [DRY RUN]")
def provision_ip_trunk_isis_iface_dry(
    subscription: IptrunkProvisioning,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Perform a dry run of deploying :term:`ISIS` configuration.

    :param subscription: The IP trunk subscription that is being provisioned.
    :param callback_route: Passed on to the provisioning proxy.
    :param process_id: Passed on to the provisioning proxy.
    :param tt_number: Passed on to the provisioning proxy.
    :return: State containing the unchanged subscription.
    """
    provisioning_proxy.provision_ip_trunk(
        subscription,
        process_id,
        callback_route,
        tt_number,
        "isis_interface",
        # Stated explicitly, like the trunk interface dry run, so this step never depends on the callee's default.
        dry_run=True,
    )

    return {"subscription": subscription}
@step("Provision IP trunk ISIS interface [FOR REAL]")
def provision_ip_trunk_isis_iface_real(
    subscription: IptrunkProvisioning,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Request the actual deployment of the ``isis_interface`` (:term:`ISIS`) configuration for the trunk.

    The request is handed to the provisioning proxy with ``dry_run=False``.

    :return: State containing the unchanged subscription.
    """
    proxy_args = (subscription, process_id, callback_route, tt_number, "isis_interface")
    provisioning_proxy.provision_ip_trunk(*proxy_args, dry_run=False)

    return {"subscription": subscription}
@step("Check ISIS adjacency")
def check_ip_trunk_isis(
    subscription: IptrunkProvisioning,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Request the ``isis`` check, which confirms :term:`ISIS` adjacency, from the provisioning proxy.

    :return: State containing the unchanged subscription.
    """
    provisioning_proxy.check_ip_trunk(
        subscription,
        process_id,
        callback_route,
        tt_number,
        "isis",
    )

    return {"subscription": subscription}
@step("NetBox integration")
def reserve_interfaces_in_netbox(subscription: IptrunkProvisioning) -> State:
    """Create the :term:`LAG` interfaces in NetBox and attach the lag interfaces to the physical interfaces.

    Only trunk sides whose router vendor is Nokia are handled; other sides are skipped. For each handled side a
    :term:`LAG` interface is created, and every member interface is attached to it and then reserved.

    :param subscription: The IP trunk subscription whose sides are processed.
    :return: State containing the unchanged subscription.
    """
    nbclient = NetboxClient()
    # The subscription ID is written as the description of the LAG and of every attached interface.
    description = str(subscription.subscription_id)

    for trunk_side in subscription.iptrunk.iptrunk_sides:
        if get_router_vendor(trunk_side.iptrunk_side_node.owner_subscription_id) != RouterVendor.NOKIA:
            continue

        router_fqdn = trunk_side.iptrunk_side_node.router_fqdn

        # Create :term:`LAG` interfaces
        lag_interface: Interfaces = nbclient.create_interface(
            iface_name=trunk_side.iptrunk_side_ae_iface,
            interface_type="lag",
            device_name=router_fqdn,
            description=description,
            enabled=True,
        )

        # Attach physical interfaces to :term:`LAG`
        # Update interface description to subscription ID
        # Reserve interfaces
        for interface in trunk_side.iptrunk_side_ae_members:
            nbclient.attach_interface_to_lag(
                device_name=router_fqdn,
                lag_name=lag_interface.name,
                iface_name=interface.interface_name,
                description=description,
            )
            nbclient.reserve_interface(
                device_name=router_fqdn,
                iface_name=interface.interface_name,
            )

    return {
        "subscription": subscription,
    }
@step("Allocate interfaces in Netbox")
def allocate_interfaces_in_netbox(subscription: IptrunkProvisioning) -> State:
    """Allocate the member interfaces of the trunk in NetBox.

    Only trunk sides whose router vendor is Nokia are handled; other sides are skipped.

    :param subscription: The IP trunk subscription whose member interfaces are allocated.
    :return: State containing the unchanged subscription.
    """
    # One client for the whole step, instead of a new one for every interface.
    nbclient = NetboxClient()

    for trunk_side in subscription.iptrunk.iptrunk_sides:
        if get_router_vendor(trunk_side.iptrunk_side_node.owner_subscription_id) != RouterVendor.NOKIA:
            continue

        router_fqdn = trunk_side.iptrunk_side_node.router_fqdn
        for interface in trunk_side.iptrunk_side_ae_members:
            nbclient.allocate_interface(
                device_name=router_fqdn,
                iface_name=interface.interface_name,
            )

    return {
        "subscription": subscription,
    }
@workflow(
    "Create IP trunk",
    initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
    target=Target.CREATE,
)
def create_iptrunk() -> StepList:
    """Build the step list of the workflow that creates a new IP trunk.

    The steps run in this order:

    * Create the subscription object and fill it with the user input
    * Gather relevant information from Infoblox
    * Reserve interfaces in Netbox
    * Deploy configuration on the two sides of the trunk, first as a dry run
    * Check connectivity on the new trunk
    * Deploy the :term:`ISIS` configuration on the trunk, first as a dry run
    * Verify :term:`ISIS` adjacency
    * Allocate the interfaces in Netbox
    * Set the subscription to active and resync it
    """
    # Subscription bookkeeping and resource reservation.
    steps = init >> create_subscription >> store_process_subscription(Target.CREATE) >> initialize_subscription
    steps = steps >> get_info_from_ipam >> reserve_interfaces_in_netbox

    # Trunk interface configuration, followed by a connectivity check.
    steps = (
        steps
        >> pp_interaction(provision_ip_trunk_iface_dry)
        >> pp_interaction(provision_ip_trunk_iface_real)
        >> pp_interaction(check_ip_trunk_connectivity)
    )

    # ISIS configuration, followed by an adjacency check.
    steps = (
        steps
        >> pp_interaction(provision_ip_trunk_isis_iface_dry)
        >> pp_interaction(provision_ip_trunk_isis_iface_real)
        >> pp_interaction(check_ip_trunk_isis)
    )

    # Finalise the interfaces in Netbox and activate the subscription.
    return steps >> allocate_interfaces_in_netbox >> set_status(SubscriptionLifecycle.ACTIVE) >> resync >> done