-
Karel van Klink authoredKarel van Klink authored
create_iptrunk.py 13.94 KiB
from uuid import uuid4
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, UniqueConstrainedList
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator
from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.iptrunk import (
IptrunkInterfaceBlockInactive,
IptrunkType,
PhyPortCapacity,
)
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.router import Router
from gso.services import infoblox, provisioning_proxy, subscriptions
from gso.services.crm import customer_selector
from gso.services.netbox_client import NetboxClient
from gso.services.provisioning_proxy import pp_interaction
from gso.utils.helpers import (
LAGMember,
available_interfaces_choices,
available_lags_choices,
get_router_vendor,
validate_iptrunk_unique_interface,
validate_router_in_netbox,
)
def initial_input_form_generator(product_name: str) -> FormGenerator:
# TODO: implement more strict validation:
# * interface names must be validated
routers = {}
for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"]):
routers[str(router["subscription_id"])] = router["description"]
class CreateIptrunkForm(FormPage):
class Config:
title = product_name
tt_number: str
customer: customer_selector() # type: ignore[valid-type]
geant_s_sid: str
iptrunk_description: str
iptrunk_type: IptrunkType
iptrunk_speed: PhyPortCapacity
iptrunk_minimum_links: int
initial_user_input = yield CreateIptrunkForm
router_enum_a = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type]
class SelectRouterSideA(FormPage):
class Config:
title = "Select a router for side A of the trunk."
side_a_node_id: router_enum_a # type: ignore[valid-type]
@validator("side_a_node_id", allow_reuse=True)
def validate_device_exists_in_netbox(cls, side_a_node_id: UUIDstr) -> str | None:
return validate_router_in_netbox(side_a_node_id)
user_input_router_side_a = yield SelectRouterSideA
router_a = user_input_router_side_a.side_a_node_id.name
class JuniperAeMembers(UniqueConstrainedList[LAGMember]):
min_items = initial_user_input.iptrunk_minimum_links
if get_router_vendor(router_a) == RouterVendor.NOKIA:
class NokiaLAGMemberA(LAGMember):
interface_name: available_interfaces_choices( # type: ignore[valid-type]
router_a,
initial_user_input.iptrunk_speed,
)
class NokiaAeMembersA(UniqueConstrainedList[NokiaLAGMemberA]):
min_items = initial_user_input.iptrunk_minimum_links
ae_members_side_a = NokiaAeMembersA
else:
ae_members_side_a = JuniperAeMembers # type: ignore[assignment]
class CreateIptrunkSideAForm(FormPage):
class Config:
title = "Provide subscription details for side A of the trunk."
side_a_ae_iface: available_lags_choices(router_a) or str # type: ignore[valid-type]
side_a_ae_geant_a_sid: str
side_a_ae_members: ae_members_side_a # type: ignore[valid-type]
@validator("side_a_ae_members", allow_reuse=True)
def validate_iptrunk_unique_interface_side_a(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
return validate_iptrunk_unique_interface(side_a_ae_members)
user_input_side_a = yield CreateIptrunkSideAForm
# Remove the selected router for side A, to prevent any loops
routers.pop(str(router_a))
router_enum_b = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type]
class SelectRouterSideB(FormPage):
class Config:
title = "Select a router for side B of the trunk."
side_b_node_id: router_enum_b # type: ignore[valid-type]
@validator("side_b_node_id", allow_reuse=True)
def validate_device_exists_in_netbox(cls, side_b_node_id: UUIDstr) -> str | None:
return validate_router_in_netbox(side_b_node_id)
user_input_router_side_b = yield SelectRouterSideB
router_b = user_input_router_side_b.side_b_node_id.name
if get_router_vendor(router_b) == RouterVendor.NOKIA:
class NokiaLAGMemberB(LAGMember):
interface_name: available_interfaces_choices( # type: ignore[valid-type]
router_b,
initial_user_input.iptrunk_speed,
)
class NokiaAeMembersB(UniqueConstrainedList):
min_items = len(user_input_side_a.side_a_ae_members)
max_items = len(user_input_side_a.side_a_ae_members)
item_type = NokiaLAGMemberB
ae_members_side_b = NokiaAeMembersB
else:
ae_members_side_b = JuniperAeMembers # type: ignore[assignment]
class CreateIptrunkSideBForm(FormPage):
class Config:
title = "Provide subscription details for side B of the trunk."
side_b_ae_iface: available_lags_choices(router_b) or str # type: ignore[valid-type]
side_b_ae_geant_a_sid: str
side_b_ae_members: ae_members_side_b # type: ignore[valid-type]
@validator("side_b_ae_members", allow_reuse=True)
def validate_iptrunk_unique_interface_side_b(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
return validate_iptrunk_unique_interface(side_b_ae_members)
user_input_side_b = yield CreateIptrunkSideBForm
return (
initial_user_input.dict()
| user_input_router_side_a.dict()
| user_input_side_a.dict()
| user_input_router_side_b.dict()
| user_input_side_b.dict()
)
@step("Create subscription")
def create_subscription(product: UUIDstr, customer: UUIDstr) -> State:
subscription = IptrunkInactive.from_product_id(product, customer)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
@step("Get information from IPAM")
def get_info_from_ipam(subscription: IptrunkProvisioning) -> State:
subscription.iptrunk.iptrunk_ipv4_network = infoblox.allocate_v4_network(
"TRUNK",
subscription.iptrunk.iptrunk_description,
)
subscription.iptrunk.iptrunk_ipv6_network = infoblox.allocate_v6_network(
"TRUNK",
subscription.iptrunk.iptrunk_description,
)
return {"subscription": subscription}
@step("Initialize subscription")
def initialize_subscription(
subscription: IptrunkInactive,
geant_s_sid: str,
iptrunk_type: IptrunkType,
iptrunk_description: str,
iptrunk_speed: PhyPortCapacity,
iptrunk_minimum_links: int,
side_a_node_id: str,
side_a_ae_iface: str,
side_a_ae_geant_a_sid: str,
side_a_ae_members: list[dict],
side_b_node_id: str,
side_b_ae_iface: str,
side_b_ae_geant_a_sid: str,
side_b_ae_members: list[dict],
) -> State:
subscription.iptrunk.geant_s_sid = geant_s_sid
subscription.iptrunk.iptrunk_description = iptrunk_description
subscription.iptrunk.iptrunk_type = iptrunk_type
subscription.iptrunk.iptrunk_speed = iptrunk_speed
subscription.iptrunk.iptrunk_isis_metric = 90000
subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links
subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node = Router.from_subscription(side_a_node_id).router
subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface = side_a_ae_iface
subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_geant_a_sid = side_a_ae_geant_a_sid
for member in side_a_ae_members:
subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members.append(
IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member),
)
subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node = Router.from_subscription(side_b_node_id).router
subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface = side_b_ae_iface
subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_geant_a_sid = side_b_ae_geant_a_sid
for member in side_b_ae_members:
subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members.append(
IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member),
)
subscription.description = f"IP trunk, geant_s_sid:{geant_s_sid}"
subscription = IptrunkProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
@step("Provision IP trunk interface [DRY RUN]")
def provision_ip_trunk_iface_dry(
subscription: IptrunkProvisioning,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
) -> State:
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "trunk_interface", True)
return {"subscription": subscription}
@step("Provision IP trunk interface [FOR REAL]")
def provision_ip_trunk_iface_real(
subscription: IptrunkProvisioning,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
) -> State:
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "trunk_interface", False)
return {"subscription": subscription}
@step("Check IP connectivity of the trunk")
def check_ip_trunk_connectivity(
subscription: IptrunkProvisioning,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
) -> State:
provisioning_proxy.check_ip_trunk(subscription, process_id, callback_route, tt_number, "ping")
return {"subscription": subscription}
@step("Provision IP trunk ISIS interface [DRY RUN]")
def provision_ip_trunk_isis_iface_dry(
subscription: IptrunkProvisioning,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
) -> State:
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "isis_interface")
return {"subscription": subscription}
@step("Provision IP trunk ISIS interface [FOR REAL]")
def provision_ip_trunk_isis_iface_real(
subscription: IptrunkProvisioning,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
) -> State:
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "isis_interface", False)
return {"subscription": subscription}
@step("Check ISIS adjacency")
def check_ip_trunk_isis(
subscription: IptrunkProvisioning,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
) -> State:
provisioning_proxy.check_ip_trunk(subscription, process_id, callback_route, tt_number, "isis")
return {"subscription": subscription}
@step("NextBox integration")
def reserve_interfaces_in_netbox(subscription: IptrunkProvisioning) -> State:
"""Create the LAG interfaces in NetBox and attach the lag interfaces to the physical interfaces."""
nbclient = NetboxClient()
for trunk_side in subscription.iptrunk.iptrunk_sides:
if trunk_side.iptrunk_side_node.router_vendor == RouterVendor.NOKIA:
# Create LAG interfaces
lag_interface: Interfaces = nbclient.create_interface(
iface_name=trunk_side.iptrunk_side_ae_iface,
type="lag",
device_name=trunk_side.iptrunk_side_node.router_fqdn,
description=str(subscription.subscription_id),
enabled=True,
)
# Attach physical interfaces to LAG
# Update interface description to subscription ID
# Reserve interfaces
for interface in trunk_side.iptrunk_side_ae_members:
nbclient.attach_interface_to_lag(
device_name=trunk_side.iptrunk_side_node.router_fqdn,
lag_name=lag_interface.name,
iface_name=interface.interface_name,
description=str(subscription.subscription_id),
)
nbclient.reserve_interface(
device_name=trunk_side.iptrunk_side_node.router_fqdn,
iface_name=interface.interface_name,
)
return {
"subscription": subscription,
}
@step("Allocate interfaces in Netbox")
def allocate_interfaces_in_netbox(subscription: IptrunkProvisioning) -> State:
"""Allocate the LAG interfaces in NetBox and attach the lag interfaces to the physical interfaces."""
for trunk_side in subscription.iptrunk.iptrunk_sides:
if trunk_side.iptrunk_side_node.router_vendor == RouterVendor.NOKIA:
for interface in trunk_side.iptrunk_side_ae_members:
NetboxClient().allocate_interface(
device_name=trunk_side.iptrunk_side_node.router_fqdn,
iface_name=interface.interface_name,
)
return {
"subscription": subscription,
}
@workflow(
"Create IP trunk",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
target=Target.CREATE,
)
def create_iptrunk() -> StepList:
return (
init
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> get_info_from_ipam
>> reserve_interfaces_in_netbox
>> pp_interaction(provision_ip_trunk_iface_dry)
>> pp_interaction(provision_ip_trunk_iface_real)
>> pp_interaction(check_ip_trunk_connectivity)
>> pp_interaction(provision_ip_trunk_isis_iface_dry)
>> pp_interaction(provision_ip_trunk_isis_iface_real)
>> pp_interaction(check_ip_trunk_isis)
>> allocate_interfaces_in_netbox
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)