import copy
import re
from logging import getLogger
from typing import NoReturn
from uuid import uuid4

from orchestrator import step, workflow
from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, Label, UniqueConstrainedList
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.workflow import StepList, done, init, inputstep
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import validator
from pydantic_forms.core import ReadOnlyField
from pynetbox.models.dcim import Interfaces

from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import Iptrunk
from gso.products.product_types.router import Router
from gso.services import provisioning_proxy
from gso.services.netbox_client import NetboxClient
from gso.services.provisioning_proxy import pp_interaction
from gso.services.subscriptions import get_active_router_subscriptions
from gso.utils.helpers import (
    LAGMember,
    available_interfaces_choices,
    available_lags_choices,
    get_router_vendor,
    set_isis_to_90000,
)

logger = getLogger(__name__)


def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Collect the user input needed to migrate one side of an IP trunk.

    Three form pages are yielded in sequence:

    1. Which side of the trunk is replaced, the ticket number, and whether the
       trunk moves to a different site.
    2. The router that takes over the replaced side.
    3. The LAG interface and LAG member interfaces on that new router.

    :param subscription_id: ID of the IP trunk subscription being modified.
    :return: The merged contents of all three forms plus ``replace_index``
        (0 or 1), the index of the replaced side in ``iptrunk_sides``.
    """
    subscription = Iptrunk.from_subscription(subscription_id)
    form_title = (
        f"Subscription {subscription.iptrunk.geant_s_sid} "
        f" from {subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}"
        f" to {subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}"
    )
    # Router subscription ID -> router subscription description, one entry per trunk side.
    sides_dict = {
        str(side.iptrunk_side_node.subscription.subscription_id): side.iptrunk_side_node.subscription.description
        for side in subscription.iptrunk.iptrunk_sides
    }

    replaced_side_enum = Choice(
        "Select the side of the IP trunk to be replaced",
        zip(sides_dict.keys(), sides_dict.items()),  # type: ignore[arg-type]
    )

    # NOTE: no docstrings on the form classes below; pydantic would expose them
    # as the schema description and thereby change the rendered form.
    class IPTrunkMigrateForm(FormPage):
        class Config:
            title = form_title

        tt_number: str
        replace_side: replaced_side_enum  # type: ignore[valid-type]
        warning_label: Label = "Are we moving to a different Site?"  # type: ignore[assignment]
        migrate_to_different_site: bool = False

    migrate_form_input = yield IPTrunkMigrateForm

    current_routers = [
        side.iptrunk_side_node.subscription.subscription_id for side in subscription.iptrunk.iptrunk_sides
    ]
    filter_on_site = migrate_form_input.migrate_to_different_site

    # The site of the side being replaced does not depend on the candidate
    # router, so it is resolved once instead of once per loop iteration. It is
    # only needed when candidates are filtered by site.
    old_side_site_id = None
    if filter_on_site:
        old_side_site = Router.from_subscription(migrate_form_input.replace_side).router.router_site
        old_side_site_id = old_side_site.owner_subscription_id

    routers = {}
    for router in get_active_router_subscriptions(includes=["subscription_id", "description"]):
        router_id = router["subscription_id"]
        if router_id in current_routers:
            # Routers already terminating this trunk cannot be the migration target.
            continue
        if filter_on_site:
            # When moving to a different site, skip routers located at the
            # site of the side that is being replaced.
            candidate_site = Router.from_subscription(router_id).router.router_site.subscription
            if candidate_site.subscription_id == old_side_site_id:
                continue
        routers[str(router_id)] = router["description"]

    new_router_enum = Choice("Select a new router", zip(routers.keys(), routers.items()))  # type: ignore[arg-type]

    class NewSideIPTrunkRouterForm(FormPage):
        class Config:
            title = form_title

        new_node: new_router_enum  # type: ignore[valid-type]

    new_side_iptrunk_router_input = yield NewSideIPTrunkRouterForm
    new_router = new_side_iptrunk_router_input.new_node
    # Fall back to a free-text field when no LAG choices are returned for the router.
    side_a_ae_iface = available_lags_choices(new_router) or str

    if get_router_vendor(new_router) == RouterVendor.NOKIA:

        # For Nokia routers the member interface is picked from a list of choices.
        class NokiaLAGMember(LAGMember):
            interface_name: available_interfaces_choices(  # type: ignore[valid-type]
                new_router, subscription.iptrunk.iptrunk_speed
            )

        class NokiaAeMembers(UniqueConstrainedList[NokiaLAGMember]):
            min_items = len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members)
            max_items = len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members)

        ae_members = NokiaAeMembers
    else:

        class JuniperLagMember(UniqueConstrainedList[LAGMember]):
            min_items = len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members)
            max_items = len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members)

        ae_members = JuniperLagMember  # type: ignore[assignment]

    # Index of the side being replaced: 0 when the first side's router was selected, else 1.
    replace_index = (
        0
        if str(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id)
        == migrate_form_input.replace_side
        else 1
    )
    existing_lag_ae_members = [
        {"interface_name": iface.interface_name, "interface_description": iface.interface_description}
        for iface in subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_members
    ]

    class NewSideIPTrunkForm(FormPage):
        class Config:
            title = form_title

        new_lag_interface: side_a_ae_iface  # type: ignore[valid-type]
        existing_lag_interface: list[LAGMember] = ReadOnlyField(existing_lag_ae_members)
        new_lag_member_interfaces: ae_members  # type: ignore[valid-type]

        @validator("new_lag_interface", allow_reuse=True, pre=True, always=True)
        def lag_interface_proper_name(cls, new_lag_interface: str) -> str | NoReturn:
            """Require an ``ae<1-2 digits>`` LAG name when the new router is a Juniper."""
            if get_router_vendor(new_router) == RouterVendor.JUNIPER:
                juniper_lag_re = re.compile("^ae\\d{1,2}$")
                if not juniper_lag_re.match(new_lag_interface):
                    raise ValueError("Invalid LAG name, please try again.")
            return new_lag_interface

    new_side_input = yield NewSideIPTrunkForm
    return (
        migrate_form_input.dict()
        | new_side_iptrunk_router_input.dict()
        | new_side_input.dict()
        | {"replace_index": replace_index}
    )


@step("[DRY RUN] Disable configuration on old router")
def disable_old_config_dry(
    subscription: Iptrunk,
    new_node: Router,
    new_lag_interface: str,
    new_lag_member_interfaces: list[dict],
    replace_index: int,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Ask the provisioning proxy to run the "deactivate" migration action.

    No trailing ``False`` is passed here, unlike in the "[REAL]" variant of
    this step; presumably that leaves ``migrate_ip_trunk`` in its default
    dry-run mode -- confirm against ``provisioning_proxy.migrate_ip_trunk``.
    """
    provisioning_proxy.migrate_ip_trunk(
        subscription,
        new_node,
        new_lag_interface,
        new_lag_member_interfaces,
        replace_index,
        process_id,
        tt_number,
        "deactivate",
        "deactivate",
    )

    return {
        "subscription": subscription,
        "label_text": "[DRY RUN] Disable config on old node, please refresh to get the results of the playbook.",
    }


@step("[REAL] Disable configuration on old router")
def disable_old_config_real(
    subscription: Iptrunk,
    new_node: Router,
    new_lag_interface: str,
    new_lag_member_interfaces: list[dict],
    replace_index: int,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Ask the provisioning proxy to run the "deactivate" migration action for real.

    The trailing ``False`` presumably switches off dry-run mode -- confirm
    against ``provisioning_proxy.migrate_ip_trunk``.
    """
    provisioning_proxy.migrate_ip_trunk(
        subscription,
        new_node,
        new_lag_interface,
        new_lag_member_interfaces,
        replace_index,
        process_id,
        tt_number,
        "deactivate",
        "deactivate",
        False,
    )

    return {
        "subscription": subscription,
        "label_text": "Disable config on the old node, please refresh to get the results of the playbook.",
    }


@step("[DRY RUN] Deploy configuration on new router")
def deploy_new_config_dry(
    subscription: Iptrunk,
    new_node: Router,
    new_lag_interface: str,
    new_lag_member_interfaces: list[dict],
    replace_index: int,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Ask the provisioning proxy to run the "deploy"/"trunk_interface" action.

    No trailing ``False`` is passed, so this is presumably a dry run -- confirm
    against ``provisioning_proxy.migrate_ip_trunk``.
    """
    provisioning_proxy.migrate_ip_trunk(
        subscription,
        new_node,
        new_lag_interface,
        new_lag_member_interfaces,
        replace_index,
        process_id,
        tt_number,
        "deploy",
        "trunk_interface",
    )

    logger.warning("Playbook verb is not yet properly set.")

    return {
        "subscription": subscription,
        "label_text": "[DRY RUN] Deploying new trunk interface, please refresh to get the results of the playbook.",
    }


@step("Deploy configuration on new router")
def deploy_new_config_real(
    subscription: Iptrunk,
    new_node: Router,
    new_lag_interface: str,
    new_lag_member_interfaces: list[dict],
    replace_index: int,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Ask the provisioning proxy to run the "deploy"/"trunk_interface" action for real.

    The trailing ``False`` presumably switches off dry-run mode -- confirm
    against ``provisioning_proxy.migrate_ip_trunk``.
    """
    provisioning_proxy.migrate_ip_trunk(
        subscription,
        new_node,
        new_lag_interface,
        new_lag_member_interfaces,
        replace_index,
        process_id,
        tt_number,
        "deploy",
        "trunk_interface",
        False,
    )

    logger.warning("Playbook verb is not yet properly set.")

    return {
        "subscription": subscription,
        "label_text": "[COMMIT] Deploying new trunk interface, please refresh to get the results of the playbook.",
    }


@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_continue_move_fiber() -> FormGenerator:
    """Pause the workflow on a confirmation page until the physical link has been moved."""

    # No docstring on the form class: pydantic would expose it as the schema description.
    class ProvisioningResultPage(FormPage):
        class Config:
            title = "Please confirm before continuing"

        info_label: Label = (
            "New Trunk interface has been deployed, "
            "wait for the physical connection to be moved."  # type: ignore[assignment]
        )

    yield ProvisioningResultPage

    return {}


# Interface checks go here


@step("Deploy ISIS configuration on new router")
def deploy_new_isis(
    subscription: Iptrunk,
    new_node: Router,
    new_lag_interface: str,
    new_lag_member_interfaces: list[dict],
    replace_index: int,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Ask the provisioning proxy to run the "deploy"/"isis_interface" action.

    The trailing ``False`` presumably switches off dry-run mode -- confirm
    against ``provisioning_proxy.migrate_ip_trunk``.
    """
    provisioning_proxy.migrate_ip_trunk(
        subscription,
        new_node,
        new_lag_interface,
        new_lag_member_interfaces,
        replace_index,
        process_id,
        tt_number,
        "deploy",
        "isis_interface",
        False,
    )

    logger.warning("Playbook verb is not yet properly set.")

    return {
        "subscription": subscription,
        "label_text": "Deploy ISIS config on the new router, please refresh to get the results of the playbook.",
    }


@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_continue_restore_isis() -> FormGenerator:
    """Pause the workflow on a confirmation page before the old ISIS metric is restored."""

    # No docstring on the form class: pydantic would expose it as the schema description.
    class ProvisioningResultPage(FormPage):
        class Config:
            title = "Please confirm before continuing"

        info_label: Label = (
            "ISIS config has been deployed, confirm if you want to restore the old metric."
            # type: ignore[assignment]
        )

    yield ProvisioningResultPage

    return {}


@step("Restore ISIS metric to original value")
def restore_isis_metric(subscription: Iptrunk, process_id: UUIDstr, tt_number: str, old_isis_metric: int) -> State:
    """Write ``old_isis_metric`` back onto the trunk and provision the "isis_interface" config.

    ``old_isis_metric`` is read from the workflow state; presumably it is
    stored there by the ``set_isis_to_90000`` step -- confirm in ``gso.utils.helpers``.
    """
    subscription.iptrunk.iptrunk_isis_metric = old_isis_metric
    provisioning_proxy.provision_ip_trunk(subscription, process_id, tt_number, "isis_interface", False)

    return {"subscription": subscription}


@step("[DRY RUN] Delete configuration on old router")
def delete_old_config_dry(
    subscription: Iptrunk,
    new_node: Router,
    new_lag_interface: str,
    new_lag_member_interfaces: list[dict],
    replace_index: int,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Ask the provisioning proxy to run the "delete" migration action.

    No trailing ``False`` is passed, so this is presumably a dry run -- confirm
    against ``provisioning_proxy.migrate_ip_trunk``.
    """
    provisioning_proxy.migrate_ip_trunk(
        subscription,
        new_node,
        new_lag_interface,
        new_lag_member_interfaces,
        replace_index,
        process_id,
        tt_number,
        "delete",
        "delete",
    )

    logger.warning("Playbook verb is not yet properly set.")

    return {
        "subscription": subscription,
        # The two literals are concatenated implicitly; the space after the
        # comma keeps the rendered sentence readable.
        "label_text": "[DRY RUN] Removing configuration from old router, "
        "please refresh to get the results of the playbook.",
    }


@step("Delete configuration on old router")
def delete_old_config_real(
    subscription: Iptrunk,
    new_node: Router,
    new_lag_interface: str,
    new_lag_member_interfaces: list[dict],
    replace_index: int,
    process_id: UUIDstr,
    tt_number: str,
) -> State:
    """Ask the provisioning proxy to run the "delete" migration action for real.

    The trailing ``False`` presumably switches off dry-run mode -- confirm
    against ``provisioning_proxy.migrate_ip_trunk``.
    """
    provisioning_proxy.migrate_ip_trunk(
        subscription,
        new_node,
        new_lag_interface,
        new_lag_member_interfaces,
        replace_index,
        process_id,
        tt_number,
        "delete",
        "delete",
        False,
    )

    logger.warning("Playbook verb is not yet properly set.")

    return {
        "subscription": subscription,
        "label_text": "Removing configuration from old router, please refresh to get the results of the playbook.",
    }


@step("Update IPAM")
def update_ipam(subscription: Iptrunk) -> State:
    """Placeholder step: currently returns the subscription unchanged."""
    return {"subscription": subscription}


@step("Update subscription model")
def update_subscription_model(
    subscription: Iptrunk,
    replace_index: int,
    new_node: UUIDstr,
    new_lag_interface: str,
    new_lag_member_interfaces: list[dict],
) -> State:
    """Point the replaced trunk side at the new router, LAG and LAG members.

    :param subscription: The IP trunk subscription being migrated.
    :param replace_index: Index (0 or 1) of the side in ``iptrunk_sides`` to replace.
    :param new_node: Subscription ID of the router taking over that side.
    :param new_lag_interface: Name of the LAG interface on the new router.
    :param new_lag_member_interfaces: Keyword arguments for each new
        ``IptrunkInterfaceBlock`` member.
    :return: The updated subscription, plus ``old_side_data`` holding the
        node, LAG interface and LAG members the side had before the change.
    """
    # Snapshot the side before mutating it, so later steps can still see the old values.
    old_subscription = copy.deepcopy(subscription)
    old_side = old_subscription.iptrunk.iptrunk_sides[replace_index]
    old_side_data = {
        "iptrunk_side_node": old_side.iptrunk_side_node,
        "iptrunk_side_ae_iface": old_side.iptrunk_side_ae_iface,
        "iptrunk_side_ae_members": old_side.iptrunk_side_ae_members,
    }

    replaced_side = subscription.iptrunk.iptrunk_sides[replace_index]
    replaced_side.iptrunk_side_node = Router.from_subscription(new_node).router
    replaced_side.iptrunk_side_ae_iface = new_lag_interface
    replaced_side.iptrunk_side_ae_members.clear()
    # Re-populate the members of the side that was just cleared. This must use
    # replace_index, not a fixed index 0: otherwise replacing side 1 would leave
    # side 1 without members and add the new interfaces to side 0.
    for member in new_lag_member_interfaces:
        replaced_side.iptrunk_side_ae_members.append(IptrunkInterfaceBlock.new(subscription_id=uuid4(), **member))

    return {"subscription": subscription, "old_side_data": old_side_data}


@step("Reserve interfaces in Netbox")
def reserve_interfaces_in_netbox(
    subscription: Iptrunk,
    new_node: UUIDstr,
    new_lag_interface: str,
    new_lag_member_interfaces: list[dict],
) -> State:
    """Create the LAG on the new router in Netbox and reserve its member interfaces.

    Netbox is only touched when the new router's vendor is Nokia.
    """
    new_side = Router.from_subscription(new_node).router

    nbclient = NetboxClient()
    if new_side.router_vendor == RouterVendor.NOKIA:
        # Create LAG interfaces
        lag_interface: Interfaces = nbclient.create_interface(
            iface_name=new_lag_interface,
            type="lag",
            device_name=new_side.router_fqdn,
            description=str(subscription.subscription_id),
            enabled=True,
        )
        # Attach physical interfaces to LAG
        # Reserve interfaces
        for interface in new_lag_member_interfaces:
            nbclient.attach_interface_to_lag(
                device_name=new_side.router_fqdn,
                lag_name=lag_interface.name,
                iface_name=interface["interface_name"],
                description=str(subscription.subscription_id),
            )
            nbclient.reserve_interface(
                device_name=new_side.router_fqdn,
                iface_name=interface["interface_name"],
            )
    return {"subscription": subscription}


@step("Update Netbox. Allocate new interfaces and deallocate old ones.")
def update_netbox(
    subscription: Iptrunk,
    replace_index: int,
    old_side_data: dict,
) -> State:
    """Allocate the new side's interfaces in Netbox and release those of the old side.

    Each half only runs when the router concerned is a Nokia. ``old_side_data``
    is accessed with dictionary keys, i.e. in the form it has after passing
    through the workflow state.
    """
    new_side = subscription.iptrunk.iptrunk_sides[replace_index]
    nbclient = NetboxClient()
    if new_side.iptrunk_side_node.router_vendor == RouterVendor.NOKIA:
        for interface in new_side.iptrunk_side_ae_members:
            nbclient.allocate_interface(
                device_name=new_side.iptrunk_side_node.router_fqdn,
                iface_name=interface.interface_name,
            )
    if old_side_data["iptrunk_side_node"]["router_vendor"] == RouterVendor.NOKIA:
        # Set interfaces to free
        for iface in old_side_data["iptrunk_side_ae_members"]:
            nbclient.free_interface(old_side_data["iptrunk_side_node"]["router_fqdn"], iface["interface_name"])

        # Delete LAG interfaces
        nbclient.delete_interface(
            old_side_data["iptrunk_side_node"]["router_fqdn"], old_side_data["iptrunk_side_ae_iface"]
        )
    return {"subscription": subscription}


@workflow(
    "Migrate an IP Trunk",
    initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
    target=Target.MODIFY,
)
def migrate_iptrunk() -> StepList:
    """Assemble the ordered step list that migrates one side of an IP trunk to another router."""
    return (
        init
        >> store_process_subscription(Target.MODIFY)
        >> unsync
        >> reserve_interfaces_in_netbox
        >> pp_interaction(set_isis_to_90000, 3)
        >> pp_interaction(disable_old_config_dry, 3)
        >> pp_interaction(disable_old_config_real, 3)
        >> pp_interaction(deploy_new_config_dry, 3)
        >> pp_interaction(deploy_new_config_real, 3)
        >> confirm_continue_move_fiber
        >> pp_interaction(deploy_new_isis, 3)
        >> confirm_continue_restore_isis
        >> pp_interaction(restore_isis_metric, 3)
        >> pp_interaction(delete_old_config_dry, 3)
        >> pp_interaction(delete_old_config_real, 3)
        >> update_ipam
        >> update_subscription_model
        >> update_netbox
        >> resync
        >> done
    )