Skip to content
Snippets Groups Projects
helpers.py 11.44 KiB
"""Helper methods that are used across GSO."""

import random
import re
from typing import TYPE_CHECKING
from uuid import UUID

from orchestrator.types import SubscriptionLifecycle
from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import Choice

from gso import settings
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services import subscriptions
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_all_partners
from gso.services.subscriptions import is_virtual_circuit_id_available
from gso.utils.shared_enums import Vendor
from gso.utils.types.interfaces import PhysicalPortCapacity
from gso.utils.types.ip_address import IPv4AddressType
from gso.utils.types.virtual_identifiers import VC_ID

if TYPE_CHECKING:
    from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock


def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
    """Build a selector of the interfaces Netbox reports as available on a router.

    Only Nokia routers are looked up; for any other vendor ``None`` is returned.

    Args:
        router_id: The UUID of the router.
        speed: The interface speed that is passed on to the Netbox lookup.

    Returns:
        A ``Choice`` named ``"ae member"`` with one option per interface name, labelled with the interface name
        followed by its description, or ``None`` when the router is not a Nokia device.
    """
    vendor = get_router_vendor(router_id)
    if vendor != Vendor.NOKIA:
        return None

    # Keyed by interface name, so a name that is reported twice only yields a single option.
    labels_by_name = {}
    for netbox_interface in NetboxClient().get_available_interfaces(router_id, speed):
        name = netbox_interface["name"]
        labels_by_name[name] = f"{name}  {netbox_interface['description']}"

    members = ((name, (name, label)) for name, label in labels_by_name.items())
    return Choice("ae member", members)  # type: ignore[arg-type]


def available_interfaces_choices_including_current_members(
    router_id: UUID,
    speed: str,
    interfaces: list["IptrunkInterfaceBlock"],
) -> Choice | None:
    """Return a selector of available interfaces for a router and speed, including the current members.

    Only Nokia routers are looked up in Netbox; for any other vendor ``None`` is returned.

    Args:
        router_id: The UUID of the router.
        speed: The interface speed that is passed on to the Netbox lookup.
        interfaces: The current member interfaces; each one is fetched from Netbox by its ``interface_name`` and
            added to the list of available interfaces.

    Returns:
        A ``Choice`` named ``"ae member"`` with one option per interface name, labelled with the interface name
        followed by its description, or ``None`` when the router is not a Nokia device.
    """
    if get_router_vendor(router_id) != Vendor.NOKIA:
        return None

    netbox = NetboxClient()
    available_interfaces = list(netbox.get_available_interfaces(router_id, speed))
    if interfaces:
        # The FQDN is identical for every member, so load the router subscription once instead of once per member.
        router_fqdn = Router.from_subscription(router_id).router.router_fqdn
        available_interfaces.extend(
            [netbox.get_interface_by_name_and_device(member.interface_name, router_fqdn) for member in interfaces],
        )

    # Keyed by interface name, so a current member that is also reported as available appears only once.
    options = {
        interface["name"]: f"{interface['name']}  {interface['description']}" for interface in available_interfaces
    }
    return Choice("ae member", zip(options.keys(), options.items(), strict=True))  # type: ignore[arg-type]


def available_lags_choices(router_id: UUID) -> Choice | None:
    """Build a selector of the LAGs that Netbox reports as available on a router.

    Args:
        router_id: The UUID of the router.

    Returns:
        A ``Choice`` named ``"ae iface"`` in which every option uses the LAG as both name and value, or ``None``
        when the router is not a Nokia device.
    """
    vendor = get_router_vendor(router_id)
    if vendor == Vendor.NOKIA:
        lag_names = NetboxClient().get_available_lags(router_id)
        return Choice("ae iface", zip(lag_names, lag_names, strict=True))  # type: ignore[arg-type]
    return None


def available_service_lags_choices(router_id: UUID) -> Choice | None:
    """Build a selector of the service LAGs that Netbox reports as available on a router.

    Args:
        router_id: The UUID of the router.

    Returns:
        A ``Choice`` named ``"ae iface"`` in which every option uses the LAG as both name and value, or ``None``
        when the router is not a Nokia device.
    """
    vendor = get_router_vendor(router_id)
    if vendor == Vendor.NOKIA:
        service_lag_names = NetboxClient().get_available_services_lags(router_id)
        return Choice("ae iface", zip(service_lag_names, service_lag_names, strict=True))  # type: ignore[arg-type]
    return None


def get_router_vendor(router_id: UUID) -> Vendor:
    """Look up which vendor a router belongs to.

    Args:
        router_id: The UUID of the router.

    Returns:
        The ``vendor`` attribute of the router block of the loaded ``Router`` subscription.
    """
    router_subscription = Router.from_subscription(router_id)
    return router_subscription.router.vendor


