create_iptrunk.py 22.04 KiB
"""A creation workflow that deploys a new IP trunk service."""
import json
from typing import Annotated
from uuid import uuid4
from annotated_types import Len
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, begin, conditional, done, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import AfterValidator, ConfigDict, field_validator
from pydantic_forms.validators import ReadOnlyField, validate_unique_list
from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.iptrunk import (
IptrunkInterfaceBlockInactive,
IptrunkSideBlockInactive,
IptrunkType,
PhysicalPortCapacity,
)
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.router import Router
from gso.services import infoblox, subscriptions
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_name
from gso.services.sharepoint import SharePointClient
from gso.settings import load_oss_params
from gso.utils.helpers import (
LAGMember,
available_interfaces_choices,
available_lags_choices,
get_router_vendor,
validate_interface_name_list,
validate_iptrunk_unique_interface,
validate_router_in_netbox,
validate_tt_number,
)
from gso.utils.shared_enums import Vendor
from gso.utils.workflow_steps import prompt_sharepoint_checklist_url
def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Walk the operator through the multi-page input form for a new IP trunk.

    Pages are presented in this order: general trunk details, confirmation of the minimum-links value, router
    selection for side A, :term:`LAG` details for side A, router selection for side B, and :term:`LAG` details for
    side B. The dumped contents of all pages are merged into a single dictionary, which is returned.
    """

    def _exact_length(count: int) -> Len:
        """Build a length constraint that only accepts lists holding exactly ``count`` items."""
        return Len(min_length=count, max_length=count)

    # Offer both provisioning and active routers, since trunks are required for promoting a router to active.
    selectable_routers = subscriptions.get_active_router_subscriptions(
        includes=["subscription_id", "description"]
    ) + subscriptions.get_provisioning_router_subscriptions(includes=["subscription_id", "description"])
    routers = {str(entry["subscription_id"]): entry["description"] for entry in selectable_routers}

    class CreateIptrunkForm(FormPage):
        model_config = ConfigDict(title=product_name)

        tt_number: str
        partner: ReadOnlyField("GEANT", default_type=str)  # type: ignore[valid-type]
        geant_s_sid: str | None = None
        iptrunk_description: str | None = None
        iptrunk_type: IptrunkType
        iptrunk_speed: PhysicalPortCapacity
        iptrunk_number_of_members: int

        @field_validator("tt_number")
        def validate_tt_number(cls, tt_number: str) -> str:
            # Inside the method body this name resolves to the imported helper, not to this method.
            return validate_tt_number(tt_number)

    initial_user_input = yield CreateIptrunkForm
    member_count = initial_user_input.iptrunk_number_of_members
    trunk_speed = initial_user_input.iptrunk_speed
    # Proposed default for minimum-links: one less than the number of LAG members.
    suggested_minimum_links = member_count - 1

    class VerifyMinimumLinksForm(FormPage):
        info_label: Label = f"This is the calculated minimum-links for this LAG: {suggested_minimum_links}"
        iptrunk_minimum_links: int = suggested_minimum_links
        info_label2: Label = "Please confirm or modify."

    verify_minimum_links = yield VerifyMinimumLinksForm

    router_enum_a = Choice("Select a router", zip(routers, routers.items(), strict=True))  # type: ignore[arg-type]

    class SelectRouterSideA(FormPage):
        model_config = ConfigDict(title="Select a router for side A of the trunk.")

        side_a_node_id: router_enum_a  # type: ignore[valid-type]

        @field_validator("side_a_node_id")
        def validate_device_exists_in_netbox(cls, side_a_node_id: UUIDstr) -> str | None:
            return validate_router_in_netbox(side_a_node_id)

    user_input_router_side_a = yield SelectRouterSideA
    router_a = user_input_router_side_a.side_a_node_id.name
    router_a_fqdn = Router.from_subscription(router_a).router.router_fqdn

    # Member list type used for every non-Nokia side: free-form LAG members, exactly ``member_count`` of them.
    juniper_ae_members = Annotated[
        list[LAGMember],
        AfterValidator(validate_unique_list),
        _exact_length(member_count),
    ]

    if get_router_vendor(router_a) == Vendor.NOKIA:

        class NokiaLAGMemberA(LAGMember):
            interface_name: available_interfaces_choices(router_a, trunk_speed)  # type: ignore[valid-type]

        ae_members_side_a_type = Annotated[
            list[NokiaLAGMemberA],
            AfterValidator(validate_unique_list),
            _exact_length(member_count),
        ]
    else:
        ae_members_side_a_type = juniper_ae_members  # type: ignore[assignment, misc]

    class CreateIptrunkSideAForm(FormPage):
        model_config = ConfigDict(title=f"Provide subscription details for side A of the trunk.({router_a_fqdn})")

        side_a_ae_iface: available_lags_choices(router_a) or str  # type: ignore[valid-type]
        side_a_ae_geant_a_sid: str | None
        side_a_ae_members: ae_members_side_a_type

        @field_validator("side_a_ae_members")
        def validate_side_a_ae_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
            validate_iptrunk_unique_interface(side_a_ae_members)
            validate_interface_name_list(side_a_ae_members, get_router_vendor(router_a))
            return side_a_ae_members

    user_input_side_a = yield CreateIptrunkSideAForm

    # Drop the router picked for side A from the choices, to prevent any loops.
    del routers[str(router_a)]
    router_enum_b = Choice("Select a router", zip(routers, routers.items(), strict=True))  # type: ignore[arg-type]

    class SelectRouterSideB(FormPage):
        model_config = ConfigDict(title="Select a router for side B of the trunk.")

        side_b_node_id: router_enum_b  # type: ignore[valid-type]

        @field_validator("side_b_node_id")
        def validate_device_exists_in_netbox(cls, side_b_node_id: UUIDstr) -> str | None:
            return validate_router_in_netbox(side_b_node_id)

    user_input_router_side_b = yield SelectRouterSideB
    router_b = user_input_router_side_b.side_b_node_id.name
    router_b_fqdn = Router.from_subscription(router_b).router.router_fqdn

    if get_router_vendor(router_b) == Vendor.NOKIA:

        class NokiaLAGMemberB(LAGMember):
            interface_name: available_interfaces_choices(router_b, trunk_speed)  # type: ignore[valid-type]

        # Side B has to list as many members as were entered for side A.
        ae_members_side_b = Annotated[
            list[NokiaLAGMemberB],
            AfterValidator(validate_unique_list),
            _exact_length(len(user_input_side_a.side_a_ae_members)),
        ]
    else:
        ae_members_side_b = juniper_ae_members  # type: ignore[assignment, misc]

    class CreateIptrunkSideBForm(FormPage):
        model_config = ConfigDict(title=f"Provide subscription details for side B of the trunk.({router_b_fqdn})")

        side_b_ae_iface: available_lags_choices(router_b) or str  # type: ignore[valid-type]
        side_b_ae_geant_a_sid: str | None
        side_b_ae_members: ae_members_side_b

        @field_validator("side_b_ae_members")
        def validate_side_b_ae_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
            validate_iptrunk_unique_interface(side_b_ae_members)
            validate_interface_name_list(side_b_ae_members, get_router_vendor(router_b))
            return side_b_ae_members

    user_input_side_b = yield CreateIptrunkSideBForm

    # Merge the pages in the order they were filled in; on duplicate keys the later page wins.
    completed_pages = (
        initial_user_input,
        verify_minimum_links,
        user_input_router_side_a,
        user_input_side_a,
        user_input_router_side_b,
        user_input_side_b,
    )
    merged_input: dict = {}
    for page in completed_pages:
        merged_input |= page.model_dump()
    return merged_input
@step("Create subscription")
def create_subscription(product: UUIDstr, partner: str) -> State:
    """Instantiate an inactive IP trunk subscription for the given product and partner.

    The partner is looked up by name and its ``partner_id`` is passed on to the new subscription. Both the
    subscription object and its ID are returned to the workflow state.
    """
    partner_id = get_partner_by_name(partner)["partner_id"]
    new_subscription = IptrunkInactive.from_product_id(product, partner_id)

    return {"subscription": new_subscription, "subscription_id": new_subscription.subscription_id}
@step("Get information from IPAM")
def get_info_from_ipam(subscription: IptrunkInactive) -> State:
    """Allocate an IPv4 and an IPv6 network for the trunk in :term:`IPAM` and store both on the subscription.

    Both allocations are requested for the ``TRUNK`` service type and are given the trunk description.
    """
    trunk = subscription.iptrunk
    trunk.iptrunk_ipv4_network = infoblox.allocate_v4_network("TRUNK", trunk.iptrunk_description)
    trunk.iptrunk_ipv6_network = infoblox.allocate_v6_network("TRUNK", trunk.iptrunk_description)

    return {"subscription": subscription}
@step("Initialize subscription")
def initialize_subscription(
    subscription: IptrunkInactive,
    geant_s_sid: str | None,
    iptrunk_type: IptrunkType,
    iptrunk_description: str | None,
    iptrunk_speed: PhysicalPortCapacity,
    iptrunk_minimum_links: int,
    side_a_node_id: str,
    side_a_ae_iface: str,
    side_a_ae_geant_a_sid: str | None,
    side_a_ae_members: list[dict],
    side_b_node_id: str,
    side_b_ae_iface: str,
    side_b_ae_geant_a_sid: str | None,
    side_b_ae_members: list[dict],
) -> State:
    """Copy the form input onto the subscription model.

    General trunk attributes are set first, with the :term:`ISIS` metric taken from the ``isis_high_metric``
    setting in the OSS parameters. Then both trunk sides are filled in: index 0 is side A and index 1 is side B.
    Finally, the subscription description is composed from the two site names in sorted order.
    """
    oss_params = load_oss_params()
    side_a = Router.from_subscription(side_a_node_id).router
    side_b = Router.from_subscription(side_b_node_id).router

    trunk = subscription.iptrunk
    trunk.geant_s_sid = geant_s_sid
    trunk.iptrunk_description = iptrunk_description
    trunk.iptrunk_type = iptrunk_type
    trunk.iptrunk_speed = iptrunk_speed
    trunk.iptrunk_isis_metric = oss_params.GENERAL.isis_high_metric
    trunk.iptrunk_minimum_links = iptrunk_minimum_links

    # One tuple per trunk side, in the order they are stored on the subscription.
    per_side_input = (
        (side_a, side_a_ae_iface, side_a_ae_geant_a_sid, side_a_ae_members),
        (side_b, side_b_ae_iface, side_b_ae_geant_a_sid, side_b_ae_members),
    )
    for position, (node, ae_iface, ae_geant_a_sid, ae_members) in enumerate(per_side_input):
        trunk_side = trunk.iptrunk_sides[position]
        trunk_side.iptrunk_side_node = node
        trunk_side.iptrunk_side_ae_iface = ae_iface
        trunk_side.iptrunk_side_ae_geant_a_sid = ae_geant_a_sid
        for member in ae_members:
            # Every LAG member becomes its own interface block, created with a fresh UUID.
            member_block = IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member)
            trunk_side.iptrunk_side_ae_members.append(member_block)

    # Sorting makes the description independent of which site was entered as side A.
    first_site, second_site = sorted([side_a.router_site.site_name, side_b.router_site.site_name])
    subscription.description = f"IP trunk {first_site} {second_site}, geant_s_sid:{geant_s_sid}"

    return {"subscription": subscription}
@step("[DRY RUN] Provision IP trunk interface")
def provision_ip_trunk_iface_dry(
    subscription: IptrunkInactive,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Run the ``iptrunks.yaml`` playbook in dry-run mode to deploy the trunk interface on both routers."""
    trunk = subscription.iptrunk
    extra_vars = {
        "wfo_trunk_json": json.loads(json_dumps(subscription)),
        "dry_run": True,
        "verb": "deploy",
        "config_object": "trunk_interface",
        "commit_comment": (
            f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for {trunk.geant_s_sid}"
        ),
    }

    # The inventory lists one router FQDN per line, each line terminated by a newline.
    side_a_fqdn = trunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn
    side_b_fqdn = trunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn
    execute_playbook(
        playbook_name="iptrunks.yaml",
        callback_route=callback_route,
        inventory=f"{side_a_fqdn}\n{side_b_fqdn}\n",
        extra_vars=extra_vars,
    )

    return {"subscription": subscription}
