"""Helper methods that are used across :term:`GSO`."""
import ipaddress
import re
from ipaddress import IPv4Address
from uuid import UUID

from orchestrator import step
from orchestrator.types import State, UUIDstr
from pydantic import BaseModel
from pydantic_forms.validators import Choice

from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import Iptrunk
from gso.products.product_types.router import Router
from gso.services import provisioning_proxy
from gso.services.netbox_client import NetboxClient
from gso.services.subscriptions import get_active_subscriptions_by_field_and_value
# NOTE(review): the ``class`` line was absent from this block; the name and base class are restored from how the
# rest of this file uses them (``list[LAGMember]`` annotations, the ``BaseModel`` import) — confirm against VCS.
class LAGMember(BaseModel):
    """A :term:`LAG` member interface that consists of a name and description."""

    # TODO: validate interface name
    # Name of the member interface; used by the uniqueness validation elsewhere in this module.
    interface_name: str
    # Free-text description that accompanies the interface name.
    interface_description: str

    def __hash__(self) -> int:
        """Calculate the hash based on the interface name and description, so that uniqueness can be determined."""
        # TODO: check if this is still needed
        return hash((self.interface_name, self.interface_description))
@step("[COMMIT] Set ISIS metric to 90.000")
def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
"""Workflow step for setting the :term:`ISIS` metric to 90k as an arbitrarily high value to drain a link.

The metric currently stored on the subscription is remembered, overwritten with 90000, and handed back in the
returned state under ``old_isis_metric`` together with the updated subscription.
"""
# Keep the previous metric so that it is available in the returned state.
old_isis_metric = subscription.iptrunk.iptrunk_isis_metric
subscription.iptrunk.iptrunk_isis_metric = 90000
# NOTE(review): the six lines below read like the argument list of a call (presumably on ``provisioning_proxy``)
# whose opening line and closing parenthesis are not present in this view — restore them from version control.
subscription,
process_id,
callback_route,
tt_number,
"isis_interface",
dry_run=False,
# State returned by this step: the modified subscription plus the metric it had before.
return {
"subscription": subscription,
"old_isis_metric": old_isis_metric,
}
def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
    """Return the available interfaces for a given router and speed as a choice list.

    Only Nokia routers are looked up in Netbox. For a router of any other vendor no choice list is built and
    ``None`` is returned.

    Args:
    ----
        router_id (UUID): The subscription ID of the router.
        speed (str): The interface speed that is passed on to the Netbox query.

    Returns:
    -------
        Choice | None: A choice named ``"ae member"`` with one entry per available interface, or ``None`` when the
        router is not a Nokia.
    """
    if Router.from_subscription(router_id).router.router_vendor != RouterVendor.NOKIA:
        return None

    # Map every interface name onto a human-readable "name - module - description" label.
    interfaces = {
        interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}"
        for interface in NetboxClient().get_available_interfaces(router_id, speed)
    }
    # Every entry pairs the interface name with its ``(name, label)`` item.
    return Choice("ae member", zip(interfaces.keys(), interfaces.items(), strict=True))  # type: ignore[arg-type]

Neda Moeini
committed
def available_interfaces_choices_including_current_members(
Karel van Klink
committed
router_id: UUID | UUIDstr,
speed: str,
interfaces: list[IptrunkInterfaceBlock],

Neda Moeini
committed
) -> Choice | None:
"""Return the available interfaces for a given router and speed, including the current members.

Only Nokia routers are looked up in Netbox. For a router of any other vendor no choice list is built and ``None``
is returned.

``interfaces`` holds the interfaces that are already members; they are appended to the interfaces that Netbox
reports as available, so that they remain selectable.
"""
# Routers that are not Nokia get no choice list at all.
if Router.from_subscription(router_id).router.router_vendor != RouterVendor.NOKIA:
return None

Neda Moeini
committed
# Start from the interfaces Netbox reports as available for this router and speed.
available_interfaces = list(NetboxClient().get_available_interfaces(router_id, speed))
# NOTE(review): the interface name and router FQDN below read like the two arguments of a per-interface lookup call
# whose opening line and closing parenthesis are not present in this view — restore them from version control.
# The entries added here must offer the "name", "module" and "description" keys that are read further down.
available_interfaces.extend(
[
Karel van Klink
committed
interface.interface_name,
Router.from_subscription(router_id).router.router_fqdn,

Neda Moeini
committed
for interface in interfaces
Karel van Klink
committed
],

Neda Moeini
committed
)
# Map every interface name onto a human-readable "name - module - description" label.
options = {
interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}"
for interface in available_interfaces
}
Karel van Klink
committed
# Every entry pairs the interface name with its ``(name, label)`` item.
return Choice("ae member", zip(options.keys(), options.items(), strict=True)) # type: ignore[arg-type]

Neda Moeini
committed
def available_lags_choices(router_id: UUID) -> Choice | None:
    """Return the available :term:`LAG` interfaces for a given router as a choice list.

    Only Nokia routers are looked up in Netbox. For a router of any other vendor no choice list is built and
    ``None`` is returned.

    Args:
    ----
        router_id (UUID): The subscription ID of the router.

    Returns:
    -------
        Choice | None: A choice named ``"ae iface"`` in which every available LAG is both the entry name and its
        value, or ``None`` when the router is not a Nokia.
    """
    if Router.from_subscription(router_id).router.router_vendor != RouterVendor.NOKIA:
        return None

    # Materialise the result first: it is zipped with itself below, and a one-shot iterator would otherwise be
    # consumed from both sides at once and pair up neighbouring entries instead of each entry with itself.
    side_a_ae_iface_list = list(NetboxClient().get_available_lags(router_id))
    return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True))  # type: ignore[arg-type]
def get_router_vendor(router_id: UUID) -> str:
    """Look up the vendor of a router subscription.

    Args:
    ----
        router_id (UUID): The {term}`UUID` of the router.

    Returns:
    -------
        str: The vendor that is stored on the router of this subscription.
    """
    router_subscription = Router.from_subscription(router_id)
    return router_subscription.router.router_vendor
Karel van Klink
committed
def iso_from_ipv4(ipv4_address: IPv4Address) -> str:
    """Calculate an :term:`ISO` address, based on an IPv4 address.

    Every octet is left-padded with zeroes to three digits, the resulting twelve digits are regrouped into three
    dot-separated blocks of four, and the result is placed between the fixed ``49.51e5.0001.`` prefix and the ``.00``
    suffix. For example, ``10.0.0.1`` becomes ``49.51e5.0001.0100.0000.0001.00``.

    :param IPv4Address ipv4_address: The address that's to be converted
    :returns: An :term:`ISO`-formatted address.
    """
    padded_octets = [f"{octet:>03}" for octet in str(ipv4_address).split(".")]
    joined_octets = "".join(padded_octets)
    # Four octets of three digits each give twelve digits, i.e. exactly three groups of four.
    re_split = ".".join(re.findall(r"....", joined_octets))
    return f"49.51e5.0001.{re_split}.00"
def validate_router_in_netbox(subscription_id: UUIDstr) -> UUIDstr | None:
    """Verify if a device exists in Netbox.

    Only Nokia routers are checked against Netbox; a router of any other vendor passes without a lookup.

    Args:
    ----
        subscription_id (UUID): The {term}`UUID` of the router subscription.

    Returns:
    -------
        UUID: The {term}`UUID` of the router subscription or raises an error.

    Raises:
    ------
        ValueError: If the router is a Nokia and Netbox has no device with the router's FQDN.
    """
    router = Router.from_subscription(subscription_id).router
    if router.router_vendor == RouterVendor.NOKIA:
        device = NetboxClient().get_device_by_name(router.router_fqdn)
        if not device:
            msg = "The selected router does not exist in Netbox."
            raise ValueError(msg)

    # Hand the validated value back, as promised by the return type; falling off the end would yield ``None``.
    return subscription_id
def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMember]:
    """Verify if the interfaces are unique.

    Uniqueness is determined on the interface name only; the description is not taken into account.

    Args:
    ----
        interfaces (list[LAGMember]): The list of interfaces.

    Returns:
    -------
        list[LAGMember]: The list of interfaces or raises an error.

    Raises:
    ------
        ValueError: If two or more interfaces share the same name.
    """
    interface_names = [member.interface_name for member in interfaces]
    # A set drops duplicates, so a difference in length means at least one name occurs more than once.
    if len(interface_names) != len(set(interface_names)):
        msg = "Interfaces must be unique."
        raise ValueError(msg)

    # Hand the validated list back, as promised by the return type; falling off the end would yield ``None``.
    return interfaces
def validate_site_fields_is_unique(field_name: str, value: str | int) -> str | int:
    """Validate that a site field is unique.

    Args:
    ----
        field_name (str): The name of the field that is looked up among the active subscriptions.
        value (str | int): The value that must not be in use yet; it is converted to a string for the lookup.

    Returns:
    -------
        str | int: The unchanged value, or raises an error.

    Raises:
    ------
        ValueError: If at least one active subscription already has this value for the field.
    """
    if len(get_active_subscriptions_by_field_and_value(field_name, str(value))) > 0:
        msg = f"{field_name} must be unique"
        raise ValueError(msg)

    return value
def validate_ipv4_or_ipv6(value: str) -> str:
    """Validate that a value is a valid IPv4 or IPv6 address.

    Args:
    ----
        value (str): The text that should represent an IP address.

    Returns:
    -------
        str: The unchanged value, or raises an error.

    Raises:
    ------
        ValueError: If the value can be parsed neither as an IPv4 nor as an IPv6 address.
    """
    try:
        ipaddress.ip_address(value)
    except ValueError as e:
        msg = "Enter a valid IPv4 or IPv6 address."
        raise ValueError(msg) from e

    # Hand the validated value back, as promised by the return type; falling off the end would yield ``None``.
    return value
def validate_country_code(country_code: str) -> str:
    """Validate that a country code is valid.

    Args:
    ----
        country_code (str): The country code that is looked up.

    Returns:
    -------
        str: The unchanged country code, or raises an error.

    Raises:
    ------
        ValueError: If the lookup of the country code fails.
    """
    # NOTE(review): ``pycountry`` is not among the imports visible at the top of this file — confirm it is imported.
    try:
        pycountry.countries.lookup(country_code)
    except LookupError as e:
        msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format."
        raise ValueError(msg) from e

    # Hand the validated value back, as promised by the return type; falling off the end would yield ``None``.
    return country_code
def validate_site_name(site_name: str) -> str:
"""Validate the site name.
The site name must consist of three uppercase letters (A-Z) followed by an optional single digit (0-9).
"""
pattern = re.compile(r"^[A-Z]{3}[0-9]?$")
msg = (
"Enter a valid site name. It must consist of three uppercase letters (A-Z) followed by an optional single "