def iso_from_ipv4(ipv4_address: IPv4AddressType) -> str:
    """Derive an ISO address from an IPv4 address.

    Every octet is left-padded with zeroes to three characters, the padded octets are concatenated, and the result
    is regrouped into dot-separated blocks of four characters. That string is placed between the fixed prefix
    ``49.51e5.0001.`` and the fixed suffix ``.00``.

    Args:
        ipv4_address: The address that's to be converted.

    Returns:
        An ISO-formatted address.
    """
    octets = str(ipv4_address).split(".")
    digits = "".join(octet.rjust(3, "0") for octet in octets)
    # Regroup the digit string into blocks of four characters.
    blocks = re.findall("....", digits)
    dotted_blocks = ".".join(blocks)
    return f"49.51e5.0001.{dotted_blocks}.00"


def generate_fqdn(hostname: str, site_name: str, country_code: str) -> str:
    """Compose an FQDN from a hostname, a site name, and a country code.

    The site name and country code are lower-cased and the hostname is used as given. The domain name from the OSS
    parameters (``IPAM.LO.domain_name``) is appended directly after the country code, without an extra separator.

    Args:
        hostname: The hostname, used unchanged as the first label.
        site_name: The site name, lower-cased as the second label.
        country_code: The country code, lower-cased as the third label.

    Returns:
        The resulting FQDN.
    """
    oss_params = settings.load_oss_params()
    site_label = site_name.lower()
    country_label = country_code.lower()
    return f"{hostname}.{site_label}.{country_label}{oss_params.IPAM.LO.domain_name}"


def generate_inventory_for_routers(
    router_role: RouterRole,
    exclude_routers: list[str] | None = None,
    router_vendor: Vendor | None = None,
    *,
    include_provisioning_routers: bool = True,
) -> dict:
    """Build an Ansible-compatible inventory of routers for executing playbooks.

    The inventory holds every router subscription of the requested role that is ``ACTIVE``, plus those that are
    ``PROVISIONING`` unless that is switched off. Routers can additionally be filtered by FQDN and by vendor.

    Args:
        router_role: The role of the routers to include in the inventory.
        exclude_routers: FQDNs of routers that must be left out of the inventory.
        router_vendor: When given, only routers of this vendor are included.
        include_provisioning_routers: Include routers that are in a ``PROVISIONING`` state.

    Returns:
        A dictionary of the form ``{"all": {"hosts": {fqdn: {"lo4": ..., "lo6": ..., "vendor": ...}}}}``.
    """
    lifecycles = [SubscriptionLifecycle.ACTIVE]
    if include_provisioning_routers:
        lifecycles.insert(0, SubscriptionLifecycle.PROVISIONING)

    candidates = [
        Router.from_subscription(subscription["subscription_id"])
        for subscription in subscriptions.get_router_subscriptions(lifecycles=lifecycles)
    ]
    excluded_fqdns = exclude_routers or []

    hosts = {}
    for candidate in candidates:
        router_block = candidate.router
        if router_block.router_role != router_role:
            continue
        if router_block.router_fqdn in excluded_fqdns:
            continue
        if router_vendor is not None and router_block.vendor != router_vendor:
            continue
        hosts[router_block.router_fqdn] = {
            "lo4": str(router_block.router_lo_ipv4_address),
            "lo6": str(router_block.router_lo_ipv6_address),
            "vendor": str(router_block.vendor),
        }

    return {"all": {"hosts": hosts}}


def calculate_recommended_minimum_links(iptrunk_number_of_members: int, iptrunk_speed: PhysicalPortCapacity) -> int:
    """Work out the recommended minimum number of links for an IP trunk.

    A 400G trunk is recommended to have one link fewer than its number of members. For every other speed the
    recommendation equals the number of members.

    Args:
        iptrunk_number_of_members: The number of members in the IP trunk.
        iptrunk_speed: The speed of the IP trunk.

    Returns:
        The recommended minimum number of links for the IP trunk.
    """
    is_400g = iptrunk_speed == PhysicalPortCapacity.FOUR_HUNDRED_GIGABIT_PER_SECOND
    return iptrunk_number_of_members - 1 if is_400g else iptrunk_number_of_members


def active_site_selector() -> Choice:
    """Create a dropdown selector with all active sites, for use in an input form.

    Returns:
        A ``Choice`` named ``"Select a site"`` whose options are keyed by subscription ID and labelled with the
        subscription description.
    """
    active_sites = subscriptions.get_active_site_subscriptions(includes=["subscription_id", "description"])
    descriptions_by_id = {}
    for site in active_sites:
        descriptions_by_id[str(site["subscription_id"])] = site["description"]

    options = ((site_id, (site_id, description)) for site_id, description in descriptions_by_id.items())
    return Choice("Select a site", options)  # type: ignore[arg-type]


