"""Helper methods that are used across :term:`GSO`.""" import ipaddress import re from ipaddress import IPv4Address from uuid import UUID import pycountry from orchestrator import step from orchestrator.types import State, UUIDstr from pydantic import BaseModel from pydantic_forms.validators import Choice from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock from gso.products.product_blocks.router import RouterVendor from gso.products.product_types.iptrunk import Iptrunk from gso.products.product_types.router import Router from gso.services import provisioning_proxy from gso.services.netbox_client import NetboxClient from gso.services.subscriptions import get_active_subscriptions_by_field_and_value class LAGMember(BaseModel): """A :term:`LAG` member interface that consists of a name and description. TODO: validate interface name """ interface_name: str interface_description: str def __hash__(self) -> int: """Calculate the hash based on the interface name and description, so that uniqueness can be determined. TODO: Check if this is still needed """ return hash((self.interface_name, self.interface_description)) @step("[COMMIT] Set ISIS metric to 90.000") def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: """Workflow step for setting the :term:`ISIS` metric to 90k as an arbitrarily high value to drain a link.""" old_isis_metric = subscription.iptrunk.iptrunk_isis_metric subscription.iptrunk.iptrunk_isis_metric = 90000 provisioning_proxy.provision_ip_trunk( subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False, ) return { "subscription": subscription, "old_isis_metric": old_isis_metric, } def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None: """Return a list of available interfaces for a given router and speed. For Nokia routers, return a list of available interfaces. For Juniper routers, return a string. """ if get_router_vendor(router_id) != RouterVendor.NOKIA: return None interfaces = { interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}" for interface in NetboxClient().get_available_interfaces(router_id, speed) } return Choice("ae member", zip(interfaces.keys(), interfaces.items(), strict=True)) # type: ignore[arg-type] def available_interfaces_choices_including_current_members( router_id: UUID, speed: str, interfaces: list[IptrunkInterfaceBlock], ) -> Choice | None: """Return a list of available interfaces for a given router and speed including the current members. For Nokia routers, return a list of available interfaces. For Juniper routers, return a string. """ if get_router_vendor(router_id) != RouterVendor.NOKIA: return None available_interfaces = list(NetboxClient().get_available_interfaces(router_id, speed)) available_interfaces.extend( [ NetboxClient().get_interface_by_name_and_device( interface.interface_name, Router.from_subscription(router_id).router.router_fqdn, ) for interface in interfaces ], ) options = { interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}" for interface in available_interfaces } return Choice("ae member", zip(options.keys(), options.items(), strict=True)) # type: ignore[arg-type] def available_lags_choices(router_id: UUID) -> Choice | None: """Return a list of available lags for a given router. For Nokia routers, return a list of available lags. For Juniper routers, return ``None``. """ if get_router_vendor(router_id) != RouterVendor.NOKIA: return None side_a_ae_iface_list = NetboxClient().get_available_lags(router_id) return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)) # type: ignore[arg-type] def get_router_vendor(router_id: UUID) -> RouterVendor: """Retrieve the vendor of a router. :param router_id: The :term:`UUID` of the router. :type router_id: :class:`uuid.UUID` :return: The vendor of the router. :rtype: RouterVendor: """ return Router.from_subscription(router_id).router.vendor def iso_from_ipv4(ipv4_address: IPv4Address) -> str: """Calculate an :term:`ISO` address, based on an IPv4 address. :param IPv4Address ipv4_address: The address that's to be converted :returns: An :term:`ISO`-formatted address. """ padded_octets = [f"{x:>03}" for x in str(ipv4_address).split(".")] joined_octets = "".join(padded_octets) re_split = ".".join(re.findall("....", joined_octets)) return f"49.51e5.0001.{re_split}.00" def validate_router_in_netbox(subscription_id: UUIDstr) -> UUIDstr: """Verify if a device exists in Netbox. Raises a :class:`ValueError` if the device is not found. :param subscription_id: The :term:`UUID` of the router subscription. :type subscription_id: :class:`UUIDstr` :return: The :term:`UUID` of the router subscription. :rtype: :class:`UUIDstr` """ router_type = Router.from_subscription(subscription_id) if router_type.router.vendor == RouterVendor.NOKIA: device = NetboxClient().get_device_by_name(router_type.router.router_fqdn) if not device: msg = "The selected router does not exist in Netbox." raise ValueError(msg) return subscription_id def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMember]: """Verify if the interfaces are unique. Raises a :class:`ValueError` if the interfaces are not unique. :param interfaces: The list of interfaces. :type interfaces: list[:class:`LAGMember`] :return: The list of interfaces :rtype: list[:class:`LAGMember`] """ interface_names = [member.interface_name for member in interfaces] if len(interface_names) != len(set(interface_names)): msg = "Interfaces must be unique." raise ValueError(msg) return interfaces def validate_site_fields_is_unique(field_name: str, value: str | int) -> str | int: """Validate that a site field is unique.""" if len(get_active_subscriptions_by_field_and_value(field_name, str(value))) > 0: msg = f"{field_name} must be unique" raise ValueError(msg) return value def validate_ipv4_or_ipv6(value: str) -> str: """Validate that a value is a valid IPv4 or IPv6 address.""" try: ipaddress.ip_address(value) except ValueError as e: msg = "Enter a valid IPv4 or IPv6 address." raise ValueError(msg) from e else: return value def validate_country_code(country_code: str) -> str: """Validate that a country code is valid.""" try: pycountry.countries.lookup(country_code) except LookupError as e: msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format." raise ValueError(msg) from e else: return country_code def validate_site_name(site_name: str) -> str: """Validate the site name. The site name must consist of three uppercase letters, optionally followed by a single digit. """ pattern = re.compile(r"^[A-Z]{3}[0-9]?$") if not pattern.match(site_name): msg = ( "Enter a valid site name. It must consist of three uppercase letters (A-Z), followed by an optional single " f"digit (0-9). Received: {site_name}" ) raise ValueError(msg) return site_name