"""Helper methods that are used across :term:`GSO`."""
import ipaddress
import re
from ipaddress import IPv4Address
from uuid import UUID

from orchestrator import step
from orchestrator.types import State, UUIDstr
from pydantic import BaseModel, validator
from pydantic.fields import ModelField
from pydantic_forms.validators import Choice

from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_blocks.site import SiteTier
from gso.products.product_types.iptrunk import Iptrunk
from gso.products.product_types.router import Router
from gso.services import provisioning_proxy
from gso.services.netbox_client import NetboxClient
from gso.services.subscriptions import get_active_subscriptions_by_field_and_value
class LAGMember(BaseModel):
    """A :term:`LAG` member interface that consists of a name and description.

    TODO: validate interface name
    """

    # NOTE(review): the ``class`` line is absent from the reviewed text. The name is taken from the
    # ``list[LAGMember]`` annotations used by the interface validator in this module; the pydantic
    # ``BaseModel`` base is an assumption - confirm.
    interface_name: str
    interface_description: str

    def __hash__(self) -> int:
        """Calculate the hash based on the interface name and description, so that uniqueness can be determined.

        TODO: Check if this is still needed
        """
        return hash((self.interface_name, self.interface_description))
@step("[COMMIT] Set ISIS metric to 90.000")
def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
    """Workflow step for setting the :term:`ISIS` metric to 90k as an arbitrarily high value to drain a link.

    The metric that was set before this step ran is handed back in the returned state, next to the updated
    subscription.

    :param subscription: The IP trunk subscription whose :term:`ISIS` metric is overwritten.
    :param process_id: Forwarded unchanged to the provisioning call.
    :param callback_route: Forwarded unchanged to the provisioning call.
    :param tt_number: Forwarded unchanged to the provisioning call.
    :return: State holding ``subscription`` and ``old_isis_metric``.
    """
    old_isis_metric = subscription.iptrunk.iptrunk_isis_metric
    subscription.iptrunk.iptrunk_isis_metric = 90000

    # NOTE(review): only the arguments of this call are present in the reviewed text; the callee is
    # missing. ``provisioning_proxy.provision_ip_trunk`` is assumed because ``provisioning_proxy`` is
    # imported by this module and not used anywhere else in view - confirm the function name.
    provisioning_proxy.provision_ip_trunk(
        subscription,
        process_id,
        callback_route,
        tt_number,
        "isis_interface",
        dry_run=False,
    )

    return {
        "subscription": subscription,
        "old_isis_metric": old_isis_metric,
    }
def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
    """Return the available interfaces for a given router and speed as a form choice.

    For Nokia routers, return a :class:`Choice` built from the interfaces that Netbox reports as available.
    For routers of any other vendor, return ``None``.

    :param router_id: The :term:`UUID` of the router.
    :param speed: The interface speed that is passed on to Netbox.
    :return: A ``Choice`` named ``"ae member"``, or ``None`` for non-Nokia routers.
    """
    if get_router_vendor(router_id) != RouterVendor.NOKIA:
        return None

    # Map every interface name to a human-readable "name - module - description" label.
    interfaces = {
        interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}"
        for interface in NetboxClient().get_available_interfaces(router_id, speed)
    }

    # Each member is named after the interface and carries the (name, label) pair as its value.
    # NOTE(review): presumably ``Choice`` unpacks that pair into value and label - confirm.
    return Choice("ae member", zip(interfaces.keys(), interfaces.items(), strict=True))  # type: ignore[arg-type]
