Skip to content
Snippets Groups Projects
Verified Commit 27934bdf authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Refactor validators into separate types module

parent f1bb7633
No related branches found
No related tags found
1 merge request!265Feature/refactor validators
Showing
with 430 additions and 316 deletions
......@@ -10,10 +10,11 @@ from orchestrator.security import authorize
from orchestrator.services.subscriptions import build_extended_domain_model
from starlette import status
from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate
from gso.services.subscriptions import get_active_iptrunk_subscriptions
from gso.types.coordinates import LatitudeCoordinate, LongitudeCoordinate
from gso.types.interfaces import PhysicalPortCapacity
from gso.utils.shared_enums import Vendor
router = APIRouter(prefix="/networks", tags=["Network"], dependencies=[Depends(authorize)])
......
......@@ -18,7 +18,7 @@ from sqlalchemy.exc import SQLAlchemyError
from gso.db.models import PartnerTable
from gso.products import ProductType
from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.router import RouterRole
from gso.services.partners import (
PartnerNotFoundError,
......@@ -31,7 +31,8 @@ from gso.services.subscriptions import (
get_active_subscriptions_by_field_and_value,
get_subscriptions,
)
from gso.utils.helpers import BaseSiteValidatorModel, LAGMember
from gso.types.base_site import BaseSiteValidatorModel
from gso.types.interfaces import LAGMember, LAGMemberList, PhysicalPortCapacity
from gso.utils.shared_enums import IPv4AddressType, IPv6AddressType, PortNumber, Vendor
app: typer.Typer = typer.Typer()
......@@ -115,11 +116,11 @@ class IptrunkImportModel(BaseModel):
side_a_node_id: str
side_a_ae_iface: str
side_a_ae_geant_a_sid: str | None
side_a_ae_members: list[LAGMember]
side_a_ae_members: LAGMemberList[LAGMember]
side_b_node_id: str
side_b_ae_iface: str
side_b_ae_geant_a_sid: str | None
side_b_ae_members: list[LAGMember]
side_b_ae_members: LAGMemberList[LAGMember]
iptrunk_ipv4_network: ipaddress.IPv4Network
iptrunk_ipv6_network: ipaddress.IPv6Network
......@@ -150,15 +151,6 @@ class IptrunkImportModel(BaseModel):
return value
@field_validator("side_a_ae_members", "side_b_ae_members")
def check_side_uniqueness(cls, value: list[str]) -> list[str]:
""":term:`LAG` members must be unique."""
if len(value) != len(set(value)):
msg = "Items must be unique"
raise ValueError(msg)
return value
@model_validator(mode="after")
def check_members(self) -> Self:
"""Amount of :term:`LAG` members has to match on side A and B, and meet the minimum requirement."""
......
......@@ -15,18 +15,7 @@ from gso.products.product_blocks.router import (
RouterBlockInactive,
RouterBlockProvisioning,
)
class PhysicalPortCapacity(strEnum):
"""Physical port capacity enumerator.
An enumerator that has the different possible capacities of ports that are available to use in subscriptions.
"""
ONE_GIGABIT_PER_SECOND = "1G"
TEN_GIGABIT_PER_SECOND = "10G"
HUNDRED_GIGABIT_PER_SECOND = "100G"
FOUR_HUNDRED_GIGABIT_PER_SECOND = "400G"
from gso.types.interfaces import LAGMemberList, PhysicalPortCapacity
class IptrunkType(strEnum):
......@@ -36,9 +25,6 @@ class IptrunkType(strEnum):
LEASED = "Leased"
LAGMemberList = Annotated[
list[T], AfterValidator(validate_unique_list), Len(min_length=0), Doc("A list of :term:`LAG` member interfaces.")
]
IptrunkSides = Annotated[
list[T],
AfterValidator(validate_unique_list),
......
......@@ -3,9 +3,9 @@
from orchestrator.domain.base import ProductBlockModel
from orchestrator.types import SubscriptionLifecycle, strEnum
from gso.products.product_blocks.iptrunk import LAGMemberList
from gso.products.product_blocks.router import RouterBlock, RouterBlockInactive, RouterBlockProvisioning
from gso.products.product_blocks.switch import SwitchBlock, SwitchBlockInactive, SwitchBlockProvisioning
from gso.types.interfaces import LAGMemberList
class LanSwitchInterconnectAddressSpace(strEnum):
......
"""The product block that describes a site subscription."""
import re
from typing import Annotated
from orchestrator.domain.base import ProductBlockModel
from orchestrator.types import SubscriptionLifecycle, strEnum
from pydantic import AfterValidator
from typing_extensions import Doc
MAX_LONGITUDE = 180
MIN_LONGITUDE = -180
MAX_LATITUDE = 90
MIN_LATITUDE = -90
from gso.types.coordinates import LatitudeCoordinate, LongitudeCoordinate
class SiteTier(strEnum):
......@@ -28,56 +19,6 @@ class SiteTier(strEnum):
TIER4 = 4
def validate_latitude(v: str) -> str:
"""Validate a latitude coordinate."""
msg = "Invalid latitude coordinate. Valid examples: '40.7128', '-74.0060', '90', '-90', '0'."
regex = re.compile(r"^-?([1-8]?\d(\.\d+)?|90(\.0+)?)$")
if not regex.match(str(v)):
raise ValueError(msg)
float_v = float(v)
if float_v > MAX_LATITUDE or float_v < MIN_LATITUDE:
raise ValueError(msg)
return v
def validate_longitude(v: str) -> str:
"""Validate a longitude coordinate."""
regex = re.compile(r"^-?(180(\.0+)?|((1[0-7]\d)|([1-9]?\d))(\.\d+)?)$")
msg = "Invalid longitude coordinate. Valid examples: '40.7128', '-74.0060', '180', '-180', '0'."
if not regex.match(v):
raise ValueError(msg)
float_v = float(v)
if float_v > MAX_LONGITUDE or float_v < MIN_LONGITUDE:
raise ValueError(msg)
return v
LatitudeCoordinate = Annotated[
str,
AfterValidator(validate_latitude),
Doc(
"A latitude coordinate, modeled as a string. "
"The coordinate must match the format conforming to the latitude range of -90 to +90 degrees. "
"It can be a floating-point number or an integer. Valid examples: 40.7128, -74.0060, 90, -90, 0."
),
]
LongitudeCoordinate = Annotated[
str,
AfterValidator(validate_longitude),
Doc(
"A longitude coordinate, modeled as a string. "
"The coordinate must match the format conforming to the longitude "
"range of -180 to +180 degrees. It can be a floating-point number or an integer. "
"Valid examples: 40.7128, -74.0060, 180, -180, 0."
),
]
class SiteBlockInactive(
ProductBlockModel,
lifecycle=[SubscriptionLifecycle.INITIAL],
......
......@@ -10,7 +10,7 @@ from requests import HTTPError, Response
from requests.adapters import HTTPAdapter
from gso.settings import load_oss_params
from gso.utils.helpers import SNMPVersion
from gso.types.snmp import SNMPVersion
logger = logging.getLogger(__name__)
......
......@@ -69,20 +69,6 @@ def get_subscriptions(
return [dict(zip(includes, result, strict=True)) for result in results]
def get_active_site_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for sites.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for sites.
:rtype: list[Subscription]
"""
return get_subscriptions(
product_types=[ProductType.SITE], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
)
def get_router_subscriptions(
includes: list[str] | None = None, lifecycles: list[SubscriptionLifecycle] | None = None
) -> list[SubscriptionType]:
......@@ -122,6 +108,20 @@ def get_provisioning_router_subscriptions(includes: list[str] | None = None) ->
)
def get_active_switch_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for switches.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for switches.
:rtype: list[Subscription]
"""
return get_subscriptions(
product_types=[ProductType.SWITCH], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
)
def get_active_iptrunk_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for IP trunks.
......@@ -233,6 +233,18 @@ def get_active_insync_subscriptions() -> list[SubscriptionTable]:
)
def get_active_site_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for sites.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for sites.
:rtype: list[Subscription]
"""
return get_subscriptions(product_types=["Site"], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes) # type: ignore[list-item]
def get_site_by_name(site_name: str) -> Site:
"""Get a site by its name.
......
......@@ -32,16 +32,18 @@
"migrate_to_different_site": "Migrating to a different Site",
"remove_configuration": "Remove configuration from the router",
"clean_up_ipam": "Clean up related entries in IPAM",
"restore_isis_metric": "Restore ISIS metric to original value"
"restore_isis_metric": "Restore ISIS metric to original value",
"confirm_info": "Please verify this form looks correct."
}
},
"workflow": {
"activate_iptrunk": "Activate IP Trunk",
"activate_router": "Activate router",
"confirm_info": "Please verify this form looks correct.",
"activate_router": "Activate Router",
"activate_switch": "Activate Switch",
"create_iptrunk": "Create IP Trunk",
"create_router": "Create Router",
"create_site": "Create Site",
"create_switch": "Create Switch",
"deploy_twamp": "Deploy TWAMP",
"migrate_iptrunk": "Migrate IP Trunk",
"modify_isis_metric": "Modify the ISIS metric",
......@@ -52,6 +54,7 @@
"terminate_iptrunk": "Terminate IP Trunk",
"terminate_router": "Terminate Router",
"terminate_site": "Terminate Site",
"terminate_switch": "Terminate Switch",
"redeploy_base_config": "Redeploy base config",
"update_ibgp_mesh": "Update iBGP mesh",
"create_imported_site": "NOT FOR HUMANS -- Import existing site",
......@@ -67,7 +70,8 @@
"import_super_pop_switch": "NOT FOR HUMANS -- Finalize import into a Super PoP switch",
"import_opengear": "NOT FOR HUMANS -- Finalize import into an OpenGear",
"validate_iptrunk": "Validate IP Trunk configuration",
"validate_router": "Validate router configuration",
"validate_router": "Validate Router configuration",
"validate_switch": "Validate Switch configuration",
"task_validate_geant_products": "Validation task for GEANT products",
"task_send_email_notifications": "Send email notifications for failed tasks",
"task_create_partners": "Create partner task",
......
"""Define custom types for use across the application."""
"""A base site type for validation purposes that can be extended elsewhere."""
from pydantic import BaseModel, field_validator
from pydantic_core.core_schema import ValidationInfo
from gso.products.product_blocks.site import SiteTier
from gso.types.coordinates import LatitudeCoordinate, LongitudeCoordinate
from gso.types.country_code import validate_country_code
from gso.types.ip_address import validate_ipv4_or_ipv6
from gso.types.site_name import validate_site_name
from gso.types.unique_field import validate_field_is_unique
class BaseSiteValidatorModel(BaseModel):
"""A base site validator model extended by create site and by import site."""
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
site_country_code: str
site_name: str
site_city: str
site_country: str
site_latitude: LatitudeCoordinate
site_longitude: LongitudeCoordinate
partner: str
@field_validator("site_ts_address")
def validate_ts_address(cls, site_ts_address: str) -> str:
"""Validate that a terminal server address is valid."""
validate_ipv4_or_ipv6(site_ts_address)
return site_ts_address
@field_validator("site_country_code")
def country_code_must_exist(cls, country_code: str) -> str:
"""Validate that the country code exists."""
validate_country_code(country_code)
return country_code
@field_validator("site_ts_address", "site_internal_id", "site_bgp_community_id", "site_name")
def field_must_be_unique(cls, value: str | int, info: ValidationInfo) -> str | int:
"""Validate that a field is unique."""
if not info.field_name:
msg = "Field name must be provided."
raise ValueError(msg)
validate_field_is_unique(info.field_name, value)
return value
@field_validator("site_name")
def site_name_must_be_valid(cls, site_name: str) -> str:
"""Validate the site name.
The site name must consist of three uppercase letters, followed by an optional single digit.
"""
validate_site_name(site_name)
return site_name
"""Custom coordinate types for latitude and longitude."""
import re
from typing import Annotated
from pydantic import AfterValidator
from typing_extensions import Doc
MAX_LONGITUDE = 180
MIN_LONGITUDE = -180
MAX_LATITUDE = 90
MIN_LATITUDE = -90
def validate_latitude(v: str) -> str:
"""Validate a latitude coordinate."""
msg = "Invalid latitude coordinate. Valid examples: '40.7128', '-74.0060', '90', '-90', '0'."
regex = re.compile(r"^-?([1-8]?\d(\.\d+)?|90(\.0+)?)$")
if not regex.match(str(v)):
raise ValueError(msg)
float_v = float(v)
if float_v > MAX_LATITUDE or float_v < MIN_LATITUDE:
raise ValueError(msg)
return v
def validate_longitude(v: str) -> str:
"""Validate a longitude coordinate."""
regex = re.compile(r"^-?(180(\.0+)?|((1[0-7]\d)|([1-9]?\d))(\.\d+)?)$")
msg = "Invalid longitude coordinate. Valid examples: '40.7128', '-74.0060', '180', '-180', '0'."
if not regex.match(v):
raise ValueError(msg)
float_v = float(v)
if float_v > MAX_LONGITUDE or float_v < MIN_LONGITUDE:
raise ValueError(msg)
return v
LatitudeCoordinate = Annotated[
str,
AfterValidator(validate_latitude),
Doc(
"A latitude coordinate, modeled as a string. "
"The coordinate must match the format conforming to the latitude range of -90 to +90 degrees. "
"It can be a floating-point number or an integer. Valid examples: 40.7128, -74.0060, 90, -90, 0."
),
]
LongitudeCoordinate = Annotated[
str,
AfterValidator(validate_longitude),
Doc(
"A longitude coordinate, modeled as a string. "
"The coordinate must match the format conforming to the longitude "
"range of -180 to +180 degrees. It can be a floating-point number or an integer. "
"Valid examples: 40.7128, -74.0060, 180, -180, 0."
),
]
"""Country codes."""
import pycountry
def validate_country_code(country_code: str) -> str:
"""Validate that a country code is valid."""
# Check for the UK code before attempting to look it up since it's known as "GB" in the pycountry database.
if country_code != "UK":
try:
pycountry.countries.lookup(country_code)
except LookupError as e:
msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format."
raise ValueError(msg) from e
return country_code
"""Custom types for interfaces, both physical and logical."""
import re
from typing import Annotated
from annotated_types import Len
from orchestrator.domain.base import T
from pydantic import AfterValidator, BaseModel
from pydantic_forms.types import strEnum
from pydantic_forms.validators import validate_unique_list
from typing_extensions import Doc
class LAGMember(BaseModel):
"""A :term:`LAG` member interface that consists of a name and description."""
interface_name: str
interface_description: str | None = None
def __hash__(self) -> int:
"""Calculate the hash based on the interface name and description, so that uniqueness can be determined."""
return hash(self.interface_name)
def validate_interface_names_are_unique(interfaces: list[LAGMember]) -> list[LAGMember]:
"""Verify if interfaces are unique.
Raises a :class:`ValueError` if the interfaces are not unique.
:param interfaces: The list of interfaces.
:type interfaces: list[:class:`utils.types.LAGMember`]
:return: The list of interfaces
:rtype: list[:class:`utils.types.LAGMember`]
"""
interface_names = [member.interface_name for member in interfaces]
if len(interface_names) != len(set(interface_names)):
msg = "Interfaces must be unique."
raise ValueError(msg)
return interfaces
def validate_juniper_phy_interface_name(interface_name: str) -> str:
"""Validate that the provided interface name matches the expected pattern.
The expected pattern for the interface name is one of 'ge', 'et', 'xe' followed by a dash '-',
then a number between 0 and 19, a forward slash '/', another number between 0 and 99,
another forward slash '/', and ends with a number between 0 and 99.
For example: 'xe-1/0/0'. This only applies to Juniper-brand hardware.
:param str interface_name: Interface name to validate.
:return str: The interface name if match was successful, otherwise it will throw a ValueError exception.
"""
pattern = re.compile(r"^(ge|et|xe)-1?[0-9]/[0-9]{1,2}/[0-9]{1,2}$")
if not bool(pattern.match(interface_name)):
error_msg = (
f"Invalid interface name. The interface name should be of format: xe-1/0/0. " f"Got: {interface_name}"
)
raise ValueError(error_msg)
return interface_name
def validate_juniper_ae_interface_name(interface_name: str) -> str:
"""Validate that the provided interface name matches the expected pattern for a :term:`LAG` interface.
Interface names must match the pattern 'ae' followed by a one- or two-digit number.
"""
juniper_lag_re = re.compile("^ae\\d{1,2}$")
if not juniper_lag_re.match(interface_name):
msg = "Invalid LAG name, please try again."
raise ValueError(msg)
return interface_name
JuniperPhyInterface = Annotated[str, AfterValidator(validate_juniper_phy_interface_name)]
JuniperAEInterface = Annotated[str, AfterValidator(validate_juniper_ae_interface_name)]
LAGMemberList = Annotated[
list[T],
AfterValidator(validate_unique_list),
AfterValidator(validate_interface_names_are_unique),
Len(min_length=0),
Doc("A list of :term:`LAG` member interfaces."),
]
class JuniperLAGMember(LAGMember):
"""A Juniper-specific :term:`LAG` member interface."""
interface_name: JuniperPhyInterface
class PhysicalPortCapacity(strEnum):
"""Physical port capacity enumerator.
An enumerator that has the different possible capacities of ports that are available to use in subscriptions.
"""
ONE_GIGABIT_PER_SECOND = "1G"
TEN_GIGABIT_PER_SECOND = "10G"
HUNDRED_GIGABIT_PER_SECOND = "100G"
FOUR_HUNDRED_GIGABIT_PER_SECOND = "400G"
"""IP addresses."""
import ipaddress
def validate_ipv4_or_ipv6(value: str) -> str:
"""Validate that a value is a valid IPv4 or IPv6 address."""
try:
ipaddress.ip_address(value)
except ValueError as e:
msg = "Enter a valid IPv4 or IPv6 address."
raise ValueError(msg) from e
else:
return value
"""A router that must be present in Netbox."""
from pydantic_forms.types import UUIDstr
from gso.products.product_types.router import Router
from gso.services.netbox_client import NetboxClient
from gso.utils.shared_enums import Vendor
def validate_router_in_netbox(subscription_id: UUIDstr) -> UUIDstr:
"""Verify if a device exists in Netbox.
Raises a :class:`ValueError` if the device is not found.
:param subscription_id: The :term:`UUID` of the router subscription.
:type subscription_id: :class:`UUIDstr`
:return: The :term:`UUID` of the router subscription.
:rtype: :class:`UUIDstr`
"""
router_type = Router.from_subscription(subscription_id)
if router_type.router.vendor == Vendor.NOKIA:
device = NetboxClient().get_device_by_name(router_type.router.router_fqdn)
if not device:
msg = "The selected router does not exist in Netbox."
raise ValueError(msg)
return subscription_id
"""Type for the name of a site."""
import re
def validate_site_name(site_name: str) -> None:
"""Validate the site name.
The site name must consist of three uppercase letters, optionally followed by a single digit.
"""
pattern = re.compile(r"^[A-Z]{3}[0-9]?$")
if not pattern.match(site_name):
msg = (
"Enter a valid site name. It must consist of three uppercase letters (A-Z), followed by an optional single "
f"digit (0-9). Received: {site_name}"
)
raise ValueError(msg)
"""An enumerator of SNMP version numbers."""
from enum import StrEnum
class SNMPVersion(StrEnum):
"""An enumerator for the two relevant versions of :term:`SNMP`: v2c and 3."""
V2C = "v2c"
V3 = "v3"
"""A Trouble Ticket number."""
import re
from typing import Annotated
from pydantic import AfterValidator
def validate_tt_number(tt_number: str) -> str:
"""Validate a string to match a specific pattern.
This method checks if the input string starts with 'TT#' and is followed by exactly 16 digits.
:param str tt_number: The TT number as string to validate
:return str: The TT number string if TT number match was successful, otherwise it will raise a ValueError.
"""
pattern = r"^TT#\d{16}$"
if not bool(re.match(pattern, tt_number)):
err_msg = (
f"The given TT number: {tt_number} is not valid. "
f" A valid TT number starts with 'TT#' followed by 16 digits."
)
raise ValueError(err_msg)
return tt_number
TTNumber = Annotated[str, AfterValidator(validate_tt_number)]
"""An input field that must be unique in the database."""
from gso.services import subscriptions
def validate_field_is_unique(field_name: str, value: str | int) -> None:
"""Validate that a site field is unique."""
if len(subscriptions.get_active_subscriptions_by_field_and_value(field_name, str(value))) > 0:
msg = f"{field_name} must be unique"
raise ValueError(msg)
"""Helper methods that are used across :term:`GSO`."""
import ipaddress
import re
from enum import StrEnum
from typing import TYPE_CHECKING
from uuid import UUID
import pycountry
from orchestrator.types import UUIDstr
from pydantic import BaseModel, field_validator
from pydantic_core.core_schema import ValidationInfo
from pydantic_forms.validators import Choice
from gso import settings
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock, PhysicalPortCapacity
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate, SiteTier
from gso.products.product_types.router import Router
from gso.services import subscriptions
from gso.services.netbox_client import NetboxClient
from gso.services.subscriptions import get_active_router_subscriptions, get_active_subscriptions_by_field_and_value
from gso.types.interfaces import PhysicalPortCapacity
from gso.utils.shared_enums import IPv4AddressType, Vendor
class LAGMember(BaseModel):
"""A :term:`LAG` member interface that consists of a name and description."""
interface_name: str
interface_description: str | None = None
def __hash__(self) -> int:
"""Calculate the hash based on the interface name and description, so that uniqueness can be determined."""
return hash((self.interface_name, self.interface_description))
class SNMPVersion(StrEnum):
"""An enumerator for the two relevant versions of :term:`SNMP`: v2c and 3."""
V2C = "v2c"
V3 = "v3"
if TYPE_CHECKING:
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
......@@ -57,7 +36,7 @@ def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
def available_interfaces_choices_including_current_members(
router_id: UUID,
speed: str,
interfaces: list[IptrunkInterfaceBlock],
interfaces: list["IptrunkInterfaceBlock"],
) -> Choice | None:
"""Return a list of available interfaces for a given router and speed including the current members.
......@@ -119,184 +98,6 @@ def iso_from_ipv4(ipv4_address: IPv4AddressType) -> str:
return f"49.51e5.0001.{re_split}.00"
def validate_router_in_netbox(subscription_id: UUIDstr) -> UUIDstr:
"""Verify if a device exists in Netbox.
Raises a :class:`ValueError` if the device is not found.
:param subscription_id: The :term:`UUID` of the router subscription.
:type subscription_id: :class:`UUIDstr`
:return: The :term:`UUID` of the router subscription.
:rtype: :class:`UUIDstr`
"""
router_type = Router.from_subscription(subscription_id)
if router_type.router.vendor == Vendor.NOKIA:
device = NetboxClient().get_device_by_name(router_type.router.router_fqdn)
if not device:
msg = "The selected router does not exist in Netbox."
raise ValueError(msg)
return subscription_id
def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMember]:
"""Verify if the interfaces are unique.
Raises a :class:`ValueError` if the interfaces are not unique.
:param interfaces: The list of interfaces.
:type interfaces: list[:class:`LAGMember`]
:return: The list of interfaces
:rtype: list[:class:`LAGMember`]
"""
interface_names = [member.interface_name for member in interfaces]
if len(interface_names) != len(set(interface_names)):
msg = "Interfaces must be unique."
raise ValueError(msg)
return interfaces
def validate_site_fields_is_unique(field_name: str, value: str | int) -> None:
"""Validate that a site field is unique."""
if len(get_active_subscriptions_by_field_and_value(field_name, str(value))) > 0:
msg = f"{field_name} must be unique"
raise ValueError(msg)
def validate_ipv4_or_ipv6(value: str) -> str:
"""Validate that a value is a valid IPv4 or IPv6 address."""
try:
ipaddress.ip_address(value)
except ValueError as e:
msg = "Enter a valid IPv4 or IPv6 address."
raise ValueError(msg) from e
else:
return value
def validate_country_code(country_code: str) -> str:
"""Validate that a country code is valid."""
# Check for the UK code before attempting to look it up since it's known as "GB" in the pycountry database.
if country_code != "UK":
try:
pycountry.countries.lookup(country_code)
except LookupError as e:
msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format."
raise ValueError(msg) from e
return country_code
def validate_site_name(site_name: str) -> None:
"""Validate the site name.
The site name must consist of three uppercase letters, optionally followed by a single digit.
"""
pattern = re.compile(r"^[A-Z]{3}[0-9]?$")
if not pattern.match(site_name):
msg = (
"Enter a valid site name. It must consist of three uppercase letters (A-Z), followed by an optional single "
f"digit (0-9). Received: {site_name}"
)
raise ValueError(msg)
class BaseSiteValidatorModel(BaseModel):
"""A base site validator model extended by create site and by import site."""
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
site_country_code: str
site_name: str
site_city: str
site_country: str
site_latitude: LatitudeCoordinate
site_longitude: LongitudeCoordinate
partner: str
@field_validator("site_ts_address")
def validate_ts_address(cls, site_ts_address: str) -> str:
"""Validate that a terminal server address is valid."""
validate_ipv4_or_ipv6(site_ts_address)
return site_ts_address
@field_validator("site_country_code")
def country_code_must_exist(cls, country_code: str) -> str:
"""Validate that the country code exists."""
validate_country_code(country_code)
return country_code
@field_validator("site_ts_address", "site_internal_id", "site_bgp_community_id", "site_name")
def field_must_be_unique(cls, value: str | int, info: ValidationInfo) -> str | int:
"""Validate that a field is unique."""
if not info.field_name:
msg = "Field name must be provided."
raise ValueError(msg)
validate_site_fields_is_unique(info.field_name, value)
return value
@field_validator("site_name")
def site_name_must_be_valid(cls, site_name: str) -> str:
"""Validate the site name.
The site name must consist of three uppercase letters, followed by an optional single digit.
"""
validate_site_name(site_name)
return site_name
def validate_interface_name_list(interface_name_list: list, vendor: str) -> list:
"""Validate that the provided interface name matches the expected pattern.
The expected pattern for the interface name is one of 'ge', 'et', 'xe' followed by a dash '-',
then a number between 0 and 19, a forward slash '/', another number between 0 and 99,
another forward slash '/', and ends with a number between 0 and 99.
For example: 'xe-1/0/0'.
:param list interface_name_list: List of interface names to validate.
:param str vendor: Router vendor to check interface names
:return list: The list of interface names if all match was successful, otherwise it will throw a ValueError
exception.
"""
# For Nokia nothing to do
if vendor == Vendor.NOKIA:
return interface_name_list
pattern = re.compile(r"^(ge|et|xe)-1?[0-9]/[0-9]{1,2}/[0-9]{1,2}$")
for interface in interface_name_list:
if not bool(pattern.match(interface.interface_name)):
error_msg = (
f"Invalid interface name. The interface name should be of format: xe-1/0/0. "
f"Got: [{interface.interface_name}]"
)
raise ValueError(error_msg)
return interface_name_list
def validate_tt_number(tt_number: str) -> str:
"""Validate a string to match a specific pattern.
This method checks if the input string starts with 'TT#' and is followed by exactly 16 digits.
:param str tt_number: The TT number as string to validate
:return str: The TT number string if TT number match was successful, otherwise it will raise a ValueError.
"""
pattern = r"^TT#\d{16}$"
if not bool(re.match(pattern, tt_number)):
err_msg = (
f"The given TT number: {tt_number} is not valid. "
f" A valid TT number starts with 'TT#' followed by 16 digits."
)
raise ValueError(err_msg)
return tt_number
def generate_fqdn(hostname: str, site_name: str, country_code: str) -> str:
"""Generate an :term:`FQDN` from a hostname, site name, and a country code."""
oss = settings.load_oss_params()
......@@ -318,7 +119,9 @@ def generate_inventory_for_active_routers(
:return: A dictionary representing the inventory of active routers.
:rtype: dict[str, Any]
"""
all_routers = [Router.from_subscription(r["subscription_id"]) for r in get_active_router_subscriptions()]
all_routers = [
Router.from_subscription(r["subscription_id"]) for r in subscriptions.get_active_router_subscriptions()
]
exclude_routers = exclude_routers or []
return {
......@@ -351,3 +154,33 @@ def calculate_recommended_minimum_links(iptrunk_number_of_members: int, iptrunk_
if iptrunk_speed == PhysicalPortCapacity.FOUR_HUNDRED_GIGABIT_PER_SECOND:
return iptrunk_number_of_members - 1
return iptrunk_number_of_members
def active_site_selector() -> Choice:
"""Generate a dropdown selector for choosing an active site in an input form."""
site_subscriptions = {
str(site["subscription_id"]): site["description"]
for site in subscriptions.get_active_site_subscriptions(includes=["subscription_id", "description"])
}
return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True)) # type: ignore[arg-type]
def active_router_selector() -> Choice:
"""Generate a dropdown selector for choosing an active Router in an input form."""
router_subscriptions = {
str(router["subscription_id"]): router["description"]
for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"])
}
return Choice("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)) # type: ignore[arg-type]
def active_switch_selector() -> Choice:
"""Generate a dropdown selector for choosing an active Switch in an input form."""
switch_subscriptions = {
str(switch["subscription_id"]): switch["description"]
for switch in subscriptions.get_active_switch_subscriptions(includes=["subscription_id", "description"])
}
return Choice("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)) # type: ignore[arg-type]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment