Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • goat/gap/geant-service-orchestrator
1 result
Select Git revision
Show changes
Commits on Source (6)
...@@ -3,6 +3,7 @@ from uuid import UUID ...@@ -3,6 +3,7 @@ from uuid import UUID
import pydantic import pydantic
import pynetbox import pynetbox
from infoblox_client.objects import Interface
from pynetbox.models.dcim import Devices, DeviceTypes, Interfaces from pynetbox.models.dcim import Devices, DeviceTypes, Interfaces
from gso.products.product_types.router import Router from gso.products.product_types.router import Router
...@@ -53,6 +54,14 @@ class NetboxClient: ...@@ -53,6 +54,14 @@ class NetboxClient:
def get_all_devices(self) -> list[Devices]: def get_all_devices(self) -> list[Devices]:
return list(self.netbox.dcim.devices.all()) return list(self.netbox.dcim.devices.all())
def get_allocated_interfaces_by_gso_subscription(self, device_name: str, subscription_id: UUID) -> list[Interfaces]:
"""Return all allocated interfaces of a device by name."""
device = self.get_device_by_name(device_name)
return self.netbox.dcim.interfaces.filter(
device_id=device.id, enabled=True, mark_connected=True, description=subscription_id
)
def get_device_by_name(self, device_name: str) -> Devices: def get_device_by_name(self, device_name: str) -> Devices:
"""Return the device object by name from netbox, or ``None`` if not found.""" """Return the device object by name from netbox, or ``None`` if not found."""
return self.netbox.dcim.devices.get(name=device_name) return self.netbox.dcim.devices.get(name=device_name)
...@@ -84,6 +93,13 @@ class NetboxClient: ...@@ -84,6 +93,13 @@ class NetboxClient:
description=description, description=description,
) )
def delete_interface(self, device_name: str, iface_name: str) -> None:
"""Delete an interface from a device by name."""
device = self.get_device_by_name(device_name)
interface = self.netbox.dcim.interfaces.get(device_id=device.id, name=iface_name)
return interface.delete()
def create_device_type(self, manufacturer: str, model: str, slug: str) -> DeviceTypes: def create_device_type(self, manufacturer: str, model: str, slug: str) -> DeviceTypes:
"""Create a new device type in Netbox.""" """Create a new device type in Netbox."""
...@@ -224,6 +240,34 @@ class NetboxClient: ...@@ -224,6 +240,34 @@ class NetboxClient:
return interface return interface
def free_interface(self, device_name: str, iface_name: str) -> Interfaces:
"""Free interface by marking disconnect and disable it."""
device = self.get_device_by_name(device_name)
interface = self.netbox.dcim.interfaces.get(device_id=device.id, name=iface_name)
# Check if interface is available
if interface is None:
raise NotFoundError(f"Interface: {iface_name} on device: {device_name} not found.")
interface.mark_connected = False
interface.enabled = False
interface.description = ""
interface.save()
return interface
def detach_interfaces_from_lag(self, device_name: str, lag_name: str) -> None:
"""Detach all interfaces from a LAG."""
device = self.get_device_by_name(device_name)
lag = self.netbox.dcim.interfaces.get(device_id=device.id, name=lag_name)
for interface in self.netbox.dcim.interfaces.filter(
device_id=device.id, lag_id=lag.id, enabled=False, mark_connected=False
):
interface.lag = None
interface.save()
return
def get_available_lags(self, router_id: UUID) -> list[str]: def get_available_lags(self, router_id: UUID) -> list[str]:
"""Return all available :term:`LAG`s not assigned to a device.""" """Return all available :term:`LAG`s not assigned to a device."""
...@@ -258,3 +302,13 @@ class NetboxClient: ...@@ -258,3 +302,13 @@ class NetboxClient:
return self.netbox.dcim.interfaces.filter( return self.netbox.dcim.interfaces.filter(
device=device.name, enabled=False, mark_connected=False, speed=speed_bps device=device.name, enabled=False, mark_connected=False, speed=speed_bps
) )
def get_interface_by_name_and_device(self, router_id: UUID, interface_name: str) -> Interface:
"""Return the interface object by name and device from netbox, or ``None`` if not found."""
router = Router.from_subscription(router_id).router.router_fqdn
device = self.get_device_by_name(router)
try:
return self.netbox.dcim.interfaces.get(device=device.name, name=interface_name)
except pynetbox.RequestError:
raise NotFoundError(f"Interface: {interface_name} on device: {device.name} not found.")
...@@ -103,7 +103,12 @@ def provision_router( ...@@ -103,7 +103,12 @@ def provision_router(
def provision_ip_trunk( def provision_ip_trunk(
subscription: IptrunkProvisioning, process_id: UUIDstr, tt_number: str, config_object: str, dry_run: bool = True subscription: IptrunkProvisioning,
process_id: UUIDstr,
tt_number: str,
config_object: str,
dry_run: bool = True,
removed_ae_members: list[str] | None = None,
) -> None: ) -> None:
"""Provision an IP trunk service using :term:`LSO`. """Provision an IP trunk service using :term:`LSO`.
...@@ -116,6 +121,8 @@ def provision_ip_trunk( ...@@ -116,6 +121,8 @@ def provision_ip_trunk(
:param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`. :param dry_run: A boolean indicating whether this should be a dry run or not, defaults to `True`.
:type dry_run: bool :type dry_run: bool
:rtype: None :rtype: None
:param removed_ae_members: A list of interfaces that are removed from the :term:`LAG`, defaults to `None`.
it's only used when we removed some interfaces from the LAG in modify_ip_trunk.
""" """
parameters = { parameters = {
"subscription": json.loads(json_dumps(subscription)), "subscription": json.loads(json_dumps(subscription)),
...@@ -124,6 +131,7 @@ def provision_ip_trunk( ...@@ -124,6 +131,7 @@ def provision_ip_trunk(
"tt_number": tt_number, "tt_number": tt_number,
"process_id": process_id, "process_id": process_id,
"object": config_object, "object": config_object,
"removed_ae_members": removed_ae_members,
} }
_send_request("ip_trunk", parameters, process_id, CUDOperation.POST) _send_request("ip_trunk", parameters, process_id, CUDOperation.POST)
...@@ -175,7 +183,7 @@ def migrate_ip_trunk( ...@@ -175,7 +183,7 @@ def migrate_ip_trunk(
subscription: Iptrunk, subscription: Iptrunk,
new_node: Router, new_node: Router,
new_lag_interface: str, new_lag_interface: str,
new_lag_member_interfaces: list[str], new_lag_member_interfaces: list[dict],
replace_index: int, replace_index: int,
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
......
...@@ -7,6 +7,7 @@ from orchestrator.types import State, UUIDstr ...@@ -7,6 +7,7 @@ from orchestrator.types import State, UUIDstr
from pydantic import BaseModel from pydantic import BaseModel
from pydantic_forms.validators import Choice from pydantic_forms.validators import Choice
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
from gso.products.product_blocks.router import RouterVendor from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import Iptrunk from gso.products.product_types.iptrunk import Iptrunk
from gso.products.product_types.router import Router from gso.products.product_types.router import Router
...@@ -51,6 +52,30 @@ def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None: ...@@ -51,6 +52,30 @@ def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
return Choice("ae member", zip(interfaces.keys(), interfaces.items())) # type: ignore[arg-type] return Choice("ae member", zip(interfaces.keys(), interfaces.items())) # type: ignore[arg-type]
def available_interfaces_choices_including_current_members(
router_id: UUID, speed: str, interfaces: list[IptrunkInterfaceBlock]
) -> Choice | None:
"""Return a list of available interfaces for a given router and speed including the current members.
For Nokia routers, return a list of available interfaces.
For Juniper routers, return a string.
"""
if Router.from_subscription(router_id).router.router_vendor != RouterVendor.NOKIA:
return None
available_interfaces = list(NetboxClient().get_available_interfaces(router_id, speed))
available_interfaces.extend(
[
NetboxClient().get_interface_by_name_and_device(router_id, interface.interface_name)
for interface in interfaces
]
)
options = {
interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}"
for interface in available_interfaces
}
return Choice("ae member", zip(options.keys(), options.items())) # type: ignore[arg-type]
def available_lags_choices(router_id: UUID) -> Choice | None: def available_lags_choices(router_id: UUID) -> Choice | None:
"""Return a list of available lags for a given router. """Return a list of available lags for a given router.
......
import copy
import re import re
from logging import getLogger from logging import getLogger
from typing import NoReturn from typing import NoReturn
from uuid import uuid4
from orchestrator import step, workflow from orchestrator import step, workflow
from orchestrator.config.assignee import Assignee from orchestrator.config.assignee import Assignee
from orchestrator.db import ProductTable, SubscriptionTable
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, Label, UniqueConstrainedList from orchestrator.forms.validators import Choice, Label, UniqueConstrainedList
from orchestrator.targets import Target from orchestrator.targets import Target
...@@ -13,108 +14,139 @@ from orchestrator.workflow import StepList, done, init, inputstep ...@@ -13,108 +14,139 @@ from orchestrator.workflow import StepList, done, init, inputstep
from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import validator from pydantic import validator
from pydantic_forms.core import ReadOnlyField
from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import Iptrunk from gso.products.product_types.iptrunk import Iptrunk
from gso.products.product_types.router import Router from gso.products.product_types.router import Router
from gso.services import provisioning_proxy from gso.services import provisioning_proxy
from gso.services.netbox_client import NetboxClient
from gso.services.provisioning_proxy import pp_interaction from gso.services.provisioning_proxy import pp_interaction
from gso.utils.helpers import set_isis_to_90000 from gso.services.subscriptions import get_active_router_subscriptions
from gso.utils.helpers import (
LAGMember,
available_interfaces_choices,
available_lags_choices,
get_router_vendor,
set_isis_to_90000,
)
logger = getLogger(__name__) logger = getLogger(__name__)
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
subscription = Iptrunk.from_subscription(subscription_id) subscription = Iptrunk.from_subscription(subscription_id)
form_title = (
f"Subscription {subscription.iptrunk.geant_s_sid} "
f" from {subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}"
f" to {subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}"
)
sides_dict = { sides_dict = {
str(side.iptrunk_side_node.subscription.subscription_id): side.iptrunk_side_node.subscription.description str(side.iptrunk_side_node.subscription.subscription_id): side.iptrunk_side_node.subscription.description
for side in subscription.iptrunk.iptrunk_sides for side in subscription.iptrunk.iptrunk_sides
} }
ReplacedSide = Choice( replaced_side_enum = Choice(
"Select the side of the IP trunk to be replaced", "Select the side of the IP trunk to be replaced",
zip(sides_dict.keys(), sides_dict.items()), # type: ignore[arg-type] zip(sides_dict.keys(), sides_dict.items()), # type: ignore[arg-type]
) )
class OldSideIptrunkForm(FormPage): class IPTrunkMigrateForm(FormPage):
class Config: class Config:
title = ( title = form_title
f"Subscription {subscription.iptrunk.geant_s_sid} from "
f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}"
f" to "
f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}"
)
tt_number: str tt_number: str
replace_side: ReplacedSide # type: ignore[valid-type] replace_side: replaced_side_enum # type: ignore[valid-type]
warning_label: Label = "Are we moving to a different Site?" # type: ignore[assignment] warning_label: Label = "Are we moving to a different Site?" # type: ignore[assignment]
migrate_to_different_site: bool | None = False migrate_to_different_site: bool = False
old_side_input = yield OldSideIptrunkForm migrate_form_input = yield IPTrunkMigrateForm
current_routers = [
side.iptrunk_side_node.subscription.subscription_id for side in subscription.iptrunk.iptrunk_sides
]
routers = {} routers = {}
for router_id, router_description in ( for router in get_active_router_subscriptions(includes=["subscription_id", "description"]):
SubscriptionTable.query.join(ProductTable) router_id = router["subscription_id"]
.filter( if router_id not in current_routers:
ProductTable.product_type == "Router", current_router_site = Router.from_subscription(router_id).router.router_site.subscription
SubscriptionTable.status == "active", old_side_site = Router.from_subscription(migrate_form_input.replace_side).router.router_site
)
.with_entities(SubscriptionTable.subscription_id, SubscriptionTable.description)
.all()
):
if router_id not in [
subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id,
subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.subscription.subscription_id,
]:
current_router = Router.from_subscription(router_id)
old_side_site_id = Router.from_subscription(old_side_input.replace_side).router.router_site
if ( if (
not old_side_input.migrate_to_different_site migrate_form_input.migrate_to_different_site
and current_router.router.router_site.subscription.subscription_id != old_side_site_id and current_router_site.subscription_id == old_side_site.owner_subscription_id
): ):
continue continue
routers[str(router_id)] = router_description routers[str(router_id)] = router["description"]
NewRouterEnum = Choice("Select a new router", zip(routers.keys(), routers.items())) # type: ignore[arg-type] new_router_enum = Choice("Select a new router", zip(routers.keys(), routers.items())) # type: ignore[arg-type]
class LagMemberList(UniqueConstrainedList[str]): class NewSideIPTrunkRouterForm(FormPage):
min_items = len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members)
max_items = len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members)
class NewSideIptrunkForm(FormPage):
class Config: class Config:
title = ( title = form_title
f"Subscription {subscription.iptrunk.geant_s_sid} from "
f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to " new_node: new_router_enum # type: ignore[valid-type]
f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}"
new_side_iptrunk_router_input = yield NewSideIPTrunkRouterForm
new_router = new_side_iptrunk_router_input.new_node
side_a_ae_iface = available_lags_choices(new_router) or str
if get_router_vendor(new_router) == RouterVendor.NOKIA:
class NokiaLAGMember(LAGMember):
interface_name: available_interfaces_choices( # type: ignore[valid-type]
new_router, subscription.iptrunk.iptrunk_speed
) )
new_node: NewRouterEnum # type: ignore[valid-type] class NokiaAeMembers(UniqueConstrainedList[NokiaLAGMember]):
new_lag_interface: str min_items = len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members)
new_lag_member_interfaces: LagMemberList max_items = len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members)
@validator("new_lag_interface", allow_reuse=True, pre=True, always=True) ae_members = NokiaAeMembers
def lag_interface_proper_name(cls, new_lag_name: str) -> str | NoReturn: else:
nokia_lag_re = re.compile("^lag-\\d+$")
juniper_lag_re = re.compile("^ae\\d{1,2}$")
if nokia_lag_re.match(new_lag_name) or juniper_lag_re.match(new_lag_name): class JuniperLagMember(UniqueConstrainedList[LAGMember]):
return new_lag_name min_items = len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members)
max_items = len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members)
raise ValueError("Invalid LAG name, please try again.") ae_members = JuniperLagMember # type: ignore[assignment]
new_side_input = yield NewSideIptrunkForm replace_index = (
0
if str(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id)
== migrate_form_input.replace_side
else 1
)
existing_lag_ae_members = [
{"interface_name": iface.interface_name, "interface_description": iface.interface_description}
for iface in subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_members
]
def _find_updated_side_of_trunk(trunk: Iptrunk, new_side: str) -> int: class NewSideIPTrunkForm(FormPage):
sides = trunk.iptrunk.iptrunk_sides class Config:
if str(sides[0].iptrunk_side_node.subscription.subscription_id) == new_side: title = form_title
return 0
elif str(sides[1].iptrunk_side_node.subscription.subscription_id) == new_side: # noqa: RET505
return 1
raise ValueError("Invalid Router id provided to be replaced!")
replace_index = _find_updated_side_of_trunk(subscription, old_side_input.replace_side) new_lag_interface: side_a_ae_iface # type: ignore[valid-type]
existing_lag_interface: list[LAGMember] = ReadOnlyField(existing_lag_ae_members)
new_lag_member_interfaces: ae_members # type: ignore[valid-type]
return old_side_input.dict() | new_side_input.dict() | {"replace_index": replace_index} @validator("new_lag_interface", allow_reuse=True, pre=True, always=True)
def lag_interface_proper_name(cls, new_lag_interface: str) -> str | NoReturn:
if get_router_vendor(new_router) == RouterVendor.JUNIPER:
juniper_lag_re = re.compile("^ae\\d{1,2}$")
if not juniper_lag_re.match(new_lag_interface):
raise ValueError("Invalid LAG name, please try again.")
return new_lag_interface
new_side_input = yield NewSideIPTrunkForm
return (
migrate_form_input.dict()
| new_side_iptrunk_router_input.dict()
| new_side_input.dict()
| {"replace_index": replace_index}
)
@step("[DRY RUN] Disable configuration on old router") @step("[DRY RUN] Disable configuration on old router")
...@@ -122,7 +154,7 @@ def disable_old_config_dry( ...@@ -122,7 +154,7 @@ def disable_old_config_dry(
subscription: Iptrunk, subscription: Iptrunk,
new_node: Router, new_node: Router,
new_lag_interface: str, new_lag_interface: str,
new_lag_member_interfaces: list[str], new_lag_member_interfaces: list[dict],
replace_index: int, replace_index: int,
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
...@@ -150,7 +182,7 @@ def disable_old_config_real( ...@@ -150,7 +182,7 @@ def disable_old_config_real(
subscription: Iptrunk, subscription: Iptrunk,
new_node: Router, new_node: Router,
new_lag_interface: str, new_lag_interface: str,
new_lag_member_interfaces: list[str], new_lag_member_interfaces: list[dict],
replace_index: int, replace_index: int,
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
...@@ -179,7 +211,7 @@ def deploy_new_config_dry( ...@@ -179,7 +211,7 @@ def deploy_new_config_dry(
subscription: Iptrunk, subscription: Iptrunk,
new_node: Router, new_node: Router,
new_lag_interface: str, new_lag_interface: str,
new_lag_member_interfaces: list[str], new_lag_member_interfaces: list[dict],
replace_index: int, replace_index: int,
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
...@@ -209,7 +241,7 @@ def deploy_new_config_real( ...@@ -209,7 +241,7 @@ def deploy_new_config_real(
subscription: Iptrunk, subscription: Iptrunk,
new_node: Router, new_node: Router,
new_lag_interface: str, new_lag_interface: str,
new_lag_member_interfaces: list[str], new_lag_member_interfaces: list[dict],
replace_index: int, replace_index: int,
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
...@@ -259,7 +291,7 @@ def deploy_new_isis( ...@@ -259,7 +291,7 @@ def deploy_new_isis(
subscription: Iptrunk, subscription: Iptrunk,
new_node: Router, new_node: Router,
new_lag_interface: str, new_lag_interface: str,
new_lag_member_interfaces: list[str], new_lag_member_interfaces: list[dict],
replace_index: int, replace_index: int,
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
...@@ -313,7 +345,7 @@ def delete_old_config_dry( ...@@ -313,7 +345,7 @@ def delete_old_config_dry(
subscription: Iptrunk, subscription: Iptrunk,
new_node: Router, new_node: Router,
new_lag_interface: str, new_lag_interface: str,
new_lag_member_interfaces: list[str], new_lag_member_interfaces: list[dict],
replace_index: int, replace_index: int,
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
...@@ -344,7 +376,7 @@ def delete_old_config_real( ...@@ -344,7 +376,7 @@ def delete_old_config_real(
subscription: Iptrunk, subscription: Iptrunk,
new_node: Router, new_node: Router,
new_lag_interface: str, new_lag_interface: str,
new_lag_member_interfaces: list[str], new_lag_member_interfaces: list[dict],
replace_index: int, replace_index: int,
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
...@@ -372,8 +404,6 @@ def delete_old_config_real( ...@@ -372,8 +404,6 @@ def delete_old_config_real(
@step("Update IPAM") @step("Update IPAM")
def update_ipam(subscription: Iptrunk) -> State: def update_ipam(subscription: Iptrunk) -> State:
pass
return {"subscription": subscription} return {"subscription": subscription}
...@@ -383,12 +413,85 @@ def update_subscription_model( ...@@ -383,12 +413,85 @@ def update_subscription_model(
replace_index: int, replace_index: int,
new_node: UUIDstr, new_node: UUIDstr,
new_lag_interface: str, new_lag_interface: str,
new_lag_member_interfaces: list[str], new_lag_member_interfaces: list[dict],
) -> State: ) -> State:
# Deep copy of subscription data
old_subscription = copy.deepcopy(subscription)
old_side_data = {
"iptrunk_side_node": old_subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_node,
"iptrunk_side_ae_iface": old_subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_iface,
"iptrunk_side_ae_members": old_subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_members,
}
subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_node = Router.from_subscription(new_node).router subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_node = Router.from_subscription(new_node).router
subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_iface = new_lag_interface subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_iface = new_lag_interface
subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_members = new_lag_member_interfaces subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_members.clear()
# And update the list to only include the new member interfaces
for member in new_lag_member_interfaces:
subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members.append(
IptrunkInterfaceBlock.new(subscription_id=uuid4(), **member)
)
return {"subscription": subscription, "old_side_data": old_side_data}
@step("Reserve interfaces in Netbox")
def reserve_interfaces_in_netbox(
subscription: Iptrunk,
new_node: UUIDstr,
new_lag_interface: str,
new_lag_member_interfaces: list[dict],
) -> State:
new_side = Router.from_subscription(new_node).router
nbclient = NetboxClient()
if new_side.router_vendor == RouterVendor.NOKIA:
# Create LAG interfaces
lag_interface: Interfaces = nbclient.create_interface(
iface_name=new_lag_interface,
type="lag",
device_name=new_side.router_fqdn,
description=str(subscription.subscription_id),
enabled=True,
)
# Attach physical interfaces to LAG
# Reserve interfaces
for interface in new_lag_member_interfaces:
nbclient.attach_interface_to_lag(
device_name=new_side.router_fqdn,
lag_name=lag_interface.name,
iface_name=interface["interface_name"],
description=str(subscription.subscription_id),
)
nbclient.reserve_interface(
device_name=new_side.router_fqdn,
iface_name=interface["interface_name"],
)
return {"subscription": subscription}
@step("Update Netbox. Allocate new interfaces and deallocate old ones.")
def update_netbox(
subscription: Iptrunk,
replace_index: int,
old_side_data: dict,
) -> State:
new_side = subscription.iptrunk.iptrunk_sides[replace_index]
nbclient = NetboxClient()
if new_side.iptrunk_side_node.router_vendor == RouterVendor.NOKIA:
for interface in new_side.iptrunk_side_ae_members:
nbclient.allocate_interface(
device_name=new_side.iptrunk_side_node.router_fqdn,
iface_name=interface.interface_name,
)
if old_side_data["iptrunk_side_node"]["router_vendor"] == RouterVendor.NOKIA:
# Set interfaces to free
for iface in old_side_data["iptrunk_side_ae_members"]:
nbclient.free_interface(old_side_data["iptrunk_side_node"]["router_fqdn"], iface["interface_name"])
# Delete LAG interfaces
nbclient.delete_interface(
old_side_data["iptrunk_side_node"]["router_fqdn"], old_side_data["iptrunk_side_ae_iface"]
)
return {"subscription": subscription} return {"subscription": subscription}
...@@ -402,6 +505,7 @@ def migrate_iptrunk() -> StepList: ...@@ -402,6 +505,7 @@ def migrate_iptrunk() -> StepList:
init init
>> store_process_subscription(Target.MODIFY) >> store_process_subscription(Target.MODIFY)
>> unsync >> unsync
>> reserve_interfaces_in_netbox
>> pp_interaction(set_isis_to_90000, 3) >> pp_interaction(set_isis_to_90000, 3)
>> pp_interaction(disable_old_config_dry, 3) >> pp_interaction(disable_old_config_dry, 3)
>> pp_interaction(disable_old_config_real, 3) >> pp_interaction(disable_old_config_real, 3)
...@@ -415,6 +519,7 @@ def migrate_iptrunk() -> StepList: ...@@ -415,6 +519,7 @@ def migrate_iptrunk() -> StepList:
>> pp_interaction(delete_old_config_real, 3) >> pp_interaction(delete_old_config_real, 3)
>> update_ipam >> update_ipam
>> update_subscription_model >> update_subscription_model
>> update_netbox
>> resync >> resync
>> done >> done
) )
import ipaddress import ipaddress
from typing import List, Type
from uuid import uuid4 from uuid import uuid4
from orchestrator.forms import FormPage, ReadOnlyField from orchestrator.forms import FormPage, ReadOnlyField
...@@ -8,12 +9,50 @@ from orchestrator.types import FormGenerator, State, UUIDstr ...@@ -8,12 +9,50 @@ from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import validator
from pydantic_forms.validators import Label
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock, IptrunkType, PhyPortCapacity from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock, IptrunkType, PhyPortCapacity
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import Iptrunk from gso.products.product_types.iptrunk import Iptrunk
from gso.services import provisioning_proxy from gso.services import provisioning_proxy
from gso.services.netbox_client import NetboxClient
from gso.services.provisioning_proxy import pp_interaction from gso.services.provisioning_proxy import pp_interaction
from gso.utils.helpers import LAGMember from gso.utils.helpers import (
LAGMember,
available_interfaces_choices,
available_interfaces_choices_including_current_members,
validate_iptrunk_unique_interface,
)
def initialize_ae_members(subscription: Iptrunk, initial_user_input: dict, side_index: int) -> Type[LAGMember]:
"""Initialize the list of AE members."""
router = subscription.iptrunk.iptrunk_sides[side_index].iptrunk_side_node
iptrunk_minimum_link = initial_user_input["iptrunk_minimum_links"]
if router.router_vendor == RouterVendor.NOKIA:
iptrunk_speed = initial_user_input["iptrunk_speed"]
class NokiaLAGMember(LAGMember):
interface_name: available_interfaces_choices_including_current_members( # type: ignore[valid-type]
router.owner_subscription_id,
iptrunk_speed,
subscription.iptrunk.iptrunk_sides[side_index].iptrunk_side_ae_members,
) if iptrunk_speed == subscription.iptrunk.iptrunk_speed else (
available_interfaces_choices(router.owner_subscription_id, initial_user_input["iptrunk_speed"])
)
class NokiaAeMembers(UniqueConstrainedList[NokiaLAGMember]):
min_items = iptrunk_minimum_link
ae_members = NokiaAeMembers
else:
class JuniperAeMembers(UniqueConstrainedList[LAGMember]):
min_items = iptrunk_minimum_link
ae_members = JuniperAeMembers # type: ignore[assignment]
return ae_members # type: ignore[return-value]
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
...@@ -24,6 +63,10 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -24,6 +63,10 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
geant_s_sid: str = subscription.iptrunk.geant_s_sid geant_s_sid: str = subscription.iptrunk.geant_s_sid
iptrunk_description: str = subscription.iptrunk.iptrunk_description iptrunk_description: str = subscription.iptrunk.iptrunk_description
iptrunk_type: IptrunkType = subscription.iptrunk.iptrunk_type iptrunk_type: IptrunkType = subscription.iptrunk.iptrunk_type
warning_label: Label = (
"Changing the PhyPortCapacity will result in the deletion of all AE members. "
"You will need to add the new AE members in the next steps." # type: ignore[assignment]
)
iptrunk_speed: PhyPortCapacity = subscription.iptrunk.iptrunk_speed iptrunk_speed: PhyPortCapacity = subscription.iptrunk.iptrunk_speed
iptrunk_minimum_links: int = subscription.iptrunk.iptrunk_minimum_links iptrunk_minimum_links: int = subscription.iptrunk.iptrunk_minimum_links
iptrunk_isis_metric: int = ReadOnlyField(subscription.iptrunk.iptrunk_isis_metric) iptrunk_isis_metric: int = ReadOnlyField(subscription.iptrunk.iptrunk_isis_metric)
...@@ -31,9 +74,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -31,9 +74,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
iptrunk_ipv6_network: ipaddress.IPv6Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv6_network) iptrunk_ipv6_network: ipaddress.IPv6Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv6_network)
initial_user_input = yield ModifyIptrunkForm initial_user_input = yield ModifyIptrunkForm
ae_members_side_a = initialize_ae_members(subscription, initial_user_input.dict(), 0)
class AeMembersListA(UniqueConstrainedList[LAGMember]):
min_items = initial_user_input.iptrunk_minimum_links
class ModifyIptrunkSideAForm(FormPage): class ModifyIptrunkSideAForm(FormPage):
class Config: class Config:
...@@ -42,13 +83,18 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -42,13 +83,18 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
side_a_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn) side_a_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn)
side_a_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface) side_a_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface)
side_a_ae_geant_a_sid: str = subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_geant_a_sid side_a_ae_geant_a_sid: str = subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_geant_a_sid
side_a_ae_members: AeMembersListA = subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members side_a_ae_members: ae_members_side_a = ( # type: ignore[valid-type]
subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members
if initial_user_input.iptrunk_speed == subscription.iptrunk.iptrunk_speed
else []
)
user_input_side_a = yield ModifyIptrunkSideAForm @validator("side_a_ae_members", allow_reuse=True)
def validate_iptrunk_unique_interface_side_a(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
return validate_iptrunk_unique_interface(side_a_ae_members)
class AeMembersListB(UniqueConstrainedList[LAGMember]): user_input_side_a = yield ModifyIptrunkSideAForm
min_items = len(user_input_side_a.side_a_ae_members) ae_members_side_b = initialize_ae_members(subscription, initial_user_input.dict(), 1)
max_items = len(user_input_side_a.side_a_ae_members)
class ModifyIptrunkSideBForm(FormPage): class ModifyIptrunkSideBForm(FormPage):
class Config: class Config:
...@@ -57,7 +103,15 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -57,7 +103,15 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
side_b_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn) side_b_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn)
side_b_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface) side_b_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface)
side_b_ae_geant_a_sid: str = subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_geant_a_sid side_b_ae_geant_a_sid: str = subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_geant_a_sid
side_b_ae_members: AeMembersListB = subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members side_b_ae_members: ae_members_side_b = ( # type: ignore[valid-type]
subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members
if initial_user_input.iptrunk_speed == subscription.iptrunk.iptrunk_speed
else []
)
@validator("side_b_ae_members", allow_reuse=True)
def validate_iptrunk_unique_interface_side_b(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
return validate_iptrunk_unique_interface(side_b_ae_members)
user_input_side_b = yield ModifyIptrunkSideBForm user_input_side_b = yield ModifyIptrunkSideBForm
...@@ -77,6 +131,20 @@ def modify_iptrunk_subscription( ...@@ -77,6 +131,20 @@ def modify_iptrunk_subscription(
side_b_ae_geant_a_sid: str, side_b_ae_geant_a_sid: str,
side_b_ae_members: list[dict], side_b_ae_members: list[dict],
) -> State: ) -> State:
# Prepare the list of removed AE members
previous_ae_members = {}
removed_ae_members = {}
for side_index in range(2):
previous_ae_members[side_index] = [
{"interface_name": member.interface_name, "interface_description": member.interface_description}
for member in subscription.iptrunk.iptrunk_sides[side_index].iptrunk_side_ae_members
]
for side_index in range(2):
previous_members = previous_ae_members[side_index]
current_members = side_a_ae_members if side_index == 0 else side_b_ae_members
removed_ae_members[side_index] = [
ae_member for ae_member in previous_members if ae_member not in current_members
]
subscription.iptrunk.geant_s_sid = geant_s_sid subscription.iptrunk.geant_s_sid = geant_s_sid
subscription.iptrunk.iptrunk_description = iptrunk_description subscription.iptrunk.iptrunk_description = iptrunk_description
subscription.iptrunk.iptrunk_type = iptrunk_type subscription.iptrunk.iptrunk_type = iptrunk_type
...@@ -101,12 +169,20 @@ def modify_iptrunk_subscription( ...@@ -101,12 +169,20 @@ def modify_iptrunk_subscription(
subscription.description = f"IP trunk, geant_s_sid:{geant_s_sid}" subscription.description = f"IP trunk, geant_s_sid:{geant_s_sid}"
return {"subscription": subscription} return {
"subscription": subscription,
"removed_ae_members": removed_ae_members,
"previous_ae_members": previous_ae_members,
}
@step("Provision IP trunk interface [DRY RUN]") @step("Provision IP trunk interface [DRY RUN]")
def provision_ip_trunk_iface_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State: def provision_ip_trunk_iface_dry(
provisioning_proxy.provision_ip_trunk(subscription, process_id, tt_number, "trunk_interface") subscription: Iptrunk, process_id: UUIDstr, tt_number: str, removed_ae_members: List[str]
) -> State:
provisioning_proxy.provision_ip_trunk(
subscription, process_id, tt_number, "trunk_interface", True, removed_ae_members
)
return { return {
"subscription": subscription, "subscription": subscription,
...@@ -115,8 +191,12 @@ def provision_ip_trunk_iface_dry(subscription: Iptrunk, process_id: UUIDstr, tt_ ...@@ -115,8 +191,12 @@ def provision_ip_trunk_iface_dry(subscription: Iptrunk, process_id: UUIDstr, tt_
@step("Provision IP trunk interface [FOR REAL]") @step("Provision IP trunk interface [FOR REAL]")
def provision_ip_trunk_iface_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> State: def provision_ip_trunk_iface_real(
provisioning_proxy.provision_ip_trunk(subscription, process_id, tt_number, "trunk_interface", False) subscription: Iptrunk, process_id: UUIDstr, tt_number: str, removed_ae_members: List[str]
) -> State:
provisioning_proxy.provision_ip_trunk(
subscription, process_id, tt_number, "trunk_interface", False, removed_ae_members
)
return { return {
"subscription": subscription, "subscription": subscription,
...@@ -124,6 +204,69 @@ def provision_ip_trunk_iface_real(subscription: Iptrunk, process_id: UUIDstr, tt ...@@ -124,6 +204,69 @@ def provision_ip_trunk_iface_real(subscription: Iptrunk, process_id: UUIDstr, tt
} }
@step("Update interfaces in Netbox. Reserving interfaces.")
def update_interfaces_in_netbox(subscription: Iptrunk, removed_ae_members: dict, previous_ae_members: dict) -> State:
nbclient = NetboxClient()
for side in range(0, 2):
if subscription.iptrunk.iptrunk_sides[side].iptrunk_side_node.router_vendor == RouterVendor.NOKIA:
lag_interface = subscription.iptrunk.iptrunk_sides[side].iptrunk_side_ae_iface
router_name = subscription.iptrunk.iptrunk_sides[side].iptrunk_side_node.router_fqdn
# Free removed interfaces
for member in removed_ae_members[str(side)]:
nbclient.free_interface(router_name, member["interface_name"])
# Attach physical interfaces to LAG
# Update interface description to subscription ID
# Reserve interfaces
for interface in subscription.iptrunk.iptrunk_sides[side].iptrunk_side_ae_members:
if any(
ae_member.get("interface_name") == interface.interface_name
for ae_member in previous_ae_members[str(side)]
):
continue
nbclient.attach_interface_to_lag(
device_name=subscription.iptrunk.iptrunk_sides[side].iptrunk_side_node.router_fqdn,
lag_name=lag_interface,
iface_name=interface.interface_name,
description=str(subscription.subscription_id),
)
nbclient.reserve_interface(
device_name=subscription.iptrunk.iptrunk_sides[side].iptrunk_side_node.router_fqdn,
iface_name=interface.interface_name,
)
return {
"subscription": subscription,
}
@step("Allocate interfaces in Netbox")
def allocate_interfaces_in_netbox(subscription: Iptrunk, previous_ae_members: dict) -> State:
"""Allocate the LAG interfaces in NetBox.
attach the lag interfaces to the physical interfaces detach old ones from the LAG.
"""
for side in range(0, 2):
nbclient = NetboxClient()
if subscription.iptrunk.iptrunk_sides[side].iptrunk_side_node.router_vendor == RouterVendor.NOKIA:
for interface in subscription.iptrunk.iptrunk_sides[side].iptrunk_side_ae_members:
if any(
ae_member.get("interface_name") == interface.interface_name
for ae_member in previous_ae_members[str(side)]
):
continue
nbclient.allocate_interface(
device_name=subscription.iptrunk.iptrunk_sides[side].iptrunk_side_node.router_fqdn,
iface_name=interface.interface_name,
)
# detach the old interfaces from lag
nbclient.detach_interfaces_from_lag(
device_name=subscription.iptrunk.iptrunk_sides[side].iptrunk_side_node.router_fqdn,
lag_name=subscription.iptrunk.iptrunk_sides[side].iptrunk_side_ae_iface,
)
return {"subscription": subscription}
@workflow( @workflow(
"Modify IP Trunk interface", "Modify IP Trunk interface",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
...@@ -135,8 +278,10 @@ def modify_trunk_interface() -> StepList: ...@@ -135,8 +278,10 @@ def modify_trunk_interface() -> StepList:
>> store_process_subscription(Target.MODIFY) >> store_process_subscription(Target.MODIFY)
>> unsync >> unsync
>> modify_iptrunk_subscription >> modify_iptrunk_subscription
>> update_interfaces_in_netbox
>> pp_interaction(provision_ip_trunk_iface_dry, 3) >> pp_interaction(provision_ip_trunk_iface_dry, 3)
>> pp_interaction(provision_ip_trunk_iface_real, 3) >> pp_interaction(provision_ip_trunk_iface_real, 3)
>> allocate_interfaces_in_netbox
>> resync >> resync
>> done >> done
) )
class MockedNetboxClient:
class BaseMockObject:
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def get_device_by_name(self):
return self.BaseMockObject(id=1, name="test")
def get_available_lags(self) -> list[str]:
return [f"LAG{lag}" for lag in range(1, 5)]
def get_available_interfaces(self):
interfaces = []
for interface in range(5):
interface_data = {
"name": f"Interface{interface}",
"module": {"display": f"Module{interface}"},
"description": f"Description{interface}",
}
interfaces.append(interface_data)
return interfaces
def create_interface(self):
return self.BaseMockObject(id=1, name="test")
def attach_interface_to_lag(self):
return self.BaseMockObject(id=1, name="test")
def reserve_interface(self):
return self.BaseMockObject(id=1, name="test")
def allocate_interface(self):
return {"id": 1, "name": "test"}
def free_interface(self):
return self.BaseMockObject(id=1, name="test")
def detach_interfaces_from_lag(self):
return None
def delete_interface(self):
return None
...@@ -8,6 +8,7 @@ from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity ...@@ -8,6 +8,7 @@ from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
from gso.services.crm import customer_selector, get_customer_by_name from gso.services.crm import customer_selector, get_customer_by_name
from gso.services.subscriptions import get_product_id_by_name from gso.services.subscriptions import get_product_id_by_name
from gso.utils.helpers import LAGMember from gso.utils.helpers import LAGMember
from test.services.conftest import MockedNetboxClient
from test.workflows import ( from test.workflows import (
assert_aborted, assert_aborted,
assert_complete, assert_complete,
...@@ -19,42 +20,6 @@ from test.workflows import ( ...@@ -19,42 +20,6 @@ from test.workflows import (
) )
class MockedNetboxClient:
class BaseMockObject:
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def get_device_by_name(self):
return self.BaseMockObject(id=1, name="test")
def get_available_lags(self) -> list[str]:
return [f"LAG{lag}" for lag in range(1, 5)]
def get_available_interfaces(self):
interfaces = []
for interface in range(5):
interface_data = {
"name": f"Interface{interface}",
"module": {"display": f"Module{interface}"},
"description": f"Description{interface}",
}
interfaces.append(interface_data)
return interfaces
def create_interface(self):
return self.BaseMockObject(id=1, name="test")
def attach_interface_to_lag(self):
return self.BaseMockObject(id=1, name="test")
def reserve_interface(self):
return self.BaseMockObject(id=1, name="test")
def allocate_interface(self):
return {"id": 1, "name": "test"}
@pytest.fixture @pytest.fixture
def netbox_client_mock(): def netbox_client_mock():
# Mock NetboxClient methods # Mock NetboxClient methods
......
from unittest.mock import patch
import pytest
from gso.products import Iptrunk
from gso.utils.helpers import LAGMember
from test.workflows import (
assert_complete,
assert_suspended,
extract_state,
resume_workflow,
run_workflow,
user_accept_and_assert_suspended,
)
from test.workflows.iptrunk.test_create_iptrunk import MockedNetboxClient
@pytest.mark.workflow
@patch("gso.workflows.iptrunk.migrate_iptrunk.provisioning_proxy.migrate_ip_trunk")
@patch("gso.workflows.iptrunk.migrate_iptrunk.provisioning_proxy.provision_ip_trunk")
@patch("gso.services.netbox_client.NetboxClient.get_device_by_name")
@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces")
@patch("gso.services.netbox_client.NetboxClient.get_available_lags")
@patch("gso.services.netbox_client.NetboxClient.create_interface")
@patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag")
@patch("gso.services.netbox_client.NetboxClient.reserve_interface")
@patch("gso.services.netbox_client.NetboxClient.allocate_interface")
@patch("gso.services.netbox_client.NetboxClient.free_interface")
@patch("gso.services.netbox_client.NetboxClient.delete_interface")
def test_migrate_iptrunk_success(
mocked_delete_interface,
mocked_free_interface,
mocked_allocate_interface,
mocked_reserve_interface,
mocked_attach_interface_to_lag,
mocked_create_interface,
mocked_get_available_lags,
mocked_get_available_interfaces,
mocked_get_device_by_name,
mock_provision_ip_trunk,
mock_migrate_ip_trunk,
iptrunk_subscription_factory,
router_subscription_factory,
faker,
):
# Set up mock return values
mocked_netbox = MockedNetboxClient()
mocked_get_device_by_name.return_value = mocked_netbox.get_device_by_name()
mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces()
mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag()
mocked_reserve_interface.return_value = mocked_netbox.reserve_interface()
mocked_allocate_interface.return_value = mocked_netbox.allocate_interface()
mocked_free_interface.return_value = mocked_netbox.free_interface()
mocked_create_interface.return_value = mocked_netbox.create_interface()
mocked_get_available_lags.return_value = mocked_netbox.get_available_lags()
mocked_delete_interface.return_value = mocked_netbox.delete_interface()
product_id = iptrunk_subscription_factory()
old_subscription = Iptrunk.from_subscription(product_id)
new_router = router_subscription_factory()
# Run workflow
migrate_form_input = [
{"subscription_id": product_id},
{
"tt_number": faker.tt_number(),
"replace_side": str(
old_subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id
),
},
{
"new_node": new_router,
},
{
"new_lag_interface": "LAG1",
"new_lag_member_interfaces": [
LAGMember(interface_name=f"Interface{interface}", interface_description=faker.sentence())
for interface in range(2)
],
},
]
result, process_stat, step_log = run_workflow("migrate_iptrunk", migrate_form_input)
assert_suspended(result)
lso_return = {
"pp_run_results": {
"status": "ok",
"job_id": faker.uuid4(),
"output": "parsed_output",
"return_code": 0,
},
"confirm": "ACCEPTED",
}
# Resume steps
for _ in range(5):
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, lso_return)
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, [{}, {}])
for _ in range(2):
result, step_log = user_accept_and_assert_suspended(process_stat, step_log)
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, lso_return)
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, [{}, {}])
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, lso_return)
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, [{}, {}])
result, step_log = user_accept_and_assert_suspended(process_stat, step_log, lso_return)
result, step_log = resume_workflow(process_stat, step_log, [{}, {}])
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = Iptrunk.from_subscription(subscription_id)
assert "active" == subscription.status
assert mock_provision_ip_trunk.call_count == 2
assert mock_migrate_ip_trunk.call_count == 7
# Assert all Netbox calls have been made
# This test case is only for migrating Nokia to Nokia.
# For Juniper to Nokia and Nokia to Juniper, the workflow is different.
assert mocked_create_interface.call_count == 1 # once for creating the LAG on the newly replaced side
assert mocked_reserve_interface.call_count == 2 # Twice for the new interfaces
assert mocked_attach_interface_to_lag.call_count == 2 # Twice for the new interfaces
assert mocked_allocate_interface.call_count == 2 # Twice for the new interfaces
assert mocked_free_interface.call_count == 2 # Twice for the old interfaces
assert mocked_delete_interface.call_count == 1 # once for deleting the LAG on the old replaced side
# Assert the new side is replaced
assert str(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id) == new_router
assert subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface == "LAG1"
assert len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members) == 2
assert subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members[0].interface_name == "Interface0"
assert subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members[1].interface_name == "Interface1"
...@@ -12,16 +12,40 @@ from test.workflows import ( ...@@ -12,16 +12,40 @@ from test.workflows import (
run_workflow, run_workflow,
user_accept_and_assert_suspended, user_accept_and_assert_suspended,
) )
from test.workflows.iptrunk.test_create_iptrunk import MockedNetboxClient
@pytest.mark.workflow @pytest.mark.workflow
@patch("gso.workflows.iptrunk.modify_trunk_interface.provisioning_proxy.provision_ip_trunk") @patch("gso.workflows.iptrunk.modify_trunk_interface.provisioning_proxy.provision_ip_trunk")
@patch("gso.services.netbox_client.NetboxClient.get_device_by_name")
@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces")
@patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag")
@patch("gso.services.netbox_client.NetboxClient.reserve_interface")
@patch("gso.services.netbox_client.NetboxClient.allocate_interface")
@patch("gso.services.netbox_client.NetboxClient.free_interface")
@patch("gso.services.netbox_client.NetboxClient.detach_interfaces_from_lag")
def test_iptrunk_modify_trunk_interface_success( def test_iptrunk_modify_trunk_interface_success(
mocked_detach_interfaces_from_lag,
mocked_free_interface,
mocked_allocate_interface,
mocked_reserve_interface,
mocked_attach_interface_to_lag,
mocked_get_available_interfaces,
mocked_get_device_by_name,
mock_provision_ip_trunk, mock_provision_ip_trunk,
iptrunk_subscription_factory, iptrunk_subscription_factory,
faker, faker,
): ):
# Set up mock return values # Set up mock return values
mocked_netbox = MockedNetboxClient()
mocked_get_device_by_name.return_value = mocked_netbox.get_device_by_name()
mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces()
mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag()
mocked_reserve_interface.return_value = mocked_netbox.reserve_interface()
mocked_allocate_interface.return_value = mocked_netbox.allocate_interface()
mocked_free_interface.return_value = mocked_netbox.free_interface()
mocked_detach_interfaces_from_lag.return_value = mocked_netbox.detach_interfaces_from_lag()
product_id = iptrunk_subscription_factory() product_id = iptrunk_subscription_factory()
new_sid = faker.geant_sid() new_sid = faker.geant_sid()
new_description = faker.sentence() new_description = faker.sentence()
...@@ -31,12 +55,12 @@ def test_iptrunk_modify_trunk_interface_success( ...@@ -31,12 +55,12 @@ def test_iptrunk_modify_trunk_interface_success(
new_side_a_sid = faker.geant_sid() new_side_a_sid = faker.geant_sid()
new_side_a_ae_members = [ new_side_a_ae_members = [
{"interface_name": faker.network_interface(), "interface_description": faker.sentence()} for _ in range(5) {"interface_name": f"Interface{i}", "interface_description": faker.sentence()} for i in range(5)
] ]
new_side_b_sid = faker.geant_sid() new_side_b_sid = faker.geant_sid()
new_side_b_ae_members = [ new_side_b_ae_members = [
{"interface_name": faker.network_interface(), "interface_description": faker.sentence()} for _ in range(5) {"interface_name": f"Interface{i}", "interface_description": faker.sentence()} for i in range(5)
] ]
# Run workflow # Run workflow
...@@ -86,6 +110,11 @@ def test_iptrunk_modify_trunk_interface_success( ...@@ -86,6 +110,11 @@ def test_iptrunk_modify_trunk_interface_success(
assert "active" == subscription.status assert "active" == subscription.status
assert mock_provision_ip_trunk.call_count == 2 assert mock_provision_ip_trunk.call_count == 2
# Assert all Netbox calls have been made
assert mocked_reserve_interface.call_count == 10 # 5 interfaces per side
assert mocked_attach_interface_to_lag.call_count == 10 # 5 interfaces per side
assert mocked_free_interface.call_count == 4 # 2 interfaces per side(The old ones)
assert mocked_detach_interfaces_from_lag.call_count == 2 # 1 time per side
# Assert all subscription properties have been updated correctly # Assert all subscription properties have been updated correctly
assert subscription.description == f"IP trunk, geant_s_sid:{new_sid}" assert subscription.description == f"IP trunk, geant_s_sid:{new_sid}"
......
...@@ -32,4 +32,7 @@ commands = ...@@ -32,4 +32,7 @@ commands =
coverage run --source gso --omit="gso/migrations/*" -m pytest {posargs} coverage run --source gso --omit="gso/migrations/*" -m pytest {posargs}
coverage xml coverage xml
coverage html coverage html
coverage report --fail-under 80 sh -c "if [ $SKIP_ALL_TESTS -eq 1 ]; then echo 'Skipping coverage report'; else coverage report --fail-under 80; fi"
allowlist_externals =
sh
\ No newline at end of file