@step("[FOR REAL] Provision IP trunk interface")
def provision_ip_trunk_iface_real(
    subscription: IptrunkInactive,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Run the ``iptrunks.yaml`` playbook with ``dry_run`` disabled to deploy the trunk interface on both routers."""
    trunk_json = json.loads(json_dumps(subscription))
    commit_comment = (
        f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploy config for {subscription.iptrunk.geant_s_sid}"
    )

    # Newline-terminated FQDN of the router on side A, followed by that of side B.
    trunk_sides = subscription.iptrunk.iptrunk_sides
    inventory = "".join(f"{trunk_sides[position].iptrunk_side_node.router_fqdn}\n" for position in (0, 1))

    execute_playbook(
        playbook_name="iptrunks.yaml",
        callback_route=callback_route,
        inventory=inventory,
        extra_vars={
            "wfo_trunk_json": trunk_json,
            "dry_run": False,
            "verb": "deploy",
            "config_object": "trunk_interface",
            "commit_comment": commit_comment,
        },
    )

    return {"subscription": subscription}
@step("Check IP connectivity of the trunk")
def check_ip_trunk_connectivity(
    subscription: IptrunkInactive,
    callback_route: str,
) -> State:
    """Run the ``ping`` check of the ``iptrunks_checks.yaml`` playbook, with the side A router as inventory."""
    check_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "ping"}
    side_a_fqdn = subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn

    execute_playbook(
        playbook_name="iptrunks_checks.yaml",
        callback_route=callback_route,
        inventory=side_a_fqdn,  # type: ignore[arg-type]
        extra_vars=check_vars,
    )

    return {"subscription": subscription}
@step("[DRY RUN] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_dry(
    subscription: IptrunkInactive,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Run the ``iptrunks.yaml`` playbook in dry-run mode to deploy the :term:`ISIS` interface on both routers."""
    trunk = subscription.iptrunk
    playbook_vars = {
        "wfo_trunk_json": json.loads(json_dumps(subscription)),
        "dry_run": True,
        "verb": "deploy",
        "config_object": "isis_interface",
        "commit_comment": (
            f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for {trunk.geant_s_sid}"
        ),
    }

    # Both trunk-side routers are targeted; every FQDN is followed by a newline.
    router_a_fqdn = trunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn
    router_b_fqdn = trunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn
    inventory = f"{router_a_fqdn}\n" + f"{router_b_fqdn}\n"

    execute_playbook(
        playbook_name="iptrunks.yaml",
        callback_route=callback_route,
        inventory=inventory,
        extra_vars=playbook_vars,
    )

    return {"subscription": subscription}
@step("[FOR REAL] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_real(
    subscription: IptrunkInactive,
    callback_route: str,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Run the ``iptrunks.yaml`` playbook with ``dry_run`` disabled to deploy the :term:`ISIS` interface."""
    serialised_trunk = json.loads(json_dumps(subscription))
    deploy_comment = (
        f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploy config for {subscription.iptrunk.geant_s_sid}"
    )
    extra_vars = {
        "wfo_trunk_json": serialised_trunk,
        "dry_run": False,
        "verb": "deploy",
        "config_object": "isis_interface",
        "commit_comment": deploy_comment,
    }

    # Inventory format: the FQDN of side A and of side B, each on its own newline-terminated line.
    sides = subscription.iptrunk.iptrunk_sides
    inventory_lines = [
        f"{sides[0].iptrunk_side_node.router_fqdn}\n",
        f"{sides[1].iptrunk_side_node.router_fqdn}\n",
    ]

    execute_playbook(
        playbook_name="iptrunks.yaml",
        callback_route=callback_route,
        inventory="".join(inventory_lines),
        extra_vars=extra_vars,
    )

    return {"subscription": subscription}
@step("Check ISIS adjacency")
def check_ip_trunk_isis(
    subscription: IptrunkInactive,
    callback_route: str,
) -> State:
    """Run the ``isis`` check of the ``iptrunks_checks.yaml`` playbook, with the side A router as inventory."""
    isis_check_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"}
    first_side = subscription.iptrunk.iptrunk_sides[0]

    execute_playbook(
        playbook_name="iptrunks_checks.yaml",
        callback_route=callback_route,
        inventory=first_side.iptrunk_side_node.router_fqdn,  # type: ignore[arg-type]
        extra_vars=isis_check_vars,
    )

    return {"subscription": subscription}
@step("Register DNS records for both sides of the trunk")
def register_dns_records(subscription: IptrunkInactive) -> State:
    """Create a :term:`DNS` host record in Infoblox for every side of the trunk.

    The host name is ``<ae interface>-0.<router FQDN>``. The side at position ``i`` receives address ``i`` of the
    trunk's IPv4 network and address ``i + 1`` of its IPv6 network.

    :raises ValueError: When the trunk has no IPv4 or no IPv6 network assigned.
    """
    trunk = subscription.iptrunk

    for position, trunk_side in enumerate(trunk.iptrunk_sides):
        fqdn = f"{trunk_side.iptrunk_side_ae_iface}-0.{trunk_side.iptrunk_side_node.router_fqdn}"

        v4_network = trunk.iptrunk_ipv4_network
        v6_network = trunk.iptrunk_ipv6_network
        if not v4_network or not v6_network:
            msg = f"Missing IP resources in trunk, cannot allocate DNS record for side {fqdn}!"
            raise ValueError(msg)

        # NOTE(review): IPv6 is offset by one compared to IPv4 — presumably to skip the network's first address.
        infoblox.create_host_by_ip(
            fqdn,
            v4_network[position],
            v6_network[position + 1],
            "TRUNK",
            str(subscription.subscription_id),
        )

    return {"subscription": subscription}
# NOTE(review): the step name reads "NextBox" — likely a typo for "NetBox". Left as-is, since it is a runtime string.
@step("NextBox integration")
def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State:
    """Create the :term:`LAG` in NetBox for every Nokia trunk side, and attach and reserve its member interfaces.

    Sides whose router is not a Nokia device are skipped. The subscription ID is used as the description of both
    the :term:`LAG` interface and the attached physical interfaces.
    """
    nbclient = NetboxClient()
    subscription_ref = str(subscription.subscription_id)

    for trunk_side in subscription.iptrunk.iptrunk_sides:
        node = trunk_side.iptrunk_side_node
        if get_router_vendor(node.owner_subscription_id) != Vendor.NOKIA:
            continue

        # Create the LAG interface itself.
        lag_interface: Interfaces = nbclient.create_interface(
            iface_name=trunk_side.iptrunk_side_ae_iface,  # type: ignore[arg-type]
            interface_type="lag",
            device_name=node.router_fqdn,  # type: ignore[arg-type]
            description=subscription_ref,
            enabled=True,
        )

        # Attach every physical member to the LAG, described by the subscription ID, and reserve it.
        for member in trunk_side.iptrunk_side_ae_members:
            nbclient.attach_interface_to_lag(
                device_name=node.router_fqdn,  # type: ignore[arg-type]
                lag_name=lag_interface.name,
                iface_name=member.interface_name,  # type: ignore[arg-type]
                description=subscription_ref,
            )
            nbclient.reserve_interface(
                device_name=node.router_fqdn,  # type: ignore[arg-type]
                iface_name=member.interface_name,  # type: ignore[arg-type]
            )

    return {"subscription": subscription}
def _allocate_interfaces_in_netbox(iptrunk_side: IptrunkSideBlockInactive) -> None:
    """Allocate every :term:`LAG` member interface of one trunk side in Netbox.

    All members are validated before the first allocation is made, so that a member with missing data cannot leave
    the trunk side partially allocated. A single Netbox client is reused for all allocations. Nothing happens when
    the trunk side has no members.

    :param iptrunk_side: The trunk side whose member interfaces should be allocated.
    :raises ValueError: When the router :term:`FQDN` or the name of any member interface is missing.
    """
    members = list(iptrunk_side.iptrunk_side_ae_members)
    if not members:
        return

    fqdn = iptrunk_side.iptrunk_side_node.router_fqdn
    if not fqdn:
        msg = f"FQDN and/or interface name missing in subscription {members[0].owner_subscription_id}"
        raise ValueError(msg)

    # First pass: collect the interface names, failing before anything has been changed in Netbox.
    iface_names: list[str] = []
    for interface in members:
        iface_name = interface.interface_name
        if not iface_name:
            msg = f"FQDN and/or interface name missing in subscription {interface.owner_subscription_id}"
            raise ValueError(msg)
        iface_names.append(iface_name)

    # Second pass: all input is known to be complete, allocate in the original order.
    nbclient = NetboxClient()
    for iface_name in iface_names:
        nbclient.allocate_interface(device_name=fqdn, iface_name=iface_name)
@step("Allocate interfaces in Netbox for side A")
def netbox_allocate_side_a_interfaces(subscription: IptrunkInactive) -> None:
    """Allocate the :term:`LAG` member interfaces of trunk side A in Netbox.

    The workflow only runs this step when the router on side A is a Nokia device.
    """
    side_a = subscription.iptrunk.iptrunk_sides[0]
    _allocate_interfaces_in_netbox(side_a)
@step("Allocate interfaces in Netbox for side B")
def netbox_allocate_side_b_interfaces(subscription: IptrunkInactive) -> None:
    """Allocate the :term:`LAG` member interfaces of trunk side B in Netbox.

    The workflow only runs this step when the router on side B is a Nokia device.
    """
    side_b = subscription.iptrunk.iptrunk_sides[1]
    _allocate_interfaces_in_netbox(side_b)
@step("Create a new SharePoint checklist item")
def create_new_sharepoint_checklist(subscription: IptrunkProvisioning, tt_number: str) -> State:
    """Add an item to the ``ip_trunk`` SharePoint list and return its URL as ``checklist_url``.

    The item title combines the subscription description with the trunk's ``geant_s_sid``.
    """
    sharepoint = SharePointClient()
    checklist_fields = {
        "Title": f"{subscription.description} - {subscription.iptrunk.geant_s_sid}",
        "TT_NUMBER": tt_number,
    }
    checklist_url = sharepoint.add_list_item("ip_trunk", checklist_fields)

    return {"checklist_url": checklist_url}
@workflow(
    "Create IP trunk",
    initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
    target=Target.CREATE,
)
def create_iptrunk() -> StepList:
    """Create a new IP trunk.

    * Create the subscription object in the database and fill it with the form input
    * Allocate IP networks in :term:`IPAM`
    * Reserve interfaces in Netbox
    * Deploy configuration on the two sides of the trunk, first as a dry run
    * Check connectivity on the new trunk
    * Deploy the :term:`ISIS` configuration on the trunk, first as a dry run
    * Verify :term:`ISIS` adjacency
    * Register :term:`DNS` records for both sides
    * Allocate the interfaces in Netbox, for each side that is a Nokia router
    * Set the subscription to provisioning in the database
    * Create a SharePoint checklist item and prompt for its URL
    """

    def _side_a_is_nokia(state: State) -> bool:
        return get_router_vendor(state["side_a_node_id"]) == Vendor.NOKIA

    def _side_b_is_nokia(state: State) -> bool:
        return get_router_vendor(state["side_b_node_id"]) == Vendor.NOKIA

    side_a_is_nokia = conditional(_side_a_is_nokia)
    side_b_is_nokia = conditional(_side_b_is_nokia)

    # Database bookkeeping and resource reservation.
    steps = (
        begin
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> get_info_from_ipam
        >> reserve_interfaces_in_netbox
    )
    # Trunk interface deployment, followed by a connectivity check.
    steps = (
        steps
        >> lso_interaction(provision_ip_trunk_iface_dry)
        >> lso_interaction(provision_ip_trunk_iface_real)
        >> lso_interaction(check_ip_trunk_connectivity)
    )
    # ISIS deployment, followed by an adjacency check.
    steps = (
        steps
        >> lso_interaction(provision_ip_trunk_isis_iface_dry)
        >> lso_interaction(provision_ip_trunk_isis_iface_real)
        >> lso_interaction(check_ip_trunk_isis)
    )
    # Registration in external systems and hand-over to the operator.
    return (
        steps
        >> register_dns_records
        >> side_a_is_nokia(netbox_allocate_side_a_interfaces)
        >> side_b_is_nokia(netbox_allocate_side_b_interfaces)
        >> set_status(SubscriptionLifecycle.PROVISIONING)
        >> create_new_sharepoint_checklist
        >> prompt_sharepoint_checklist_url
        >> resync
        >> done
    )