def active_router_selector() -> Choice:
    """Create a dropdown selector with all active routers, for use in an input form.

    Returns:
        A ``Choice`` named ``"Select a router"`` whose options are keyed by subscription ID and labelled with the
        subscription description.
    """
    active_routers = subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"])
    descriptions_by_id = {}
    for router in active_routers:
        descriptions_by_id[str(router["subscription_id"])] = router["description"]

    options = ((router_id, (router_id, description)) for router_id, description in descriptions_by_id.items())
    return Choice("Select a router", options)  # type: ignore[arg-type]


def active_pe_router_selector() -> Choice:
    """Create a dropdown selector with all active PE routers, for use in an input form.

    Returns:
        A ``Choice`` named ``"Select a router"`` whose options are keyed by subscription ID and labelled with the
        subscription description.
    """
    pe_routers = subscriptions.get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE)
    descriptions_by_id = {}
    for pe_router in pe_routers:
        descriptions_by_id[str(pe_router.subscription_id)] = pe_router.description

    options = ((router_id, (router_id, description)) for router_id, description in descriptions_by_id.items())
    return Choice("Select a router", options)  # type: ignore[arg-type]


def active_switch_selector() -> Choice:
    """Create a dropdown selector with all active switches, for use in an input form.

    Returns:
        A ``Choice`` named ``"Select a switch"`` whose options are keyed by subscription ID and labelled with the
        subscription description.
    """
    active_switches = subscriptions.get_active_switch_subscriptions(includes=["subscription_id", "description"])
    descriptions_by_id = {}
    for switch in active_switches:
        descriptions_by_id[str(switch["subscription_id"])] = switch["description"]

    options = ((switch_id, (switch_id, description)) for switch_id, description in descriptions_by_id.items())
    return Choice("Select a switch", options)  # type: ignore[arg-type]


def active_edge_port_selector(*, partner_id: UUIDstr | None = None) -> Choice:
    """Create a dropdown selector with active Edge Ports, for use in an input form.

    Args:
        partner_id: When set, only Edge Ports whose ``customer_id`` equals this value are offered.

    Returns:
        A ``Choice`` named ``"Select an Edge Port"`` whose options are keyed by subscription ID and labelled with
        the subscription description.
    """
    candidates = subscriptions.get_active_edge_port_subscriptions(
        includes=["subscription_id", "description", "customer_id"]
    )

    if partner_id:
        # Narrow the selection down to the Edge Ports that belong to the given partner.
        candidates = [port for port in candidates if port["customer_id"] == partner_id]

    descriptions_by_id = {}
    for port in candidates:
        descriptions_by_id[str(port["subscription_id"])] = port["description"]

    options = ((port_id, (port_id, description)) for port_id, description in descriptions_by_id.items())
    return Choice("Select an Edge Port", options)  # type: ignore[arg-type]


def partner_choice() -> Choice:
    """Create a ``Choice`` object listing all available partners.

    Returns:
        A ``Choice`` named ``"Select a partner"``. Unlike the subscription selectors, each option is named after the
        partner's name, while its value is the ``(partner_id, name)`` pair.
    """
    names_by_id = {}
    for partner in get_all_partners():
        names_by_id[partner["partner_id"]] = partner["name"]

    options = ((name, (partner_id, name)) for partner_id, name in names_by_id.items())
    return Choice("Select a partner", options)  # type: ignore[arg-type]


def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int, enable_lacp: bool) -> None:
    """Check that the number of edge port members is compatible with the LACP setting.

    With LACP enabled any number of members is accepted. With LACP disabled more than one member is rejected.

    Args:
        number_of_members: The number of members to validate.
        enable_lacp: Whether LACP is enabled or not.

    Raises:
        ValueError: If the number of members is greater than 1 and LACP is disabled.
    """
    combination_is_valid = enable_lacp or number_of_members <= 1
    if combination_is_valid:
        return

    message = "Number of members must be 1 if LACP is disabled."
    raise ValueError(message)


def generate_unique_vc_id(max_attempts: int = 100) -> VC_ID | None:
    """Pick a random, still unused 8-digit VC_ID that starts with '11'.

    Each attempt draws a random six-digit number between 100000 and 999999, prefixes it with '11', and checks
    whether that ID is still available. The number of attempts is capped so the search always terminates, even
    when free IDs are hard to find.

    Args:
        max_attempts: The maximum number of attempts to generate a unique ID.

    Returns:
        A unique VC_ID instance if successful, None if no unique ID is found.
    """
    for _attempt in range(max_attempts):
        candidate = f"11{random.randint(100000, 999999)}"  # noqa: S311
        if not is_virtual_circuit_id_available(candidate):
            continue
        return VC_ID(candidate)

    return None