"""A creation workflow that deploys a new IP trunk service.""" import json from typing import Annotated from uuid import uuid4 from annotated_types import Len from orchestrator.forms import FormPage from orchestrator.forms.validators import Choice, Label from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr from orchestrator.utils.json import json_dumps from orchestrator.workflow import StepList, conditional, done, init, step, workflow from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.utils import wrap_create_initial_input_form from pydantic import AfterValidator, ConfigDict, field_validator from pydantic_forms.validators import ReadOnlyField, validate_unique_list from pynetbox.models.dcim import Interfaces from gso.products.product_blocks.iptrunk import ( IptrunkInterfaceBlockInactive, IptrunkSideBlockInactive, IptrunkType, PhysicalPortCapacity, ) from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning from gso.products.product_types.router import Router from gso.services import infoblox, subscriptions from gso.services.lso_client import execute_playbook, lso_interaction from gso.services.netbox_client import NetboxClient from gso.services.partners import get_partner_by_name from gso.services.sharepoint import SharePointClient from gso.settings import load_oss_params from gso.utils.helpers import ( LAGMember, available_interfaces_choices, available_lags_choices, get_router_vendor, validate_interface_name_list, validate_iptrunk_unique_interface, validate_router_in_netbox, validate_tt_number, ) from gso.utils.shared_enums import Vendor from gso.utils.workflow_steps import prompt_sharepoint_checklist_url def initial_input_form_generator(product_name: str) -> FormGenerator: """Gather input from the user in three steps. General information, and information on both sides of the trunk.""" routers = {} for router in subscriptions.get_active_router_subscriptions( includes=["subscription_id", "description"] ) + subscriptions.get_provisioning_router_subscriptions(includes=["subscription_id", "description"]): # Add both provisioning and active routers, since trunks are required for promoting a router to active. routers[str(router["subscription_id"])] = router["description"] class CreateIptrunkForm(FormPage): model_config = ConfigDict(title=product_name) tt_number: str partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type] geant_s_sid: str | None = None iptrunk_description: str | None = None iptrunk_type: IptrunkType iptrunk_speed: PhysicalPortCapacity iptrunk_number_of_members: int @field_validator("tt_number") def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) initial_user_input = yield CreateIptrunkForm class VerifyMinimumLinksForm(FormPage): info_label: Label = ( f"This is the calculated minimum-links for this LAG: " f"{initial_user_input.iptrunk_number_of_members - 1}" ) iptrunk_minimum_links: int = initial_user_input.iptrunk_number_of_members - 1 info_label2: Label = "Please confirm or modify." verify_minimum_links = yield VerifyMinimumLinksForm router_enum_a = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type] class SelectRouterSideA(FormPage): model_config = ConfigDict(title="Select a router for side A of the trunk.") side_a_node_id: router_enum_a # type: ignore[valid-type] @field_validator("side_a_node_id") def validate_device_exists_in_netbox(cls, side_a_node_id: UUIDstr) -> str | None: return validate_router_in_netbox(side_a_node_id) user_input_router_side_a = yield SelectRouterSideA router_a = user_input_router_side_a.side_a_node_id.name router_a_fqdn = Router.from_subscription(router_a).router.router_fqdn juniper_ae_members = Annotated[ list[LAGMember], AfterValidator(validate_unique_list), Len( min_length=initial_user_input.iptrunk_number_of_members, max_length=initial_user_input.iptrunk_number_of_members, ), ] if get_router_vendor(router_a) == Vendor.NOKIA: class NokiaLAGMemberA(LAGMember): interface_name: available_interfaces_choices( # type: ignore[valid-type] router_a, initial_user_input.iptrunk_speed, ) ae_members_side_a_type = Annotated[ list[NokiaLAGMemberA], AfterValidator(validate_unique_list), Len( min_length=initial_user_input.iptrunk_number_of_members, max_length=initial_user_input.iptrunk_number_of_members, ), ] else: ae_members_side_a_type = juniper_ae_members # type: ignore[assignment, misc] class CreateIptrunkSideAForm(FormPage): model_config = ConfigDict(title=f"Provide subscription details for side A of the trunk.({router_a_fqdn})") side_a_ae_iface: available_lags_choices(router_a) or str # type: ignore[valid-type] side_a_ae_geant_a_sid: str | None side_a_ae_members: ae_members_side_a_type @field_validator("side_a_ae_members") def validate_side_a_ae_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]: validate_iptrunk_unique_interface(side_a_ae_members) vendor = get_router_vendor(router_a) validate_interface_name_list(side_a_ae_members, vendor) return side_a_ae_members user_input_side_a = yield CreateIptrunkSideAForm # Remove the selected router for side A, to prevent any loops routers.pop(str(router_a)) router_enum_b = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type] class SelectRouterSideB(FormPage): model_config = ConfigDict(title="Select a router for side B of the trunk.") side_b_node_id: router_enum_b # type: ignore[valid-type] @field_validator("side_b_node_id") def validate_device_exists_in_netbox(cls, side_b_node_id: UUIDstr) -> str | None: return validate_router_in_netbox(side_b_node_id) user_input_router_side_b = yield SelectRouterSideB router_b = user_input_router_side_b.side_b_node_id.name router_b_fqdn = Router.from_subscription(router_b).router.router_fqdn if get_router_vendor(router_b) == Vendor.NOKIA: class NokiaLAGMemberB(LAGMember): interface_name: available_interfaces_choices( # type: ignore[valid-type] router_b, initial_user_input.iptrunk_speed, ) ae_members_side_b = Annotated[ list[NokiaLAGMemberB], AfterValidator(validate_unique_list), Len( min_length=len(user_input_side_a.side_a_ae_members), max_length=len(user_input_side_a.side_a_ae_members) ), ] else: ae_members_side_b = juniper_ae_members # type: ignore[assignment, misc] class CreateIptrunkSideBForm(FormPage): model_config = ConfigDict(title=f"Provide subscription details for side B of the trunk.({router_b_fqdn})") side_b_ae_iface: available_lags_choices(router_b) or str # type: ignore[valid-type] side_b_ae_geant_a_sid: str | None side_b_ae_members: ae_members_side_b @field_validator("side_b_ae_members") def validate_side_b_ae_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]: validate_iptrunk_unique_interface(side_b_ae_members) vendor = get_router_vendor(router_b) validate_interface_name_list(side_b_ae_members, vendor) return side_b_ae_members user_input_side_b = yield CreateIptrunkSideBForm return ( initial_user_input.dict() | verify_minimum_links.dict() | user_input_router_side_a.dict() | user_input_side_a.dict() | user_input_router_side_b.dict() | user_input_side_b.dict() ) @step("Create subscription") def create_subscription(product: UUIDstr, partner: str) -> State: """Create a new subscription object in the database.""" subscription = IptrunkInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"]) return { "subscription": subscription, "subscription_id": subscription.subscription_id, } @step("Get information from IPAM") def get_info_from_ipam(subscription: IptrunkInactive) -> State: """Allocate IP resources in :term:`IPAM`.""" subscription.iptrunk.iptrunk_ipv4_network = infoblox.allocate_v4_network( "TRUNK", subscription.iptrunk.iptrunk_description, ) subscription.iptrunk.iptrunk_ipv6_network = infoblox.allocate_v6_network( "TRUNK", subscription.iptrunk.iptrunk_description, ) return {"subscription": subscription} @step("Initialize subscription") def initialize_subscription( subscription: IptrunkInactive, geant_s_sid: str | None, iptrunk_type: IptrunkType, iptrunk_description: str | None, iptrunk_speed: PhysicalPortCapacity, iptrunk_minimum_links: int, side_a_node_id: str, side_a_ae_iface: str, side_a_ae_geant_a_sid: str | None, side_a_ae_members: list[dict], side_b_node_id: str, side_b_ae_iface: str, side_b_ae_geant_a_sid: str | None, side_b_ae_members: list[dict], ) -> State: """Take all input from the user, and store it in the database.""" oss_params = load_oss_params() side_a = Router.from_subscription(side_a_node_id).router side_b = Router.from_subscription(side_b_node_id).router subscription.iptrunk.geant_s_sid = geant_s_sid subscription.iptrunk.iptrunk_description = iptrunk_description subscription.iptrunk.iptrunk_type = iptrunk_type subscription.iptrunk.iptrunk_speed = iptrunk_speed subscription.iptrunk.iptrunk_isis_metric = oss_params.GENERAL.isis_high_metric subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node = side_a subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface = side_a_ae_iface subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_geant_a_sid = side_a_ae_geant_a_sid for member in side_a_ae_members: subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members.append( IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member), ) subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node = side_b subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface = side_b_ae_iface subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_geant_a_sid = side_b_ae_geant_a_sid for member in side_b_ae_members: subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members.append( IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member), ) side_names = sorted([side_a.router_site.site_name, side_b.router_site.site_name]) subscription.description = f"IP trunk {side_names[0]} {side_names[1]}, geant_s_sid:{geant_s_sid}" return {"subscription": subscription} @step("[DRY RUN] Provision IP trunk interface") def provision_ip_trunk_iface_dry( subscription: IptrunkInactive, callback_route: str, process_id: UUIDstr, tt_number: str, ) -> State: """Perform a dry run of deploying configuration on both sides of the trunk.""" extra_vars = { "wfo_trunk_json": json.loads(json_dumps(subscription)), "dry_run": True, "verb": "deploy", "config_object": "trunk_interface", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for " f"{subscription.iptrunk.geant_s_sid}", } execute_playbook( playbook_name="iptrunks.yaml", callback_route=callback_route, inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n" f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n", extra_vars=extra_vars, ) return {"subscription": subscription} @step("[FOR REAL] Provision IP trunk interface") def provision_ip_trunk_iface_real( subscription: IptrunkInactive, callback_route: str, process_id: UUIDstr, tt_number: str, ) -> State: """Deploy IP trunk configuration on both sides.""" extra_vars = { "wfo_trunk_json": json.loads(json_dumps(subscription)), "dry_run": False, "verb": "deploy", "config_object": "trunk_interface", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for " f"{subscription.iptrunk.geant_s_sid}", } execute_playbook( playbook_name="iptrunks.yaml", callback_route=callback_route, inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n" f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n", extra_vars=extra_vars, ) return {"subscription": subscription} @step("Check IP connectivity of the trunk") def check_ip_trunk_connectivity( subscription: IptrunkInactive, callback_route: str, ) -> State: """Check successful connectivity across the new trunk.""" extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "ping"} execute_playbook( playbook_name="iptrunks_checks.yaml", callback_route=callback_route, inventory=subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn, # type: ignore[arg-type] extra_vars=extra_vars, ) return {"subscription": subscription} @step("[DRY RUN] Provision IP trunk ISIS interface") def provision_ip_trunk_isis_iface_dry( subscription: IptrunkInactive, callback_route: str, process_id: UUIDstr, tt_number: str, ) -> State: """Perform a dry run of deploying :term:`ISIS` configuration.""" extra_vars = { "wfo_trunk_json": json.loads(json_dumps(subscription)), "dry_run": True, "verb": "deploy", "config_object": "isis_interface", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for " f"{subscription.iptrunk.geant_s_sid}", } execute_playbook( playbook_name="iptrunks.yaml", callback_route=callback_route, inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n" f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n", extra_vars=extra_vars, ) return {"subscription": subscription} @step("[FOR REAL] Provision IP trunk ISIS interface") def provision_ip_trunk_isis_iface_real( subscription: IptrunkInactive, callback_route: str, process_id: UUIDstr, tt_number: str, ) -> State: """Deploy :term:`ISIS` configuration on both sides.""" extra_vars = { "wfo_trunk_json": json.loads(json_dumps(subscription)), "dry_run": False, "verb": "deploy", "config_object": "isis_interface", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for " f"{subscription.iptrunk.geant_s_sid}", } execute_playbook( playbook_name="iptrunks.yaml", callback_route=callback_route, inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n" f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n", extra_vars=extra_vars, ) return {"subscription": subscription} @step("Check ISIS adjacency") def check_ip_trunk_isis( subscription: IptrunkInactive, callback_route: str, ) -> State: """Run an Ansible playbook to confirm :term:`ISIS` adjacency.""" extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"} execute_playbook( playbook_name="iptrunks_checks.yaml", callback_route=callback_route, inventory=subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn, # type: ignore[arg-type] extra_vars=extra_vars, ) return {"subscription": subscription} @step("Register DNS records for both sides of the trunk") def register_dns_records(subscription: IptrunkInactive) -> State: """Register :term:`DNS` records for both sides of the newly created IPtrunk.""" for index, side in enumerate(subscription.iptrunk.iptrunk_sides): fqdn = f"{side.iptrunk_side_ae_iface}-0.{side.iptrunk_side_node.router_fqdn}" if not (subscription.iptrunk.iptrunk_ipv4_network and subscription.iptrunk.iptrunk_ipv6_network): msg = f"Missing IP resources in trunk, cannot allocate DNS record for side {fqdn}!" raise ValueError(msg) ipv4_addr = subscription.iptrunk.iptrunk_ipv4_network[index] ipv6_addr = subscription.iptrunk.iptrunk_ipv6_network[index + 1] infoblox.create_host_by_ip(fqdn, ipv4_addr, ipv6_addr, "TRUNK", str(subscription.subscription_id)) return {"subscription": subscription} @step("NextBox integration") def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State: """Create the :term:`LAG` interfaces in NetBox and attach the lag interfaces to the physical interfaces.""" nbclient = NetboxClient() for trunk_side in subscription.iptrunk.iptrunk_sides: if get_router_vendor(trunk_side.iptrunk_side_node.owner_subscription_id) == Vendor.NOKIA: # Create :term:`LAG` interfaces lag_interface: Interfaces = nbclient.create_interface( iface_name=trunk_side.iptrunk_side_ae_iface, # type: ignore[arg-type] interface_type="lag", device_name=trunk_side.iptrunk_side_node.router_fqdn, # type: ignore[arg-type] description=str(subscription.subscription_id), enabled=True, ) # Attach physical interfaces to :term:`LAG` # Update interface description to subscription ID # Reserve interfaces for interface in trunk_side.iptrunk_side_ae_members: nbclient.attach_interface_to_lag( device_name=trunk_side.iptrunk_side_node.router_fqdn, # type: ignore[arg-type] lag_name=lag_interface.name, iface_name=interface.interface_name, # type: ignore[arg-type] description=str(subscription.subscription_id), ) nbclient.reserve_interface( device_name=trunk_side.iptrunk_side_node.router_fqdn, # type: ignore[arg-type] iface_name=interface.interface_name, # type: ignore[arg-type] ) return { "subscription": subscription, } def _allocate_interfaces_in_netbox(iptrunk_side: IptrunkSideBlockInactive) -> None: for interface in iptrunk_side.iptrunk_side_ae_members: fqdn = iptrunk_side.iptrunk_side_node.router_fqdn iface_name = interface.interface_name if not fqdn or not iface_name: msg = f"FQDN and/or interface name missing in subscription {interface.owner_subscription_id}" raise ValueError(msg) NetboxClient().allocate_interface(device_name=fqdn, iface_name=iface_name) @step("Allocate interfaces in Netbox for side A") def netbox_allocate_side_a_interfaces(subscription: IptrunkInactive) -> None: """Allocate the :term:`LAG` interfaces for the Nokia router on side A.""" _allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[0]) @step("Allocate interfaces in Netbox for side B") def netbox_allocate_side_b_interfaces(subscription: IptrunkInactive) -> None: """Allocate the :term:`LAG` interfaces for the Nokia router on side B.""" _allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[1]) @step("Create a new SharePoint checklist item") def create_new_sharepoint_checklist(subscription: IptrunkProvisioning, tt_number: str) -> State: """Create a new checklist item in SharePoint for approving this IPtrunk.""" new_list_item_url = SharePointClient().add_list_item( "ip_trunk", {"Title": subscription.iptrunk.iptrunk_description or subscription.description, "TT_NUMBER": tt_number}, ) return {"checklist_url": new_list_item_url} @workflow( "Create IP trunk", initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), target=Target.CREATE, ) def create_iptrunk() -> StepList: """Create a new IP trunk. * Create the subscription object in the database * Gather relevant information from Infoblox * Reserve interfaces in Netbox * Deploy configuration on the two sides of the trunk, first as a dry run * Check connectivity on the new trunk * Deploy the new :term:`ISIS` metric on the trunk, first as a dry run * Verify :term:`ISIS` adjacency * Allocate the interfaces in Netbox * Set the subscription to active in the database """ side_a_is_nokia = conditional(lambda state: get_router_vendor(state["side_a_node_id"]) == Vendor.NOKIA) side_b_is_nokia = conditional(lambda state: get_router_vendor(state["side_b_node_id"]) == Vendor.NOKIA) return ( init >> create_subscription >> store_process_subscription(Target.CREATE) >> initialize_subscription >> get_info_from_ipam >> reserve_interfaces_in_netbox >> lso_interaction(provision_ip_trunk_iface_dry) >> lso_interaction(provision_ip_trunk_iface_real) >> lso_interaction(check_ip_trunk_connectivity) >> lso_interaction(provision_ip_trunk_isis_iface_dry) >> lso_interaction(provision_ip_trunk_isis_iface_real) >> lso_interaction(check_ip_trunk_isis) >> register_dns_records >> side_a_is_nokia(netbox_allocate_side_a_interfaces) >> side_b_is_nokia(netbox_allocate_side_b_interfaces) >> set_status(SubscriptionLifecycle.PROVISIONING) >> create_new_sharepoint_checklist >> prompt_sharepoint_checklist_url >> resync >> done )