-
Neda Moeini authored
helpers.py 12.28 KiB
"""Helper methods that are used across :term:`GSO`."""
import ipaddress
import re
from enum import StrEnum
from uuid import UUID
import pycountry
from orchestrator.types import UUIDstr
from pydantic import BaseModel, field_validator
from pydantic_core.core_schema import ValidationInfo
from pydantic_forms.validators import Choice
from gso import settings
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate, SiteTier
from gso.products.product_types.router import Router
from gso.services.netbox_client import NetboxClient
from gso.services.subscriptions import get_active_router_subscriptions, get_active_subscriptions_by_field_and_value
from gso.utils.shared_enums import IPv4AddressType, Vendor
class LAGMember(BaseModel):
    """One member interface of a :term:`LAG`, described by its name and an optional description."""

    interface_name: str
    interface_description: str | None = None

    def __hash__(self) -> int:
        """Hash the name/description pair, so that members can be compared for uniqueness in sets and dict keys."""
        identity = (self.interface_name, self.interface_description)
        return hash(identity)
class SNMPVersion(StrEnum):
    """An enumerator for the two relevant versions of :term:`SNMP`: v2c and v3.

    As a ``StrEnum``, every member is also a plain string that is equal to its value.
    """

    V2C = "v2c"
    V3 = "v3"
def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
    """Build the selectable set of available interfaces on a router for a given speed.

    Only Nokia routers are looked up in Netbox. For a router of any other vendor, ``None`` is returned.

    :param router_id: The :term:`UUID` of the router subscription.
    :param speed: The interface speed that is passed on to Netbox when looking up available interfaces.
    :return: A ``Choice`` called ``"ae member"`` with one entry per available interface, or ``None``.
    """
    vendor = get_router_vendor(router_id)
    if vendor != Vendor.NOKIA:
        return None

    labels_by_name = {}
    for candidate in NetboxClient().get_available_interfaces(router_id, speed):
        name = candidate["name"]
        # An interface that is listed more than once keeps the label of its last occurrence.
        labels_by_name[name] = f"{name} {candidate['description']}"

    # Every member is named after the interface, and gets the (name, label) pair as its value.
    members = zip(labels_by_name.keys(), labels_by_name.items(), strict=True)
    return Choice("ae member", members)  # type: ignore[arg-type]
def available_interfaces_choices_including_current_members(
    router_id: UUID,
    speed: str,
    interfaces: list[IptrunkInterfaceBlock],
) -> Choice | None:
    """Return the available interfaces for a given router and speed, including the current members.

    For Nokia routers, return a ``Choice`` of the interfaces that are available in Netbox, extended with the
    interfaces that are passed in as current members.
    For routers of any other vendor, return ``None``.

    :param router_id: The :term:`UUID` of the router subscription.
    :param speed: The interface speed that is passed on to Netbox when looking up available interfaces.
    :param interfaces: The current member interfaces, which are looked up in Netbox by name and router :term:`FQDN`.
    :return: A ``Choice`` called ``"ae member"``, or ``None`` if the router is not a Nokia.
    """
    if get_router_vendor(router_id) != Vendor.NOKIA:
        return None

    netbox = NetboxClient()
    candidates = list(netbox.get_available_interfaces(router_id, speed))
    if interfaces:
        # The FQDN is the same for every current member, so the router is resolved once instead of once per member.
        router_fqdn = Router.from_subscription(router_id).router.router_fqdn
        candidates.extend(
            netbox.get_interface_by_name_and_device(member.interface_name, router_fqdn) for member in interfaces
        )

    # Keyed by name, so an interface that is both available and a current member is only listed once.
    options = {candidate["name"]: f"{candidate['name']} {candidate['description']}" for candidate in candidates}
    # Every member is named after the interface, and gets the (name, label) pair as its value.
    return Choice("ae member", zip(options.keys(), options.items(), strict=True))  # type: ignore[arg-type]
