helpers.py 10.34 KiB
"""Helper methods that are used across :term:`GSO`."""
import re
from typing import TYPE_CHECKING
from uuid import UUID
from pydantic_forms.validators import Choice
from gso import settings
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services import subscriptions
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_all_partners, get_partner_by_name
from gso.utils.shared_enums import Vendor
from gso.utils.types.interfaces import PhysicalPortCapacity
from gso.utils.types.ip_address import IPv4AddressType
if TYPE_CHECKING:
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
    """Build a dropdown of the free interfaces on a router that match a given speed.

    Netbox is only queried when the router is a Nokia device. For a router of any other vendor, ``None`` is
    returned instead of a ``Choice``.

    :param router_id: The :term:`UUID` of the router subscription.
    :param speed: The interface speed, handed to Netbox unchanged.
    :return: A ``Choice`` titled ``ae member`` with one option per interface, labelled ``<name> <description>``,
        or ``None`` for non-Nokia routers.
    """
    vendor = get_router_vendor(router_id)
    if vendor != Vendor.NOKIA:
        return None

    free_interfaces = NetboxClient().get_available_interfaces(router_id, speed)
    # Keyed by interface name, so a name that is reported twice collapses into a single option.
    labels = {iface["name"]: f"{iface['name']} {iface['description']}" for iface in free_interfaces}
    return Choice("ae member", zip(labels.keys(), labels.items(), strict=True))  # type: ignore[arg-type]
def available_interfaces_choices_including_current_members(
    router_id: UUID,
    speed: str,
    interfaces: list["IptrunkInterfaceBlock"],
) -> Choice | None:
    """Build a dropdown of the free interfaces on a router, extended with the interfaces that are members already.

    Netbox is only queried when the router is a Nokia device. For a router of any other vendor, ``None`` is
    returned instead of a ``Choice``.

    :param router_id: The :term:`UUID` of the router subscription.
    :param speed: The interface speed, handed to Netbox unchanged.
    :param interfaces: The current member interfaces. Each one is looked up in Netbox by its ``interface_name``
        on the device identified by the router's :term:`FQDN`.
    :return: A ``Choice`` titled ``ae member`` with one option per interface, labelled ``<name> <description>``,
        or ``None`` for non-Nokia routers.
    """
    if get_router_vendor(router_id) != Vendor.NOKIA:
        return None

    # One client serves all lookups below instead of constructing a new one per member interface.
    netbox = NetboxClient()
    candidates = list(netbox.get_available_interfaces(router_id, speed))

    if interfaces:
        # The FQDN is identical for every member, so the subscription is loaded once rather than once per member.
        router_fqdn = Router.from_subscription(router_id).router.router_fqdn
        candidates.extend(
            netbox.get_interface_by_name_and_device(member.interface_name, router_fqdn) for member in interfaces
        )

    # Keyed by interface name, so an interface that appears in both lists ends up as a single option.
    options = {iface["name"]: f"{iface['name']} {iface['description']}" for iface in candidates}
    return Choice("ae member", zip(options.keys(), options.items(), strict=True))  # type: ignore[arg-type]
def available_lags_choices(router_id: UUID) -> Choice | None:
    """Build a dropdown of the :term:`LAG` interfaces that Netbox reports as available on a router.

    Netbox is only queried when the router is a Nokia device. For a router of any other vendor, ``None`` is
    returned instead of a ``Choice``.

    :param router_id: The :term:`UUID` of the router subscription.
    :return: A ``Choice`` titled ``ae iface`` in which every available entry is paired with itself, or ``None``
        for non-Nokia routers.
    """
    vendor = get_router_vendor(router_id)
    if vendor != Vendor.NOKIA:
        return None

    free_lags = NetboxClient().get_available_lags(router_id)
    return Choice("ae iface", zip(free_lags, free_lags, strict=True))  # type: ignore[arg-type]
def available_service_lags_choices(router_id: UUID) -> Choice | None:
    """Build a dropdown of the :term:`LAG` interfaces that Netbox reports as available for services on a router.

    Netbox is only queried when the router is a Nokia device. For a router of any other vendor, ``None`` is
    returned instead of a ``Choice``.

    :param router_id: The :term:`UUID` of the router subscription.
    :return: A ``Choice`` titled ``ae iface`` in which every available entry is paired with itself, or ``None``
        for non-Nokia routers.
    """
    vendor = get_router_vendor(router_id)
    if vendor != Vendor.NOKIA:
        return None

    free_service_lags = NetboxClient().get_available_services_lags(router_id)
    return Choice("ae iface", zip(free_service_lags, free_service_lags, strict=True))  # type: ignore[arg-type]
def get_router_vendor(router_id: UUID) -> Vendor:
    """Look up the vendor of a router by loading its subscription.

    :param router_id: The :term:`UUID` of the router.
    :type router_id: :class:`uuid.UUID`
    :return: The vendor stored on the router's product block.
    :rtype: Vendor
    """
    router_subscription = Router.from_subscription(router_id)
    return router_subscription.router.vendor
def iso_from_ipv4(ipv4_address: IPv4AddressType) -> str:
    """Derive an :term:`ISO` address from an IPv4 address.

    Every octet is left-padded with zeroes to three digits, the resulting digits are regrouped into blocks of
    four, and those blocks are placed between the fixed ``49.51e5.0001.`` prefix and the ``.00`` suffix. For
    example, ``192.168.1.1`` becomes ``49.51e5.0001.1921.6800.1001.00``.

    :param IPv4Address ipv4_address: The address that's to be converted
    :returns: An :term:`ISO`-formatted address.
    """
    digits = "".join(octet.rjust(3, "0") for octet in str(ipv4_address).split("."))
    # Regroup the zero-padded digits four at a time: 3 digits x 4 octets gives 3 blocks of 4.
    blocks = re.findall(r".{4}", digits)
    return "49.51e5.0001." + ".".join(blocks) + ".00"
def generate_fqdn(hostname: str, site_name: str, country_code: str) -> str:
    """Compose an :term:`FQDN` out of a hostname, a site name, and a country code.

    The site name and country code are lower-cased; the hostname is used as given. The domain name from the
    ``IPAM.LO`` section of the :term:`OSS` parameters is appended without a separating dot.

    :param hostname: The host part of the name.
    :param site_name: The name of the site, lower-cased in the result.
    :param country_code: The country code, lower-cased in the result.
    :return: ``<hostname>.<site>.<country><domain name>``.
    """
    # NOTE(review): no "." is inserted before the domain name, so the configured value presumably starts with
    # one already - confirm against the OSS parameters.
    domain_name = settings.load_oss_params().IPAM.LO.domain_name
    site_part = site_name.lower()
    country_part = country_code.lower()
    return f"{hostname}.{site_part}.{country_part}{domain_name}"
def generate_inventory_for_active_routers(
    router_role: RouterRole,
    exclude_routers: list[str] | None = None,
    router_vendor: Vendor | None = None,
) -> dict:
    """Assemble an Ansible-compatible inventory of active routers for running playbooks.

    Every active router subscription is loaded, after which only the routers with the requested role are kept.
    Routers can additionally be left out by :term:`FQDN`, or narrowed down to a single vendor.

    :param RouterRole router_role: The role of the routers to include in the inventory.
    :param list exclude_routers: :term:`FQDN` of each router to exclude from the inventory.
    :param Vendor router_vendor: The vendor of the routers to include; ``None`` includes every vendor.
    :return: A dictionary of the form ``{"all": {"hosts": {<fqdn>: {"lo4": ..., "lo6": ..., "vendor": ...}}}}``.
    :rtype: dict[str, Any]
    """
    excluded_fqdns = exclude_routers or []
    active_routers = [
        Router.from_subscription(subscription["subscription_id"])
        for subscription in subscriptions.get_active_router_subscriptions()
    ]

    hosts = {}
    for active_router in active_routers:
        block = active_router.router
        is_selected = (
            block.router_role == router_role
            and block.router_fqdn not in excluded_fqdns
            and (router_vendor is None or block.vendor == router_vendor)
        )
        if is_selected:
            hosts[block.router_fqdn] = {
                "lo4": str(block.router_lo_ipv4_address),
                "lo6": str(block.router_lo_ipv6_address),
                "vendor": str(block.vendor),
            }

    return {"all": {"hosts": hosts}}
def calculate_recommended_minimum_links(iptrunk_number_of_members: int, iptrunk_speed: PhysicalPortCapacity) -> int:
    """Work out the recommended minimum number of links for an IP trunk from its member count and speed.

    A 400G trunk gets a recommendation of one link fewer than it has members. For every other speed the
    recommendation equals the number of members.

    :param int iptrunk_number_of_members: The number of members in the IP trunk.
    :param PhysicalPortCapacity iptrunk_speed: The speed of the IP trunk.
    :return: The recommended minimum number of links for the IP trunk.
    """
    is_400g = iptrunk_speed == PhysicalPortCapacity.FOUR_HUNDRED_GIGABIT_PER_SECOND
    return iptrunk_number_of_members - 1 if is_400g else iptrunk_number_of_members
