Select Git revision
      
   
   
              Mohammad Torkashvand authored 
   helpers.py  15.85 KiB 
"""Helper methods that are used across GSO."""
import logging
import random
import re
import time
from ipaddress import IPv4Network, IPv6Network
from typing import TYPE_CHECKING, TypeAlias, cast
from uuid import UUID
from orchestrator.db import ProcessTable
from orchestrator.types import SubscriptionLifecycle
from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import Choice
from gso.products.product_blocks.layer_2_circuit import Layer2CircuitType
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_all_partners
from gso.services.processes import get_stopped_process_by_id
from gso.services.subscriptions import (
    get_active_edge_port_subscriptions,
    get_active_router_subscriptions,
    get_active_site_subscriptions,
    get_active_subscriptions_by_field_and_value,
    get_active_switch_subscriptions,
    get_router_subscriptions,
    is_virtual_circuit_id_available,
)
from gso.settings import load_oss_params
from gso.utils.shared_enums import Vendor
from gso.utils.types.interfaces import PhysicalPortCapacity
from gso.utils.types.ip_address import IPv4AddressType, IPv4NetworkType, IPv6NetworkType
from gso.utils.types.virtual_identifiers import VC_ID
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
    from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
def available_interfaces_choices(router_id: UUID, speed: str) -> TypeAlias:
    """Return a list of available interfaces for a given router and speed.
    For Nokia routers, return a list of available interfaces.
    For Juniper routers, return a string.
    """
    if get_router_vendor(router_id) != Vendor.NOKIA:
        return None
    interfaces = {
        interface["name"]: f"{interface["name"]}  {interface["description"]}"
        for interface in NetboxClient().get_available_interfaces(router_id, speed)
    }
    return cast(
        type[Choice],
        Choice.__call__("ae member", zip(interfaces.keys(), interfaces.items(), strict=True)),
    )
def available_interfaces_choices_including_current_members(
    router_id: UUID,
    speed: str,
    interfaces: list["IptrunkInterfaceBlock"],
) -> TypeAlias:
    """Return a list of available interfaces for a given router and speed including the current members.
    For Nokia routers, return a list of available interfaces.
    For Juniper routers, return a string.
    """
    if get_router_vendor(router_id) != Vendor.NOKIA:
        return None
    available_interfaces = list(NetboxClient().get_available_interfaces(router_id, speed))
    available_interfaces.extend(
        [
            NetboxClient().get_interface_by_name_and_device(
                interface.interface_name,
                Router.from_subscription(router_id).router.router_fqdn,
            )
            for interface in interfaces
        ],
    )
    options = {
        interface["name"]: f"{interface["name"]}  {interface["description"]}" for interface in available_interfaces
    }
    return cast(
        type[Choice],
        Choice.__call__("ae member", zip(options.keys(), options.items(), strict=True)),
    )