def available_interfaces_choices_including_current_members(
    router_id: UUID,
    speed: str,
    interfaces: list[IptrunkInterfaceBlock],
) -> Choice | None:
    """Return the available interfaces for a given router and speed, including the current members.

    For Nokia routers, return a :class:`Choice` built from the interfaces that Netbox reports as available,
    extended with the interfaces that are passed in.
    For routers of any other vendor, return ``None``.

    :param router_id: The :term:`UUID` of the router.
    :param speed: The interface speed that is passed on to Netbox.
    :param interfaces: The interfaces that are currently in use and should remain selectable.
    :return: A ``Choice`` named ``"ae member"``, or ``None`` for non-Nokia routers.
    """
    # NOTE(review): the ``router_id`` parameter line is absent from the reviewed text. It is restored
    # because the body uses ``router_id`` throughout; its position and ``UUID`` type mirror
    # ``available_interfaces_choices`` - confirm.
    if get_router_vendor(router_id) != RouterVendor.NOKIA:
        return None

    netbox = NetboxClient()
    available_interfaces = list(netbox.get_available_interfaces(router_id, speed))

    # The FQDN is the same for every member, so it is looked up once instead of once per interface.
    router_fqdn = Router.from_subscription(router_id).router.router_fqdn

    # NOTE(review): only the two arguments of the per-interface lookup are present in the reviewed
    # text; the callee is missing. ``NetboxClient.get_interface_by_name_and_device`` is an
    # assumption - confirm the method name. Its result must support the ``["name"]``,
    # ``["module"]["display"]`` and ``["description"]`` lookups used below.
    available_interfaces.extend(
        [netbox.get_interface_by_name_and_device(interface.interface_name, router_fqdn) for interface in interfaces],
    )

    # Map every interface name to a human-readable "name - module - description" label.
    options = {
        interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}"
        for interface in available_interfaces
    }

    # Each member is named after the interface and carries the (name, label) pair as its value.
    return Choice("ae member", zip(options.keys(), options.items(), strict=True))  # type: ignore[arg-type]
def available_lags_choices(router_id: UUID) -> Choice | None:
    """Return a list of available lags for a given router.

    For Nokia routers, return a list of available lags.
    For Juniper routers, return ``None``.

    :param router_id: The :term:`UUID` of the router.
    :return: A ``Choice`` named ``"ae iface"``, or ``None`` for non-Nokia routers.
    """
    if get_router_vendor(router_id) != RouterVendor.NOKIA:
        return None

    # Materialise the result first: the same sequence is zipped with itself below, which would
    # silently pair up neighbouring items if Netbox handed back a one-shot iterator.
    side_a_ae_iface_list = list(NetboxClient().get_available_lags(router_id))
    return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True))  # type: ignore[arg-type]
def get_router_vendor(router_id: UUID) -> RouterVendor:
    """Retrieve the vendor of a router.

    :param router_id: The :term:`UUID` of the router.
    :type router_id: :class:`uuid.UUID`
    :return: The vendor stored on the router subscription.
    :rtype: :class:`RouterVendor`
    """
    return Router.from_subscription(router_id).router.vendor
def iso_from_ipv4(ipv4_address: IPv4Address) -> str:
    """Calculate an :term:`ISO` address, based on an IPv4 address.

    Every octet is left-padded with zeroes to three digits, the twelve resulting digits are regrouped
    into three blocks of four, and the result is wrapped in a fixed prefix and suffix.

    :param IPv4Address ipv4_address: The address that's to be converted
    :returns: An :term:`ISO`-formatted address.
    """
    padded_octets = [octet.zfill(3) for octet in str(ipv4_address).split(".")]
    joined_octets = "".join(padded_octets)
    # Regroup the digits four at a time, e.g. "010000000001" -> "0100.0000.0001".
    re_split = ".".join(re.findall(r"....", joined_octets))
    return f"49.51e5.0001.{re_split}.00"
def validate_router_in_netbox(subscription_id: UUIDstr) -> UUIDstr:
    """Verify if a device exists in Netbox.

    Only Nokia routers are looked up in Netbox; routers of any other vendor pass unchecked.
    Raises a :class:`ValueError` if the device is not found.

    :param subscription_id: The :term:`UUID` of the router subscription.
    :type subscription_id: :class:`UUIDstr`
    :return: The :term:`UUID` of the router subscription.
    :rtype: :class:`UUIDstr`
    """
    router_type = Router.from_subscription(subscription_id)
    if router_type.router.vendor == RouterVendor.NOKIA:
        device = NetboxClient().get_device_by_name(router_type.router.router_fqdn)
        # NOTE(review): the guard around the error is absent from the reviewed text. A falsy lookup
        # result is treated as "not found", following the docstring - confirm what
        # ``get_device_by_name`` returns for an unknown device.
        if not device:
            msg = "The selected router does not exist in Netbox."
            raise ValueError(msg)
    return subscription_id
def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMember]:
    """Verify if the interfaces are unique.

    Uniqueness is judged on the interface name only; descriptions are not compared.
    Raises a :class:`ValueError` if the interfaces are not unique.

    :param interfaces: The list of interfaces.
    :type interfaces: list[:class:`LAGMember`]
    :return: The list of interfaces
    :rtype: list[:class:`LAGMember`]
    """
    interface_names = [member.interface_name for member in interfaces]
    # A set drops duplicates, so a shorter set means at least one name occurs twice.
    if len(interface_names) != len(set(interface_names)):
        msg = "Interfaces must be unique."
        raise ValueError(msg)
    return interfaces