def active_site_selector() -> Choice:
    """Build the dropdown that lets a user pick an active site in an input form.

    :return: A ``Choice`` titled ``Select a site``. Each option pairs a subscription ID (as a string) with an
        ``(id, description)`` tuple.
    """
    active_sites = subscriptions.get_active_site_subscriptions(includes=["subscription_id", "description"])
    labels_by_id = {str(site["subscription_id"]): site["description"] for site in active_sites}
    return Choice("Select a site", zip(labels_by_id.keys(), labels_by_id.items(), strict=True))  # type: ignore[arg-type]
def active_router_selector() -> Choice:
    """Build the dropdown that lets a user pick an active Router in an input form.

    :return: A ``Choice`` titled ``Select a router``. Each option pairs a subscription ID (as a string) with an
        ``(id, description)`` tuple.
    """
    active_routers = subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"])
    labels_by_id = {str(entry["subscription_id"]): entry["description"] for entry in active_routers}
    return Choice("Select a router", zip(labels_by_id.keys(), labels_by_id.items(), strict=True))  # type: ignore[arg-type]
def active_pe_router_selector() -> Choice:
    """Build the dropdown that lets a user pick an active PE Router in an input form.

    The candidates are the active subscriptions whose ``router_role`` field equals ``RouterRole.PE``.

    :return: A ``Choice`` titled ``Select a router``. Each option pairs a subscription ID (as a string) with an
        ``(id, description)`` tuple.
    """
    pe_routers = subscriptions.get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE)
    labels_by_id = {str(entry.subscription_id): entry.description for entry in pe_routers}
    return Choice("Select a router", zip(labels_by_id.keys(), labels_by_id.items(), strict=True))  # type: ignore[arg-type]
def active_switch_selector() -> Choice:
    """Build the dropdown that lets a user pick an active Switch in an input form.

    :return: A ``Choice`` titled ``Select a switch``. Each option pairs a subscription ID (as a string) with an
        ``(id, description)`` tuple.
    """
    active_switches = subscriptions.get_active_switch_subscriptions(includes=["subscription_id", "description"])
    labels_by_id = {str(entry["subscription_id"]): entry["description"] for entry in active_switches}
    return Choice("Select a switch", zip(labels_by_id.keys(), labels_by_id.items(), strict=True))  # type: ignore[arg-type]
def active_edge_port_selector(*, geant_only: bool | None = None) -> Choice:
    """Build the dropdown that lets a user pick an active Edge Port in an input form.

    :param geant_only: Leave as ``None`` to list every active Edge Port. ``True`` keeps only the Edge Ports whose
        ``customer_id`` equals the partner ID of the ``GEANT`` partner; ``False`` keeps only the other ones.
    :return: A ``Choice`` titled ``Select an Edge Port``. Each option pairs a subscription ID (as a string) with
        an ``(id, description)`` tuple.
    """
    candidates = subscriptions.get_active_edge_port_subscriptions(
        includes=["subscription_id", "description", "customer_id"]
    )

    if geant_only is not None:
        # A filter was requested: keep a port when "is not GEANT's" differs from ``geant_only``.
        geant_partner_id = get_partner_by_name("GEANT")["partner_id"]
        candidates = [
            port for port in candidates if geant_only != bool(port["customer_id"] != geant_partner_id)
        ]

    labels_by_id = {str(port["subscription_id"]): port["description"] for port in candidates}
    return Choice(
        "Select an Edge Port",
        zip(labels_by_id.keys(), labels_by_id.items(), strict=True),  # type: ignore[arg-type]
    )
def partner_choice() -> Choice:
    """Build a ``Choice`` that lists all available partners.

    :return: A ``Choice`` titled ``Select a partner``. Each option pairs a partner name with a
        ``(partner_id, name)`` tuple.
    """
    names_by_id = {entry["partner_id"]: entry["name"] for entry in get_all_partners()}
    # The partner *name*, not the ID, is the first element of every pair handed to ``Choice``.
    return Choice("Select a partner", zip(names_by_id.values(), names_by_id.items(), strict=True))  # type: ignore[arg-type]
def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int, enable_lacp: bool) -> None:
    """Check that the number of edge port members is allowed under the given :term:`LACP` setting.

    More than one member is only accepted when :term:`LACP` is enabled.

    :param number_of_members: The number of members to validate.
    :param enable_lacp: Whether :term:`LACP` is enabled or not.
    :raises ValueError: If the number of members is greater than 1 and :term:`LACP` is disabled.
    """
    if number_of_members <= 1 or enable_lacp:
        return

    err_msg = "Number of members must be 1 if LACP is disabled."
    raise ValueError(err_msg)