def available_lags_choices(router_id: UUID) -> TypeAlias:
    """Return a list of available lags for a given router.
    For Nokia routers, return a list of available lags.
    For Juniper routers, return `None`.
    """
    if get_router_vendor(router_id) != Vendor.NOKIA:
        return None
    side_a_ae_iface_list = NetboxClient().get_available_lags(router_id)
    return cast(
        type[Choice],
        Choice.__call__("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)),
    )
def available_service_lags_choices(router_id: UUID) -> TypeAlias:
    """Return a list of available lags for a given router for services.
    For Nokia routers, return a list of available lags.
    For Juniper routers, return `None`.
    """
    if get_router_vendor(router_id) != Vendor.NOKIA:
        return None
    side_a_ae_iface_list = NetboxClient().get_available_services_lags(router_id)
    return cast(
        type[Choice],
        Choice.__call__("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)),
    )
def get_router_vendor(router_id: UUID) -> Vendor:
    """Retrieve the vendor of a router.
    Args:
        router_id: The UUID of the router.
    Returns:
        The vendor of the router.
    """
    return Router.from_subscription(router_id).router.vendor
def iso_from_ipv4(ipv4_address: IPv4AddressType) -> str:
    """Calculate an ISO address, based on an IPv4 address.
    Args:
        ipv4_address: The address that's to be converted
    Returns:
        An ISO-formatted address.
    """
    padded_octets = [f"{x:>03}" for x in str(ipv4_address).split(".")]
    joined_octets = "".join(padded_octets)
    re_split = ".".join(re.findall("....", joined_octets))
    return f"49.51e5.0001.{re_split}.00"
def generate_fqdn(hostname: str, site_name: str, country_code: str) -> str:
    """Generate an FQDN from a hostname, site name, and a country code."""
    oss = load_oss_params()
    return f"{hostname}.{site_name.lower()}.{country_code.lower()}{oss.IPAM.LO.domain_name}"
def generate_lan_switch_interconnect_subnet_v4(site_internal_id: int) -> IPv4NetworkType:
    """Generate an IPv4 network in which a LAN Switch Interconnect resides, given a Site internal ID."""
    ipam_oss = load_oss_params().IPAM.LAN_SWITCH_INTERCONNECT
    result = str(ipam_oss.V4.containers[0]).split(".")[:2]  # Take the first two octets from the IPv4 network.
    result.append(str(site_internal_id))  # Append the side ID as the third octet.
    result.append(f"0/{ipam_oss.V4.mask}")  # Append the fourth octet, together with the netmask.
    return IPv4Network(".".join(result))
def generate_lan_switch_interconnect_subnet_v6(site_internal_id: int) -> IPv6NetworkType:
    """Generate an IPv6 network in which a LAN Switch Interconnect resides, given a Site internal ID."""
    ipam_oss = load_oss_params().IPAM.LAN_SWITCH_INTERCONNECT
    result = IPv6Network(ipam_oss.V6.containers[0]).exploded[:17]  # Take the first 56 bits of the network
    result += str(hex(site_internal_id)[2:])  # Append the site internal id for bytes 57 to 64 as hexadecimal number
    result += f"::/{ipam_oss.V6.mask}"  # And fill the rest of the network with empty bits
    return IPv6Network(result)
def generate_inventory_for_routers(
    router_role: RouterRole,
    exclude_routers: list[str] | None = None,
    router_vendor: Vendor | None = None,
    *,
    include_provisioning_routers: bool = True,
) -> dict:
    """Generate an Ansible-compatible inventory for executing playbooks.
    Contains all active routers of a specific role. Optionally, routers can be excluded from the inventory.
    Args:
        router_role: The role of the routers to include in the inventory.
        exclude_routers: List of routers to exclude from the inventory.
        router_vendor: The vendor of the routers to include in the inventory.
        include_provisioning_routers: Include routers that are in a `PROVISIONING` state.
    Returns:
        A dictionary representing the inventory of active routers.
    """
    lifecycles = (
        [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE]
        if include_provisioning_routers
        else [SubscriptionLifecycle.ACTIVE]
    )
    all_routers = [
        Router.from_subscription(r["subscription_id"]) for r in get_router_subscriptions(lifecycles=lifecycles)
    ]
    exclude_routers = exclude_routers or []
    return {
        "all": {
            "hosts": {
                router.router.router_fqdn: {
                    "lo4": str(router.router.router_lo_ipv4_address),
                    "lo6": str(router.router.router_lo_ipv6_address),
                    "vendor": str(router.router.vendor),
                }
                for router in all_routers
                if router.router.router_role == router_role
                and router.router.router_fqdn not in exclude_routers
                and (router_vendor is None or router.router.vendor == router_vendor)
            }
        }
    }
def calculate_recommended_minimum_links(iptrunk_number_of_members: int, iptrunk_speed: PhysicalPortCapacity) -> int:
    """Calculate the recommended minimum number of links for an IP trunk based on the number of members and speed.
    If the IP trunk speed is 400G, the recommended minimum number of links is the number of members minus 1.
    Otherwise, the recommended minimum number of links is the number of members.
    Args:
        iptrunk_number_of_members: The number of members in the IP trunk.
        iptrunk_speed: The speed of the IP trunk.
    Returns:
        The recommended minimum number of links for the IP trunk.
    """
    if iptrunk_speed == PhysicalPortCapacity.FOUR_HUNDRED_GIGABIT_PER_SECOND:
        return iptrunk_number_of_members - 1
    return iptrunk_number_of_members
def active_site_selector() -> TypeAlias:
    """Generate a dropdown selector for choosing an active site in an input form."""
    site_subscriptions = {
        str(site["subscription_id"]): site["description"]
        for site in get_active_site_subscriptions(includes=["subscription_id", "description"])
    }
    return cast(
        type[Choice],
        Choice.__call__("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True)),
    )
def active_router_selector(*, excludes: list[UUIDstr] | None = None) -> TypeAlias:
    """Generate a dropdown selector for choosing an active Router in an input form.
    The resulting list of routers can be filtered using a list of excluded subscription IDs.
    """
    excludes = excludes or []
    router_subscriptions = {
        str(router["subscription_id"]): router["description"]
        for router in get_active_router_subscriptions(includes=["subscription_id", "description"])
        if router["subscription_id"] not in excludes
    }
    return cast(
        type[Choice],
        Choice.__call__("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)),
    )
def active_nokia_router_selector(*, excludes: list[UUIDstr] | None = None) -> TypeAlias:
    """Generate a dropdown choice list of all active Nokia routers.
    Args:
        excludes: An optional list of subscription IDs that should be excluded from the resulting dropdown.
    """
    excludes = excludes or []
    router_subscriptions = {
        str(router.subscription_id): router.description
        for router in [
            Router.from_subscription(subscription["subscription_id"])
            for subscription in get_active_router_subscriptions(["subscription_id"])
        ]
        if router.subscription_id not in excludes and router.router.vendor == Vendor.NOKIA
    }
    return cast(
        type[Choice],
        Choice.__call__("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)),
    )
def active_pe_router_selector(excludes: list[UUIDstr] | None = None) -> TypeAlias:
    """Generate a dropdown selector for choosing an active PE Router in an input form."""
    excludes = excludes or []
    routers = {
        str(router.subscription_id): router.description
        for router in get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE)
        if router.subscription_id not in excludes
    }
    return cast(
        type[Choice],
        Choice.__call__("Select a router", zip(routers.keys(), routers.items(), strict=True)),
    )
def active_switch_selector() -> TypeAlias:
    """Generate a dropdown selector for choosing an active Switch in an input form."""
    switch_subscriptions = {
        str(switch["subscription_id"]): switch["description"]
        for switch in get_active_switch_subscriptions(includes=["subscription_id", "description"])
    }
    return cast(
        type[Choice],
        Choice.__call__("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)),
    )
def active_edge_port_selector(*, partner_id: UUIDstr | None = None) -> TypeAlias:
    """Generate a dropdown selector for choosing an active Edge Port in an input form."""
    edge_ports = get_active_edge_port_subscriptions(partner_id=partner_id)
    options = {str(edge_port.subscription_id): edge_port.description for edge_port in edge_ports}
    return cast(
        type[Choice],
        Choice.__call__(
            "Select an Edge Port",
            zip(options.keys(), options.items(), strict=True),
        ),
    )
def ip_trunk_service_version_selector() -> Choice:
    """Generate a dropdown selector for choosing a service version."""
    iptrunk_versions = load_oss_params().SERVICE_VERSIONS.IP_TRUNK.version
    return Choice(
        "Select an IP trunk service version.",
        zip(iptrunk_versions.keys(), iptrunk_versions.items(), strict=True),  # type: ignore[arg-type]
    )
def partner_choice() -> TypeAlias:
    """Return a Choice object containing a list of available partners."""
    partners = {partner.partner_id: partner.name for partner in get_all_partners()}
    return cast(
        type[Choice],
        Choice.__call__("Select a partner", zip(partners.values(), partners.items(), strict=True)),
    )
def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int, enable_lacp: bool) -> None:
    """Validate the number of edge port members based on the LACP setting.
    Args:
        number_of_members: The number of members to validate.
        enable_lacp: Whether LACP is enabled or not.
    Raises:
        ValueError: If the number of members is greater than 1 and LACP is disabled.
    """
    if number_of_members > 1 and not enable_lacp:
        err_msg = "Number of members must be 1 if LACP is disabled."
        raise ValueError(err_msg)
def generate_unique_vc_id(l2c_type: str, max_attempts: int = 100) -> VC_ID | None:
    """Generate a unique 8-digit VC_ID.
    This function attempts to generate a ``VC_ID`` based on their circuit type,
    and ensures uniqueness before returning it. A maximum attempt limit is
    set to prevent an infinite loop in case the ID space is saturated.
    The range used for generating a ``VC_ID`` depends on the circuit type:
    ``Ethernet`` and ``VLAN`` type circuits get their IDs from different ranges.
    Args:
        l2c_type: type of Layer 2 Circuit.
        max_attempts: The maximum number of attempts to generate a unique ID.
    Returns:
        A unique VC_ID instance if successful, None if no unique ID is found.
    """
    def create_vc_id() -> str:
        """Generate an 8-digit VC_ID starting with '11'."""
        if l2c_type == Layer2CircuitType.ETHERNET:
            return str(random.randint(30001, 39999))
        return str(random.randint(6000, 6999))
    for _ in range(max_attempts):
        vc_id = create_vc_id()
        if is_virtual_circuit_id_available(vc_id):
            return VC_ID(vc_id)
    return None
def wait_for_workflow_to_stop(
    process_id: UUIDstr | UUID,
    check_interval: int,
    max_retries: int,
) -> ProcessTable | None:
    """Waits until any step in the workflow reaches a terminal status.
    Times out after max_retries * check_interval seconds.
    :param process_id: ID of the workflow process
    :param check_interval: Seconds between checks
    :param max_retries: Max number of retries before giving up
    :return: process object if it has stopped, None if it timed out
    """
    for attempt in range(max_retries):
        if process := get_stopped_process_by_id(process_id):
            msg = f"✅ Process {process_id} has stopped with status: {process.last_status}"
            logger.info(msg)
            return process
        msg = f"⏳ Attempt {attempt + 1}/{max_retries}: Waiting for workflow to progress..."
        logger.info(msg)
        time.sleep(check_interval)
    msg = f"❌ Timeout reached. Workflow {process_id} did not stop after {max_retries * check_interval} seconds."
    logger.error(msg)
    return None