"""Helper steps, form-choice builders and validators (helpers.py)."""

import ipaddress
from uuid import UUID
import pycountry
from orchestrator import step
from orchestrator.types import State, UUIDstr
from pydantic import BaseModel
from pydantic_forms.validators import Choice
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import Iptrunk
from gso.products.product_types.router import Router
from gso.services import provisioning_proxy
from gso.services.netbox_client import NetboxClient
from gso.services.subscriptions import get_active_subscriptions_by_field_and_value
class LAGMember(BaseModel):
    """A LAG member interface, described by its name and a free-text description."""

    #  TODO: validate interface name
    interface_name: str
    interface_description: str

    def __hash__(self) -> int:
        """Hash on both fields, so that instances can be stored in sets or used as dict keys."""
        #  TODO: check if this is still needed
        identity = (self.interface_name, self.interface_description)
        return hash(identity)


@step("[COMMIT] Set ISIS metric to 90.000")
def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
    """Set the ISIS metric of the trunk to 90000 and pass the change on to the provisioning proxy.

    The metric that was set on the subscription beforehand is kept in the returned state under
    ``old_isis_metric``, next to the updated subscription itself.
    """
    trunk = subscription.iptrunk
    # Remember the current value before it is overwritten.
    previous_metric = trunk.iptrunk_isis_metric
    trunk.iptrunk_isis_metric = 90000
    provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "isis_interface", False)

    return {"subscription": subscription, "old_isis_metric": previous_metric}


def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
    """Build the ``Choice`` of available interfaces for a given router and speed.

    For Nokia routers, every interface returned by ``NetboxClient.get_available_interfaces`` becomes
    one option: the interface name mapped to a ``(name, "<name> - <module display> - <description>")``
    pair. For routers of any other vendor, ``None`` is returned.
    """
    vendor = Router.from_subscription(router_id).router.router_vendor
    if vendor != RouterVendor.NOKIA:
        return None

    labels_by_name = {}
    for candidate in NetboxClient().get_available_interfaces(router_id, speed):
        name = candidate["name"]
        labels_by_name[name] = f"{name} - {candidate['module']['display']} - {candidate['description']}"
    return Choice("ae member", zip(labels_by_name, labels_by_name.items()))  # type: ignore[arg-type]


def available_interfaces_choices_including_current_members(
    router_id: UUID, speed: str, interfaces: list[IptrunkInterfaceBlock]
) -> Choice | None:
    """Return the available interfaces for a given router and speed, including the current members.

    For Nokia routers, the interfaces reported as available by Netbox are combined with the interfaces
    that are passed in as current members, and the result is returned as a ``Choice``. Each option maps
    the interface name to a ``(name, "<name> - <module display> - <description>")`` pair.
    For routers of any other vendor, ``None`` is returned.

    Args:
    ----
    router_id (UUID): The subscription ID of the router.
    speed (str): The interface speed that is passed on to Netbox when listing available interfaces.
    interfaces (list[IptrunkInterfaceBlock]): The current members, looked up in Netbox by name.

    Returns:
    -------
    Choice | None: The selectable interfaces, or ``None`` when the router is not a Nokia.
    """
    # Load the router once; both the vendor check and the per-member lookups below need it.
    router = Router.from_subscription(router_id).router
    if router.router_vendor != RouterVendor.NOKIA:
        return None

    # A single client serves all Netbox queries instead of constructing a new one per member.
    netbox = NetboxClient()
    available_interfaces = list(netbox.get_available_interfaces(router_id, speed))
    available_interfaces.extend(
        netbox.get_interface_by_name_and_device(interface.interface_name, router.router_fqdn)
        for interface in interfaces
    )
    options = {
        interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}"
        for interface in available_interfaces
    }
    return Choice("ae member", zip(options.keys(), options.items()))  # type: ignore[arg-type]


def available_lags_choices(router_id: UUID) -> Choice | None:
    """Build the ``Choice`` of available LAGs for a given router.

    For Nokia routers, every entry returned by ``NetboxClient.get_available_lags`` becomes an option
    that uses the entry as both its name and its value. For routers of any other vendor, ``None`` is
    returned.
    """
    vendor = Router.from_subscription(router_id).router.router_vendor
    if vendor == RouterVendor.NOKIA:
        free_lags = NetboxClient().get_available_lags(router_id)
        return Choice("ae iface", zip(free_lags, free_lags))  # type: ignore[arg-type]
    return None