def available_lags_choices(router_id: UUID) -> Choice | None:
    """Build the selectable set of available :term:`LAG`\\ s on a router.

    Only Nokia routers are looked up in Netbox. For a router of any other vendor, ``None`` is returned.

    :param router_id: The :term:`UUID` of the router subscription.
    :return: A ``Choice`` called ``"ae iface"``, or ``None`` if the router is not a Nokia.
    """
    vendor = get_router_vendor(router_id)
    if vendor != Vendor.NOKIA:
        return None

    lag_names = NetboxClient().get_available_lags(router_id)
    # The same collection supplies both the names and the values of the choices.
    lag_choices = zip(lag_names, lag_names, strict=True)
    return Choice("ae iface", lag_choices)  # type: ignore[arg-type]
def get_router_vendor(router_id: UUID) -> Vendor:
    """Look up the vendor of a router.

    :param router_id: The :term:`UUID` of the router subscription.
    :type router_id: :class:`uuid.UUID`
    :return: The vendor that is stored on the router subscription.
    :rtype: Vendor
    """
    subscription = Router.from_subscription(router_id)
    router_block = subscription.router
    return router_block.vendor
def iso_from_ipv4(ipv4_address: IPv4AddressType) -> str:
    """Derive an :term:`ISO` address from an IPv4 address.

    Every octet is left-padded with zeroes to three characters, the result is concatenated and regrouped into
    dot-separated blocks of four characters, and placed between the fixed ``49.51e5.0001.`` prefix and ``.00`` suffix.

    :param IPv4Address ipv4_address: The address that is to be converted.
    :returns: An :term:`ISO`-formatted address.
    """
    digits = "".join(octet.rjust(3, "0") for octet in str(ipv4_address).split("."))
    # Characters that do not fill a complete block of four are dropped by the pattern.
    dotted_groups = ".".join(re.findall("....", digits))
    return f"49.51e5.0001.{dotted_groups}.00"
def validate_router_in_netbox(subscription_id: UUIDstr) -> UUIDstr:
    """Check that the device of a router subscription is present in Netbox.

    The check is only performed for Nokia routers, routers of any other vendor are accepted as is.

    :param subscription_id: The :term:`UUID` of the router subscription.
    :type subscription_id: :class:`UUIDstr`
    :return: The :term:`UUID` of the router subscription.
    :rtype: :class:`UUIDstr`
    :raises ValueError: If the router is a Nokia, and no device with its :term:`FQDN` is found in Netbox.
    """
    router = Router.from_subscription(subscription_id).router
    if router.vendor != Vendor.NOKIA:
        return subscription_id

    if not NetboxClient().get_device_by_name(router.router_fqdn):
        msg = "The selected router does not exist in Netbox."
        raise ValueError(msg)

    return subscription_id
def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMember]:
    """Check that no interface name occurs more than once in a list of interfaces.

    Only the interface names are compared, the descriptions are not taken into account.

    :param interfaces: The list of interfaces.
    :type interfaces: list[:class:`LAGMember`]
    :return: The list of interfaces, unchanged.
    :rtype: list[:class:`LAGMember`]
    :raises ValueError: If two or more interfaces share the same name.
    """
    names = tuple(member.interface_name for member in interfaces)
    # Collapsing the names into a set drops duplicates, so a smaller set means at least one name was repeated.
    if len(frozenset(names)) < len(names):
        msg = "Interfaces must be unique."
        raise ValueError(msg)

    return interfaces
def validate_site_fields_is_unique(field_name: str, value: str | int) -> None:
    """Check that no subscription is found that already has the given value for a site field.

    The value is converted to a string before it is handed to ``get_active_subscriptions_by_field_and_value``.

    :param field_name: The name of the site field to check.
    :param value: The value that should not be in use yet.
    :raises ValueError: If at least one subscription is returned for this field and value.
    """
    matching_subscriptions = get_active_subscriptions_by_field_and_value(field_name, str(value))
    if len(matching_subscriptions) == 0:
        return

    msg = f"{field_name} must be unique"
    raise ValueError(msg)
def validate_ipv4_or_ipv6(value: str) -> str:
    """Check that a value can be parsed as either an IPv4 or an IPv6 address.

    :param value: The value to check.
    :return: The value, unchanged.
    :raises ValueError: If the value is not a valid IPv4 or IPv6 address.
    """
    try:
        ipaddress.ip_address(value)
    except ValueError as parse_error:
        # Replace the parser's message with one that is suitable for showing to a user.
        msg = "Enter a valid IPv4 or IPv6 address."
        raise ValueError(msg) from parse_error

    return value
def validate_country_code(country_code: str) -> str:
    """Check that a country code is known.

    :param country_code: The country code to check.
    :return: The country code, unchanged.
    :raises ValueError: If the country code is not ``"UK"``, and is not found in the pycountry database.
    """
    if country_code == "UK":
        # "UK" is accepted without a lookup, since it's known as "GB" in the pycountry database.
        return country_code

    try:
        # NOTE(review): ``lookup`` presumably matches on more than only the alpha-2 code - confirm that this is intended.
        pycountry.countries.lookup(country_code)
    except LookupError as lookup_error:
        msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format."
        raise ValueError(msg) from lookup_error

    return country_code
def validate_site_name(site_name: str) -> None:
    """Validate the site name.

    The site name must consist of three uppercase letters, optionally followed by a single digit.

    :param site_name: The site name to validate.
    :raises ValueError: If the site name does not have the expected format.
    """
    # The whole string must match. Anchoring with ``$`` is not sufficient here, since ``$`` also matches right before
    # a trailing newline, which would let a site name that ends in a line break pass validation.
    if re.fullmatch(r"[A-Z]{3}[0-9]?", site_name) is None:
        msg = (
            "Enter a valid site name. It must consist of three uppercase letters (A-Z), followed by an optional single "
            f"digit (0-9). Received: {site_name}"
        )
        raise ValueError(msg)
class BaseSiteValidatorModel(BaseModel):
    """The site fields and their validators, shared as a base model by create site and by import site."""

    site_bgp_community_id: int
    site_internal_id: int
    site_tier: SiteTier
    site_ts_address: str
    site_country_code: str
    site_name: str
    site_city: str
    site_country: str
    site_latitude: LatitudeCoordinate
    site_longitude: LongitudeCoordinate
    partner: str

    @field_validator("site_ts_address")
    @classmethod
    def validate_ts_address(cls, site_ts_address: str) -> str:
        """Check that the terminal server address is a valid IPv4 or IPv6 address."""
        return validate_ipv4_or_ipv6(site_ts_address)

    @field_validator("site_country_code")
    @classmethod
    def country_code_must_exist(cls, country_code: str) -> str:
        """Check that the country code exists."""
        return validate_country_code(country_code)

    @field_validator("site_ts_address", "site_internal_id", "site_bgp_community_id", "site_name")
    @classmethod
    def field_must_be_unique(cls, value: str | int, info: ValidationInfo) -> str | int:
        """Check that the value of a field is not in use yet.

        The name of the field that is being validated is taken from the validation info.
        """
        field_name = info.field_name
        if not field_name:
            msg = "Field name must be provided."
            raise ValueError(msg)

        validate_site_fields_is_unique(field_name, value)
        return value

    @field_validator("site_name")
    @classmethod
    def site_name_must_be_valid(cls, site_name: str) -> str:
        """Check the format of the site name.

        The site name must consist of three uppercase letters, followed by an optional single digit.
        """
        validate_site_name(site_name)
        return site_name
def validate_interface_name_list(interface_name_list: list, vendor: str) -> list:
    """Validate that the names of all provided interfaces match the expected pattern.

    The expected pattern for the interface name is one of 'ge', 'et', 'xe' followed by a dash '-',
    then a number between 0 and 19, a forward slash '/', another number between 0 and 99,
    another forward slash '/', and ends with a number between 0 and 99.
    For example: 'xe-1/0/0'.

    For Nokia routers no validation takes place, and the list is returned as is.

    :param list interface_name_list: List of interfaces to validate, each one exposing an ``interface_name`` attribute.
    :param str vendor: Router vendor to check interface names
    :return list: The list of interfaces if all names matched successfully.
    :raises ValueError: If one of the interface names does not match the expected pattern.
    """
    # For Nokia nothing to do
    if vendor == Vendor.NOKIA:
        return interface_name_list

    # The whole name must match. Anchoring with ``$`` is not sufficient here, since ``$`` also matches right before a
    # trailing newline, which would let an interface name that ends in a line break pass validation.
    pattern = re.compile(r"(ge|et|xe)-1?[0-9]/[0-9]{1,2}/[0-9]{1,2}")
    for interface in interface_name_list:
        if pattern.fullmatch(interface.interface_name) is None:
            error_msg = (
                f"Invalid interface name. The interface name should be of format: xe-1/0/0. "
                f"Got: [{interface.interface_name}]"
            )
            raise ValueError(error_msg)

    return interface_name_list
def validate_tt_number(tt_number: str) -> str:
    """Validate a string to match a specific pattern.

    This method checks if the input string starts with 'TT#' and is followed by exactly 16 digits (0-9).

    :param str tt_number: The TT number as string to validate
    :return str: The TT number string if TT number match was successful.
    :raises ValueError: If the TT number does not match the expected pattern.
    """
    # The whole string must match: ``$`` would also accept a trailing newline. The digits are spelled out as ``[0-9]``
    # since ``\d`` matches any Unicode decimal digit when used on a string.
    if re.fullmatch(r"TT#[0-9]{16}", tt_number) is None:
        err_msg = (
            f"The given TT number: {tt_number} is not valid. "
            f" A valid TT number starts with 'TT#' followed by 16 digits."
        )
        raise ValueError(err_msg)

    return tt_number
def generate_fqdn(hostname: str, site_name: str, country_code: str) -> str:
    """Compose an :term:`FQDN` out of a hostname, a site name, and a country code.

    The site name and country code are lowercased, and the domain name from the loaded :term:`OSS` parameters is
    appended as is.

    :param hostname: The hostname, used as the first label.
    :param site_name: The site name, used as the second label.
    :param country_code: The country code, used as the third label.
    :return: The resulting :term:`FQDN`.
    """
    oss_params = settings.load_oss_params()
    site_label = site_name.lower()
    country_label = country_code.lower()
    # No separator is inserted here, so the configured domain name presumably starts with a dot - TODO confirm.
    domain_suffix = oss_params.IPAM.LO.domain_name
    return f"{hostname}.{site_label}.{country_label}{domain_suffix}"
def generate_inventory_for_active_routers(
    router_role: RouterRole,
    exclude_routers: list[str] | None = None,
) -> dict:
    """Build an Ansible-compatible inventory that can be used for executing playbooks.

    The inventory holds every active router that has the requested role, keyed by its :term:`FQDN`. Routers can be
    left out of the inventory by listing their :term:`FQDN`.

    Args:
    ----
    router_role (RouterRole): The role that a router must have to be included in the inventory.
    exclude_routers (list): The :term:`FQDN` of every router that should be left out of the inventory.

    Returns:
    -------
    dict[str, Any]: The inventory, with the hosts nested under the ``all`` and ``hosts`` keys.

    """
    # Every active router subscription is loaded first, filtering only takes place afterwards.
    active_routers = [
        Router.from_subscription(subscription["subscription_id"]) for subscription in get_active_router_subscriptions()
    ]
    skipped_fqdns = exclude_routers if exclude_routers else []

    hosts = {}
    for subscription in active_routers:
        router = subscription.router
        if router.router_role != router_role:
            continue
        if router.router_fqdn in skipped_fqdns:
            continue

        hosts[router.router_fqdn] = {
            "lo4": str(router.router_lo_ipv4_address),
            "lo6": str(router.router_lo_ipv6_address),
            "vendor": str(router.vendor),
        }

    return {"all": {"hosts": hosts}}