def validate_site_fields_is_unique(field_name: str, value: str | int) -> str | int:
    """Validate that a site field is unique.

    Raises a :class:`ValueError` when at least one active subscription already holds ``value`` for
    ``field_name``. The value is converted to a string for the lookup.

    :param field_name: The name of the field that must be unique.
    :param value: The value to check.
    :return: The unchanged value.
    """
    if len(get_active_subscriptions_by_field_and_value(field_name, str(value))) > 0:
        msg = f"{field_name} must be unique"
        raise ValueError(msg)
    return value
def validate_ipv4_or_ipv6(value: str) -> str:
    """Validate that a value is a valid IPv4 or IPv6 address.

    Raises a :class:`ValueError` with a user-facing message when the value cannot be parsed.

    :param value: The address to check.
    :return: The unchanged value.
    """
    try:
        ipaddress.ip_address(value)
    except ValueError as e:
        msg = "Enter a valid IPv4 or IPv6 address."
        raise ValueError(msg) from e
    else:
        # Hand the input back so that the declared ``str`` return type holds.
        return value
def validate_country_code(country_code: str) -> str:
    """Validate that a country code is valid.

    Raises a :class:`ValueError` with a user-facing message when the lookup fails.

    :param country_code: The country code to check.
    :return: The unchanged country code.
    """
    # NOTE(review): no import of ``pycountry`` is visible in the reviewed text - confirm that the
    # module imports it.
    # NOTE(review): ``countries.lookup`` may accept more than alpha-2 codes, which would make this
    # check more lenient than the error message suggests - confirm against the pycountry docs.
    try:
        pycountry.countries.lookup(country_code)
    except LookupError as e:
        msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format."
        raise ValueError(msg) from e
    else:
        # Hand the input back so that the declared ``str`` return type holds.
        return country_code
def validate_site_name(site_name: str) -> str:
    """Validate the site name.

    The site name must consist of three uppercase letters, optionally followed by a single digit.
    Raises a :class:`ValueError` when the name does not have that shape.

    :param site_name: The site name to check.
    :return: The unchanged site name.
    """
    # ``fullmatch`` is used because ``$`` also matches just before a trailing newline, which would
    # let a name such as "AMS\n" through.
    if re.fullmatch(r"[A-Z]{3}[0-9]?", site_name) is None:
        # NOTE(review): the tail of this message is absent from the reviewed text; "digit (0-9)." is a
        # reconstruction - confirm the exact wording.
        msg = (
            "Enter a valid site name. It must consist of three uppercase letters (A-Z), followed by an optional single "
            "digit (0-9)."
        )
        raise ValueError(msg)
    return site_name
class BaseSiteValidatorModel(BaseModel):
    """A base site validator model extended by create site and by import site.

    The validators for ``site_country_code`` and ``site_name`` refer to fields that are not declared
    on this class; ``check_fields=False`` allows that.
    """

    site_bgp_community_id: int
    site_internal_id: int
    site_tier: SiteTier
    site_ts_address: str

    @validator("site_ts_address", check_fields=False, allow_reuse=True)
    def validate_ts_address(cls, site_ts_address: str) -> str:
        """Validate that a terminal server address is valid."""
        validate_ipv4_or_ipv6(site_ts_address)
        return site_ts_address

    @validator("site_country_code", check_fields=False, allow_reuse=True)
    def country_code_must_exist(cls, country_code: str) -> str:
        """Validate that the country code exists."""
        validate_country_code(country_code)
        return country_code

    @validator(
        "site_ts_address",
        "site_internal_id",
        "site_bgp_community_id",
        "site_name",
        check_fields=False,
        allow_reuse=True,
    )
    def validate_unique_fields(cls, value: str | int, field: ModelField) -> str | int:
        """Validate that the terminal server address, internal ID, :term:`BGP` community ID and site name are unique."""
        return validate_site_fields_is_unique(field.name, value)

    @validator("site_name", check_fields=False, allow_reuse=True)
    def site_name_must_be_valid(cls, site_name: str) -> str:
        """Validate the site name.

        The site name must consist of three uppercase letters, followed by an optional single digit.
        """
        validate_site_name(site_name)
        return site_name