def get_router_vendor(router_id: UUID) -> str:
    """Look up the vendor that is registered on a router subscription.

    Args:
    ----
    router_id (UUID): The {term}`UUID` of the router.

    Returns:
    -------
    str: The vendor of the router.
    """
    router_block = Router.from_subscription(router_id).router
    return router_block.router_vendor


def iso_from_ipv4(ipv4_address: ipaddress.IPv4Address) -> str:
    """Calculate an :term:`ISO` address, based on an IPv4 address.

    Every octet is zero-padded to three characters, the padded octets are concatenated and regrouped
    into blocks of four characters, and the result is placed between the fixed ``49.51e5.0001`` prefix
    and the ``00`` suffix. For example, ``10.0.0.1`` becomes ``49.51e5.0001.0100.0000.0001.00``.

    :param IPv4Address ipv4_address: The address that's to be converted
    :returns: An :term:`ISO`-formatted address.
    """
    padded_octets = [f"{x:>03}" for x in str(ipv4_address).split(".")]
    joined_octets = "".join(padded_octets)
    # Regroup into complete blocks of four characters; an incomplete trailing block is dropped.
    # Plain slicing keeps this function free of any dependency beyond the ``ipaddress`` module.
    re_split = ".".join(joined_octets[start : start + 4] for start in range(0, len(joined_octets) - 3, 4))
    return ".".join(["49.51e5.0001", re_split, "00"])


def validate_router_in_netbox(subscription_id: UUIDstr) -> UUIDstr | None:
    """Check that the device of a router subscription is present in Netbox.

    The Netbox lookup is only performed for Nokia routers; subscriptions of any other vendor are
    accepted without it.

    Args:
    ----
    subscription_id (UUID): The {term}`UUID` of the router subscription.

    Returns:
    -------
    UUID: The {term}`UUID` of the router subscription or raises an error.
    """
    router = Router.from_subscription(subscription_id).router
    if router.router_vendor != RouterVendor.NOKIA:
        return subscription_id

    # The device is looked up in Netbox by the FQDN of the router.
    if NetboxClient().get_device_by_name(router.router_fqdn):
        return subscription_id
    raise ValueError("The selected router does not exist in Netbox.")


def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMember]:
    """Check that no interface name occurs more than once in a list of LAG members.

    Args:
    ----
    interfaces (list[LAGMember]): The list of interfaces.

    Returns:
    -------
    list[LAGMember]: The list of interfaces or raises an error.
    """
    distinct_names = {member.interface_name for member in interfaces}
    # A duplicate name collapses in the set, leaving it shorter than the input list.
    if len(distinct_names) < len(interfaces):
        raise ValueError("Interfaces must be unique.")
    return interfaces


def validate_site_fields_is_unique(field_name: str, value: str | int) -> str | int:
    """Reject a value for a site field when an active subscription already uses it.

    The value is converted to a string for the subscription lookup and returned unchanged when no
    active subscription matches; otherwise a ``ValueError`` is raised.
    """
    matching = get_active_subscriptions_by_field_and_value(field_name, str(value))
    if len(matching) == 0:
        return value
    raise ValueError(f"{field_name} must be unique")


def validate_ipv4_or_ipv6(value: str) -> str:
    """Validate that a value is a valid IPv4 or IPv6 address.

    Args:
    ----
    value (str): The address to check.

    Returns:
    -------
    str: The unmodified input value.

    Raises:
    ------
    ValueError: If the value cannot be parsed by ``ipaddress.ip_address``. The parser's own error is
        attached as the cause.
    """
    try:
        ipaddress.ip_address(value)
    except ValueError as e:
        # Chain explicitly so the parser's reason stays available as ``__cause__``.
        raise ValueError("Enter a valid IPv4 or IPv6 address.") from e
    return value


def validate_country_code(country_code: str) -> str:
    """Validate that a country code is valid.

    Args:
    ----
    country_code (str): The country code to look up with ``pycountry.countries.lookup``.

    Returns:
    -------
    str: The unmodified input value.

    Raises:
    ------
    ValueError: If the lookup raises a ``LookupError``. The lookup error is attached as the cause.
    """
    # NOTE(review): the error message promises ISO 3166-1 alpha-2, but ``lookup`` may also match other
    # fields such as alpha-3 codes or names - confirm whether stricter validation is wanted.
    try:
        pycountry.countries.lookup(country_code)
    except LookupError as e:
        # Chain explicitly so the underlying lookup failure stays available as ``__cause__``.
        raise ValueError("Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format.") from e
    return country_code