"""Workflow that creates a new IP trunk subscription between two routers."""

from uuid import uuid4

from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, UniqueConstrainedList
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator
from pynetbox.models.dcim import Interfaces

from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlockInactive, IptrunkType, PhyPortCapacity
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.router import Router
from gso.services import infoblox, provisioning_proxy, subscriptions
from gso.services.crm import customer_selector
from gso.services.netbox_client import NetboxClient
from gso.services.provisioning_proxy import pp_interaction
from gso.utils.helpers import (
    LAGMember,
    available_interfaces_choices,
    available_lags_choices,
    get_router_vendor,
    validate_iptrunk_unique_interface,
    validate_router_in_netbox,
)


def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Collect the user input needed to create an IP trunk, as a sequence of form pages.

    The pages are yielded in this order: general trunk attributes, router selection for side A, LAG details for
    side A, router selection for side B (the router picked for side A is removed from the choices), and LAG details
    for side B.

    :param product_name: Used as the title of the first form page.
    :return: The ``dict()`` of all five submitted pages, merged into a single dictionary.
    """
    # TODO: implement more strict validation:
    # * interface names must be validated

    # Map of subscription ID (as string) to description, for every active router subscription.
    routers = {
        str(router["subscription_id"]): router["description"]
        for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"])
    }

    class CreateIptrunkForm(FormPage):
        class Config:
            title = product_name

        tt_number: str
        customer: customer_selector()  # type: ignore[valid-type]
        geant_s_sid: str
        iptrunk_description: str
        iptrunk_type: IptrunkType
        iptrunk_speed: PhyPortCapacity
        iptrunk_minimum_links: int

    initial_user_input = yield CreateIptrunkForm

    # Member names are the subscription IDs; each member value is the (subscription ID, description) pair.
    router_enum_a = Choice("Select a router", zip(routers.keys(), routers.items()))  # type: ignore[arg-type]

    class SelectRouterSideA(FormPage):
        class Config:
            title = "Select a router for side A of the trunk."

        side_a_node_id: router_enum_a  # type: ignore[valid-type]

        @validator("side_a_node_id", allow_reuse=True)
        def validate_device_exists_in_netbox(cls, side_a_node_id: UUIDstr) -> str | None:
            return validate_router_in_netbox(side_a_node_id)

    user_input_router_side_a = yield SelectRouterSideA
    # The enum member name is the subscription ID of the selected router.
    router_a = user_input_router_side_a.side_a_node_id.name

    # Free-form LAG member list, used for every side whose router is not a Nokia device.
    # NOTE(review): when side B falls back to this class, its member count is not forced to equal side A's, unlike
    # the Nokia branch for side B below — confirm whether that is intended.
    class JuniperAeMembers(UniqueConstrainedList[LAGMember]):
        min_items = initial_user_input.iptrunk_minimum_links

    if get_router_vendor(router_a) == RouterVendor.NOKIA:
        # For Nokia routers the interface name is restricted to the choices returned by the helper for this router
        # and the requested trunk speed.
        class NokiaLAGMemberA(LAGMember):
            interface_name: available_interfaces_choices(  # type: ignore[valid-type]
                router_a, initial_user_input.iptrunk_speed
            )

        class NokiaAeMembersA(UniqueConstrainedList[NokiaLAGMemberA]):
            min_items = initial_user_input.iptrunk_minimum_links

        ae_members_side_a = NokiaAeMembersA
    else:
        ae_members_side_a = JuniperAeMembers  # type: ignore[assignment]

    class CreateIptrunkSideAForm(FormPage):
        class Config:
            title = "Provide subscription details for side A of the trunk."

        # Falls back to a plain string field when the helper returns a falsy value for this router.
        side_a_ae_iface: available_lags_choices(router_a) or str  # type: ignore[valid-type]
        side_a_ae_geant_a_sid: str
        side_a_ae_members: ae_members_side_a  # type: ignore[valid-type]

        @validator("side_a_ae_members", allow_reuse=True)
        def validate_iptrunk_unique_interface_side_a(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
            return validate_iptrunk_unique_interface(side_a_ae_members)

    user_input_side_a = yield CreateIptrunkSideAForm

    # Remove the selected router for side A, to prevent any loops
    routers.pop(str(router_a))
    router_enum_b = Choice("Select a router", zip(routers.keys(), routers.items()))  # type: ignore[arg-type]

    class SelectRouterSideB(FormPage):
        class Config:
            title = "Select a router for side B of the trunk."

        side_b_node_id: router_enum_b  # type: ignore[valid-type]

        @validator("side_b_node_id", allow_reuse=True)
        def validate_device_exists_in_netbox(cls, side_b_node_id: UUIDstr) -> str | None:
            return validate_router_in_netbox(side_b_node_id)

    user_input_router_side_b = yield SelectRouterSideB
    router_b = user_input_router_side_b.side_b_node_id.name

    if get_router_vendor(router_b) == RouterVendor.NOKIA:

        class NokiaLAGMemberB(LAGMember):
            interface_name: available_interfaces_choices(  # type: ignore[valid-type]
                router_b, initial_user_input.iptrunk_speed
            )

        # Side B must list exactly as many members as were entered for side A.
        class NokiaAeMembersB(UniqueConstrainedList):
            min_items = len(user_input_side_a.side_a_ae_members)
            max_items = len(user_input_side_a.side_a_ae_members)
            item_type = NokiaLAGMemberB

        ae_members_side_b = NokiaAeMembersB
    else:
        ae_members_side_b = JuniperAeMembers  # type: ignore[assignment]

    class CreateIptrunkSideBForm(FormPage):
        class Config:
            title = "Provide subscription details for side B of the trunk."

        side_b_ae_iface: available_lags_choices(router_b) or str  # type: ignore[valid-type]
        side_b_ae_geant_a_sid: str
        side_b_ae_members: ae_members_side_b  # type: ignore[valid-type]

        @validator("side_b_ae_members", allow_reuse=True)
        def validate_iptrunk_unique_interface_side_b(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
            return validate_iptrunk_unique_interface(side_b_ae_members)

    user_input_side_b = yield CreateIptrunkSideBForm

    return (
        initial_user_input.dict()
        | user_input_router_side_a.dict()
        | user_input_side_a.dict()
        | user_input_router_side_b.dict()
        | user_input_side_b.dict()
    )


@step("Create subscription")
def create_subscription(product: UUIDstr, customer: UUIDstr) -> State:
    """Create an inactive IP trunk subscription for the given product and customer.

    :return: State containing the new subscription and its subscription ID.
    """
    subscription = IptrunkInactive.from_product_id(product, customer)

    return {
        "subscription": subscription,
        "subscription_id": subscription.subscription_id,
    }


@step("Get information from IPAM")
def get_info_from_ipam(subscription: IptrunkProvisioning) -> State:
    """Allocate an IPv4 and an IPv6 network for the trunk through Infoblox and store both on the subscription."""
    subscription.iptrunk.iptrunk_ipv4_network = infoblox.allocate_v4_network(
        "TRUNK", subscription.iptrunk.iptrunk_description
    )
    subscription.iptrunk.iptrunk_ipv6_network = infoblox.allocate_v6_network(
        "TRUNK", subscription.iptrunk.iptrunk_description
    )

    return {"subscription": subscription}


@step("Initialize subscription")
def initialize_subscription(
    subscription: IptrunkInactive,
    geant_s_sid: str,
    iptrunk_type: IptrunkType,
    iptrunk_description: str,
    iptrunk_speed: PhyPortCapacity,
    iptrunk_minimum_links: int,
    side_a_node_id: str,
    side_a_ae_iface: str,
    side_a_ae_geant_a_sid: str,
    side_a_ae_members: list[dict],
    side_b_node_id: str,
    side_b_ae_iface: str,
    side_b_ae_geant_a_sid: str,
    side_b_ae_members: list[dict],
) -> State:
    """Copy the form input onto the subscription and move it to the provisioning lifecycle state.

    Side A is stored in ``iptrunk_sides[0]`` and side B in ``iptrunk_sides[1]``. Each entry of the member lists is
    expanded as keyword arguments into a new ``IptrunkInterfaceBlockInactive``.

    :return: State containing the subscription, converted to ``IptrunkProvisioning``.
    """
    subscription.iptrunk.geant_s_sid = geant_s_sid
    subscription.iptrunk.iptrunk_description = iptrunk_description
    subscription.iptrunk.iptrunk_type = iptrunk_type
    subscription.iptrunk.iptrunk_speed = iptrunk_speed
    # Every trunk created by this workflow starts with this fixed ISIS metric.
    subscription.iptrunk.iptrunk_isis_metric = 90000
    subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links

    subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node = Router.from_subscription(side_a_node_id).router
    subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface = side_a_ae_iface
    subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_geant_a_sid = side_a_ae_geant_a_sid
    for member in side_a_ae_members:
        subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members.append(
            IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member)
        )

    subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node = Router.from_subscription(side_b_node_id).router
    subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface = side_b_ae_iface
    subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_geant_a_sid = side_b_ae_geant_a_sid
    for member in side_b_ae_members:
        subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members.append(
            IptrunkInterfaceBlockInactive.new(subscription_id=uuid4(), **member)
        )

    subscription.description = f"IP trunk, geant_s_sid:{geant_s_sid}"
    subscription = IptrunkProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)

    return {"subscription": subscription}


@step("Provision IP trunk interface [DRY RUN]")
def provision_ip_trunk_iface_dry(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_number: str) -> State:
    """Ask the provisioning proxy to provision the ``trunk_interface`` configuration of the trunk."""
    provisioning_proxy.provision_ip_trunk(subscription, process_id, tt_number, "trunk_interface")

    return {
        "subscription": subscription,
        "label_text": "[DRY RUN] Provisioning a trunk interface, please refresh to get the results of the playbook.",
    }


@step("Provision IP trunk interface [FOR REAL]")
def provision_ip_trunk_iface_real(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_number: str) -> State:
    """Ask the provisioning proxy to provision the ``trunk_interface`` configuration, passing ``False`` as last argument."""
    # NOTE(review): the trailing ``False`` presumably switches off dry-run mode — confirm against provisioning_proxy.
    provisioning_proxy.provision_ip_trunk(subscription, process_id, tt_number, "trunk_interface", False)

    return {
        "subscription": subscription,
        "label_text": "Provisioning a trunk interface, please refresh to get the results of the playbook.",
    }


@step("Check IP connectivity of the trunk")
def check_ip_trunk_connectivity(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_number: str) -> State:
    """Ask the provisioning proxy to run the ``ping`` check on the trunk."""
    provisioning_proxy.check_ip_trunk(subscription, process_id, tt_number, "ping")

    return {
        "subscription": subscription,
        "label_text": "[CHECK] Checking IP traffic flow on the trunk, "
        "please refresh to get the results of the playbook.",
    }


@step("Provision IP trunk ISIS interface [DRY RUN]")
def provision_ip_trunk_isis_iface_dry(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_number: str) -> State:
    """Ask the provisioning proxy to provision the ``isis_interface`` configuration of the trunk."""
    provisioning_proxy.provision_ip_trunk(subscription, process_id, tt_number, "isis_interface")

    return {
        "subscription": subscription,
        "label_text": "[DRY RUN] Provisioning ISIS interfaces, please refresh to get the results of the playbook.",
    }


@step("Provision IP trunk ISIS interface [FOR REAL]")
def provision_ip_trunk_isis_iface_real(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_number: str) -> State:
    """Ask the provisioning proxy to provision the ``isis_interface`` configuration, passing ``False`` as last argument."""
    # NOTE(review): the trailing ``False`` presumably switches off dry-run mode — confirm against provisioning_proxy.
    provisioning_proxy.provision_ip_trunk(subscription, process_id, tt_number, "isis_interface", False)

    return {
        "subscription": subscription,
        "label_text": "[COMMIT] Provisioning ISIS interfaces, please refresh to get the results of the playbook.",
    }


@step("Check ISIS adjacency")
def check_ip_trunk_isis(subscription: IptrunkProvisioning, process_id: UUIDstr, tt_number: str) -> State:
    """Ask the provisioning proxy to run the ``isis`` check on the trunk."""
    provisioning_proxy.check_ip_trunk(subscription, process_id, tt_number, "isis")

    return {
        "subscription": subscription,
        "label_text": "[CHECK] Checking ISIS adjacency, please refresh to get the results of the playbook.",
    }


@step("NetBox integration")
def reserve_interfaces_in_netbox(subscription: IptrunkProvisioning) -> State:
    """Create the LAG interfaces in NetBox and attach the lag interfaces to the physical interfaces.

    Only trunk sides whose router vendor is Nokia are processed; other sides are skipped.
    """
    nbclient = NetboxClient()
    for trunk_side in subscription.iptrunk.iptrunk_sides:
        if trunk_side.iptrunk_side_node.router_vendor == RouterVendor.NOKIA:
            # Create LAG interfaces
            lag_interface: Interfaces = nbclient.create_interface(
                iface_name=trunk_side.iptrunk_side_ae_iface,
                type="lag",
                device_name=trunk_side.iptrunk_side_node.router_fqdn,
                description=str(subscription.subscription_id),
                enabled=True,
            )
            # Attach physical interfaces to LAG
            # Update interface description to subscription ID
            # Reserve interfaces
            for interface in trunk_side.iptrunk_side_ae_members:
                nbclient.attach_interface_to_lag(
                    device_name=trunk_side.iptrunk_side_node.router_fqdn,
                    lag_name=lag_interface.name,
                    iface_name=interface.interface_name,
                    description=str(subscription.subscription_id),
                )
                nbclient.reserve_interface(
                    device_name=trunk_side.iptrunk_side_node.router_fqdn,
                    iface_name=interface.interface_name,
                )

    return {
        "subscription": subscription,
    }


@step("Allocate interfaces in Netbox")
def allocate_interfaces_in_netbox(subscription: IptrunkProvisioning) -> State:
    """Allocate the member interfaces of the trunk in NetBox.

    Only trunk sides whose router vendor is Nokia are processed; other sides are skipped.
    """
    # A single client is shared by all calls instead of constructing a new one per interface.
    nbclient = NetboxClient()
    for trunk_side in subscription.iptrunk.iptrunk_sides:
        if trunk_side.iptrunk_side_node.router_vendor == RouterVendor.NOKIA:
            for interface in trunk_side.iptrunk_side_ae_members:
                nbclient.allocate_interface(
                    device_name=trunk_side.iptrunk_side_node.router_fqdn,
                    iface_name=interface.interface_name,
                )

    return {
        "subscription": subscription,
    }


@workflow(
    "Create IP trunk",
    initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
    target=Target.CREATE,
)
def create_iptrunk() -> StepList:
    """Define the ordered list of steps that make up the "Create IP trunk" workflow."""
    return (
        init
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> get_info_from_ipam
        >> reserve_interfaces_in_netbox
        >> pp_interaction(provision_ip_trunk_iface_dry, "Provision IPtrunk interface [DRY RUN]")
        >> pp_interaction(provision_ip_trunk_iface_real, "Provision IPtrunk interface [FOR REAL]")
        >> pp_interaction(check_ip_trunk_connectivity, "Check IPtrunk connectivity")
        >> pp_interaction(provision_ip_trunk_isis_iface_dry, "Provision ISIS interface [DRY RUN]")
        >> pp_interaction(provision_ip_trunk_isis_iface_real, "Provision ISIS interface [FOR REAL]")
        >> pp_interaction(check_ip_trunk_isis, "Validate IPtrunk")
        >> allocate_interfaces_in_netbox
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )