Newer
Older
Karel van Klink
committed
import re
from ipaddress import IPv4Address
Karel van Klink
committed
from orchestrator import step
from orchestrator.types import State, UUIDstr
from pydantic import BaseModel
from pydantic_forms.validators import Choice

Neda Moeini
committed
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import Iptrunk
from gso.products.product_types.router import Router
from gso.services import provisioning_proxy
from gso.services.netbox_client import NetboxClient
from gso.services.subscriptions import get_active_subscriptions_by_field_and_value
class LAGMember(BaseModel):
    """Name and description of one LAG member interface.

    Instances hash on the ``(interface_name, interface_description)`` pair.
    """

    # TODO: validate interface name
    interface_name: str
    interface_description: str

    def __hash__(self) -> int:
        """Return a hash derived from the interface name and description."""
        # TODO: check if this is still needed
        identity = (self.interface_name, self.interface_description)
        return hash(identity)
@step("[COMMIT] Set ISIS metric to 90.000")
def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
    """Raise the ISIS metric of an IP trunk to 90000 and provision the change.

    The metric that was set before this step is captured first, so that it is
    available in the returned state. The provisioning call is made with
    ``dry_run=False``.

    :param Iptrunk subscription: The IP trunk subscription whose metric is changed in place.
    :param UUIDstr process_id: Forwarded to the provisioning proxy.
    :param str callback_route: Forwarded to the provisioning proxy.
    :param str tt_number: Forwarded to the provisioning proxy.
    :returns: State holding the updated ``subscription`` and the ``old_isis_metric``.
    """
    trunk = subscription.iptrunk
    previous_metric = trunk.iptrunk_isis_metric
    trunk.iptrunk_isis_metric = 90000

    provisioning_proxy.provision_ip_trunk(
        subscription,
        process_id,
        callback_route,
        tt_number,
        "isis_interface",
        dry_run=False,
    )

    return {"subscription": subscription, "old_isis_metric": previous_metric}
def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
    """Return the available interfaces of a router for a given speed as a ``Choice``.

    Only Nokia routers are looked up in Netbox. For a router of any other
    vendor, ``None`` is returned instead of a ``Choice``.

    :param UUID router_id: Subscription ID of the router; used both to load the router
        subscription and to query Netbox for available interfaces.
    :param str speed: Interface speed that is passed on to the Netbox query.
    :returns: A ``Choice`` named ``"ae member"`` built from the available interfaces,
        or ``None`` when the router is not a Nokia router.
    """
    if Router.from_subscription(router_id).router.router_vendor != RouterVendor.NOKIA:
        return None

    # Map every interface name onto a "name - module - description" display string.
    interfaces = {
        interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}"
        for interface in NetboxClient().get_available_interfaces(router_id, speed)
    }
    # Each member is keyed by the interface name and carries the (name, display string) pair.
    return Choice("ae member", zip(interfaces.keys(), interfaces.items(), strict=True))  # type: ignore[arg-type]

Neda Moeini
committed
def available_interfaces_choices_including_current_members(
Karel van Klink
committed
router_id: UUID | UUIDstr,
speed: str,
interfaces: list[IptrunkInterfaceBlock],

Neda Moeini
committed
) -> Choice | None:
"""Return the available interfaces for a given router and speed, including the current members.

For Nokia routers, the interfaces that Netbox reports as available are combined with entries
derived from ``interfaces``, and returned as a ``Choice`` named ``"ae member"``.
For routers of any other vendor, ``None`` is returned.
"""
if Router.from_subscription(router_id).router.router_vendor != RouterVendor.NOKIA:
return None

Neda Moeini
committed
available_interfaces = list(NetboxClient().get_available_interfaces(router_id, speed))
# NOTE(review): the list elements below look incomplete in this view - a bare interface name and
# router FQDN followed directly by ``for``. A wrapping lookup call appears to be missing; confirm
# against the repository before changing this function.
available_interfaces.extend(
[
Karel van Klink
committed
interface.interface_name,
Router.from_subscription(router_id).router.router_fqdn,

Neda Moeini
committed
for interface in interfaces
Karel van Klink
committed
],

Neda Moeini
committed
)
# Map every interface name onto a "name - module - description" display string.
options = {
interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}"
for interface in available_interfaces
}
Karel van Klink
committed
return Choice("ae member", zip(options.keys(), options.items(), strict=True)) # type: ignore[arg-type]

Neda Moeini
committed
def available_lags_choices(router_id: UUID) -> Choice | None:
    """Return the available LAGs of a router as a ``Choice``.

    Only Nokia routers are looked up in Netbox. For a router of any other
    vendor, ``None`` is returned.

    :param UUID router_id: Subscription ID of the router.
    :returns: A ``Choice`` named ``"ae iface"`` in which every available LAG is both
        the member name and the member value, or ``None`` for non-Nokia routers.
    """
    vendor = Router.from_subscription(router_id).router.router_vendor
    if vendor != RouterVendor.NOKIA:
        return None

    lags = NetboxClient().get_available_lags(router_id)
    lag_pairs = zip(lags, lags, strict=True)
    return Choice("ae iface", lag_pairs)  # type: ignore[arg-type]
def get_router_vendor(router_id: UUID) -> str:
    """Look up the vendor of the router that belongs to a subscription.

    Args:
    ----
    router_id (UUID): The {term}`UUID` of the router subscription to load.

    Returns:
    -------
    str: The ``router_vendor`` value stored on the router product block.
    """
    router_subscription = Router.from_subscription(router_id)
    return router_subscription.router.router_vendor
Karel van Klink
committed
def iso_from_ipv4(ipv4_address: IPv4Address) -> str:
    """Derive an :term:`ISO` address from an IPv4 address.

    Every octet is left-padded with zeroes to three digits, the digits are
    concatenated and regrouped into blocks of four, and the result is placed
    between the fixed ``49.51e5.0001.`` prefix and ``.00`` suffix.

    :param IPv4Address ipv4_address: The address that's to be converted
    :returns: An :term:`ISO`-formatted address.
    """
    digits = "".join(octet.rjust(3, "0") for octet in str(ipv4_address).split("."))
    # Regroup the zero-padded digits into dot-separated blocks of four characters.
    grouped = ".".join(re.findall(r".{4}", digits))
    return "49.51e5.0001." + grouped + ".00"
def validate_router_in_netbox(subscription_id: UUIDstr) -> UUIDstr | None:
    """Verify if a device exists in Netbox.

    The Netbox lookup is only performed for Nokia routers; routers of any other
    vendor pass validation without a lookup.

    Args:
    ----
    subscription_id (UUID): The {term}`UUID` of the router subscription.

    Returns:
    -------
    UUID: The {term}`UUID` of the router subscription, unchanged, when validation passes.

    Raises:
    ------
    ValueError: If the router is a Nokia router and no device with its FQDN is found in Netbox.
    """
    router = Router.from_subscription(subscription_id).router
    if router.router_vendor == RouterVendor.NOKIA:
        device = NetboxClient().get_device_by_name(router.router_fqdn)
        if not device:
            msg = "The selected router does not exist in Netbox."
            raise ValueError(msg)

    # Hand the validated value back so callers receive the ID instead of ``None``.
    return subscription_id
def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMember]:
    """Verify if the interfaces are unique.

    Uniqueness is judged on ``interface_name`` only; descriptions are not compared.

    Args:
    ----
    interfaces (list[LAGMember]): The list of interfaces.

    Returns:
    -------
    list[LAGMember]: The same list of interfaces, unchanged, when all names are unique.

    Raises:
    ------
    ValueError: If two or more interfaces share the same name.
    """
    interface_names = [member.interface_name for member in interfaces]
    if len(interface_names) != len(set(interface_names)):
        msg = "Interfaces must be unique."
        raise ValueError(msg)

    # Hand the validated value back so callers receive the list instead of ``None``.
    return interfaces
def validate_site_fields_is_unique(field_name: str, value: str | int) -> str | int:
    """Check that no active subscription already uses ``value`` for ``field_name``.

    The value is converted to a string before it is looked up.

    :param str field_name: Name of the field that must be unique.
    :param value: The candidate value for that field.
    :returns: ``value`` unchanged when no active subscription matches.
    :raises ValueError: If at least one active subscription already has this value.
    """
    matches = get_active_subscriptions_by_field_and_value(field_name, str(value))
    if len(matches) == 0:
        return value

    msg = f"{field_name} must be unique"
    raise ValueError(msg)
def validate_ipv4_or_ipv6(value: str) -> str:
    """Check that ``value`` parses as an IPv4 or IPv6 address.

    :param str value: The text to validate.
    :returns: ``value`` unchanged when it is a valid address.
    :raises ValueError: If ``value`` is not a valid address; the parsing error is chained as the cause.
    """
    try:
        ipaddress.ip_address(value)
    except ValueError as e:
        msg = "Enter a valid IPv4 or IPv6 address."
        raise ValueError(msg) from e
    else:
        return value
def validate_country_code(country_code: str) -> str:
    """Check that ``country_code`` is known to ``pycountry``.

    :param str country_code: The country code to look up.
    :returns: ``country_code`` unchanged when the lookup succeeds.
    :raises ValueError: If the lookup fails; the original ``LookupError`` is chained as the cause.
    """
    try:
        pycountry.countries.lookup(country_code)
    except LookupError as e:
        msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format."
        raise ValueError(msg) from e
    else:
        return country_code
def validate_site_name(site_name: str) -> str:
"""Validate the site name.
The site name must consist of three uppercase letters (A-Z) followed by an optional single digit (0-9).
"""
pattern = re.compile(r"^[A-Z]{3}[0-9]?$")
Karel van Klink
committed
msg = "Enter a valid site name. It must consist of three uppercase letters (A-Z) followed by an optional single digit (0-9)."
Karel van Klink
committed
msg,