"""Helper methods that are used across :term:`GSO`.""" import ipaddress import re from ipaddress import IPv4Address from uuid import UUID import pycountry from orchestrator import step from orchestrator.types import State, UUIDstr from pydantic import BaseModel from pydantic_forms.validators import Choice from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock from gso.products.product_blocks.router import RouterVendor from gso.products.product_types.iptrunk import Iptrunk from gso.products.product_types.router import Router from gso.services import provisioning_proxy from gso.services.netbox_client import NetboxClient from gso.services.subscriptions import get_active_subscriptions_by_field_and_value class LAGMember(BaseModel): """A :term:`LAG` member interface that consists of a name and description. TODO: validate interface name """ interface_name: str interface_description: str def __hash__(self) -> int: """Calculate the hash based on the interface name and description, so that uniqueness can be determined. TODO: Check if this is still needed """ return hash((self.interface_name, self.interface_description)) @step("[COMMIT] Set ISIS metric to 90.000") def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: """Workflow step for setting the :term:`ISIS` metric to 90k as an arbitrarily high value to drain a link.""" old_isis_metric = subscription.iptrunk.iptrunk_isis_metric subscription.iptrunk.iptrunk_isis_metric = 90000 provisioning_proxy.provision_ip_trunk( subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False, ) return { "subscription": subscription, "old_isis_metric": old_isis_metric, } def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None: """Return a list of available interfaces for a given router and speed. For Nokia routers, return a list of available interfaces. For Juniper routers, return a string. """ if Router.from_subscription(router_id).router.router_vendor != RouterVendor.NOKIA: return None interfaces = { interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}" for interface in NetboxClient().get_available_interfaces(router_id, speed) } return Choice("ae member", zip(interfaces.keys(), interfaces.items(), strict=True)) # type: ignore[arg-type] def available_interfaces_choices_including_current_members( router_id: UUID | UUIDstr, speed: str, interfaces: list[IptrunkInterfaceBlock], ) -> Choice | None: """Return a list of available interfaces for a given router and speed including the current members. For Nokia routers, return a list of available interfaces. For Juniper routers, return a string. """ if Router.from_subscription(router_id).router.router_vendor != RouterVendor.NOKIA: return None available_interfaces = list(NetboxClient().get_available_interfaces(router_id, speed)) available_interfaces.extend( [ NetboxClient().get_interface_by_name_and_device( interface.interface_name, Router.from_subscription(router_id).router.router_fqdn, ) for interface in interfaces ], ) options = { interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}" for interface in available_interfaces } return Choice("ae member", zip(options.keys(), options.items(), strict=True)) # type: ignore[arg-type] def available_lags_choices(router_id: UUID) -> Choice | None: """Return a list of available lags for a given router. For Nokia routers, return a list of available lags. For Juniper routers, return a string. """ if Router.from_subscription(router_id).router.router_vendor != RouterVendor.NOKIA: return None side_a_ae_iface_list = NetboxClient().get_available_lags(router_id) return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)) # type: ignore[arg-type] def get_router_vendor(router_id: UUID) -> str: """Retrieve the vendor of a router. Args: ---- router_id (UUID): The {term}`UUID` of the router. Returns: ------- str: The vendor of the router. """ return Router.from_subscription(router_id).router.router_vendor def iso_from_ipv4(ipv4_address: IPv4Address) -> str: """Calculate an :term:`ISO` address, based on an IPv4 address. :param IPv4Address ipv4_address: The address that's to be converted :returns: An :term:`ISO`-formatted address. """ padded_octets = [f"{x:>03}" for x in str(ipv4_address).split(".")] joined_octets = "".join(padded_octets) re_split = ".".join(re.findall("....", joined_octets)) return f"49.51e5.0001.{re_split}.00" def validate_router_in_netbox(subscription_id: UUIDstr) -> UUIDstr | None: """Verify if a device exists in Netbox. Args: ---- subscription_id (UUID): The {term}`UUID` of the router subscription. Returns: ------- UUID: The {term}`UUID` of the router subscription or raises an error. """ router = Router.from_subscription(subscription_id).router if router.router_vendor == RouterVendor.NOKIA: device = NetboxClient().get_device_by_name(router.router_fqdn) if not device: msg = "The selected router does not exist in Netbox." raise ValueError(msg) return subscription_id def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMember]: """Verify if the interfaces are unique. Args: ---- interfaces (list[LAGMember]): The list of interfaces. Returns: ------- list[LAGMember]: The list of interfaces or raises an error. """ interface_names = [member.interface_name for member in interfaces] if len(interface_names) != len(set(interface_names)): msg = "Interfaces must be unique." raise ValueError(msg) return interfaces def validate_site_fields_is_unique(field_name: str, value: str | int) -> str | int: """Validate that a site field is unique.""" if len(get_active_subscriptions_by_field_and_value(field_name, str(value))) > 0: msg = f"{field_name} must be unique" raise ValueError(msg) return value def validate_ipv4_or_ipv6(value: str) -> str: """Validate that a value is a valid IPv4 or IPv6 address.""" try: ipaddress.ip_address(value) except ValueError as e: msg = "Enter a valid IPv4 or IPv6 address." raise ValueError(msg) from e else: return value def validate_country_code(country_code: str) -> str: """Validate that a country code is valid.""" try: pycountry.countries.lookup(country_code) except LookupError as e: msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format." raise ValueError(msg) from e else: return country_code def validate_site_name(site_name: str) -> str: """Validate the site name. The site name must consist of three uppercase letters (A-Z) followed by an optional single digit (0-9). """ pattern = re.compile(r"^[A-Z]{3}[0-9]?$") if not pattern.match(site_name): msg = ( "Enter a valid site name. It must consist of three uppercase letters (A-Z) followed by an optional single " f"digit (0-9). Received: {site_name}" ) raise ValueError(msg) return site_name