diff --git a/gso/__init__.py b/gso/__init__.py index ecdfd940ffefe85df1613e4a6cbbc74f56bf80dc..94c8f5fa273a584cf542da257c0289952567857d 100644 --- a/gso/__init__.py +++ b/gso/__init__.py @@ -28,7 +28,7 @@ def init_worker_app() -> OrchestratorCore: def init_cli_app() -> typer.Typer: """Initialise :term:`GSO` as a CLI application.""" - from gso.cli import imports, netbox # noqa: PLC0415 + from gso.cli import imports, netbox cli_app.add_typer(imports.app, name="import-cli") cli_app.add_typer(netbox.app, name="netbox-cli") diff --git a/gso/api/v1/imports.py b/gso/api/v1/imports.py index bde96f2253572c1350cc0a550f33debe19d799d2..10ebe45e061b4117b36ec4d02e8e3ce932969c20 100644 --- a/gso/api/v1/imports.py +++ b/gso/api/v1/imports.py @@ -12,11 +12,10 @@ from pydantic import BaseModel, field_validator, model_validator from gso.auth.security import opa_security_default from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity from gso.products.product_blocks.router import RouterRole -from gso.products.product_blocks.site import SiteTier from gso.services import subscriptions from gso.services.partners import PartnerNotFoundError, get_partner_by_name from gso.utils.helpers import BaseSiteValidatorModel, LAGMember -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)]) @@ -31,16 +30,6 @@ class ImportResponseModel(BaseModel): class SiteImportModel(BaseSiteValidatorModel): """The required input for importing an existing :class:`gso.products.product_types.site`.""" - site_name: str - site_city: str - site_country: str - site_country_code: str - site_latitude: float - site_longitude: float - site_bgp_community_id: int - site_internal_id: int - site_tier: SiteTier - site_ts_address: str partner: str @@ -53,8 +42,8 @@ class RouterImportModel(BaseModel): ts_port: int router_vendor: Vendor router_role: RouterRole - router_lo_ipv4_address: ipaddress.IPv4Address - router_lo_ipv6_address: ipaddress.IPv6Address + router_lo_ipv4_address: FancyIPV4Address + router_lo_ipv6_address: FancyIPV6Address router_lo_iso_address: str @@ -140,7 +129,7 @@ class SuperPopSwitchImportModel(BaseModel): super_pop_switch_site: str hostname: str super_pop_switch_ts_port: PortNumber - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address + super_pop_switch_mgmt_ipv4_address: FancyIPV4Address class OfficeRouterImportModel(BaseModel): @@ -150,8 +139,8 @@ class OfficeRouterImportModel(BaseModel): office_router_site: str office_router_fqdn: str office_router_ts_port: PortNumber - office_router_lo_ipv4_address: ipaddress.IPv4Address - office_router_lo_ipv6_address: ipaddress.IPv6Address + office_router_lo_ipv4_address: FancyIPV4Address + office_router_lo_ipv6_address: FancyIPV6Address def _start_process(process_name: str, data: dict) -> UUID: diff --git a/gso/api/v1/subscriptions.py b/gso/api/v1/subscriptions.py index bf4e96bfb6c0b2932d183d1c2707f2a18bf5a608..e2221779b66786429bb4db61f2c94bfee53e8371 100644 --- a/gso/api/v1/subscriptions.py +++ b/gso/api/v1/subscriptions.py @@ -1,5 +1,4 @@ """:term:`API` endpoint for fetching different types of subscriptions.""" - from typing import Any from fastapi import Depends, Response, status @@ -20,6 +19,17 @@ router = APIRouter( ) +# class MySubscriptionDomainModelSchema(SubscriptionDomainModelSchema): +# model_config = ConfigDict( +# extra="allow", +# json_encoders={ +# # datetime: lambda dt: dt.timestamp(), +# ipaddress.IPv4Address: lambda v: 1/0, +# ipaddress.IPv6Address: lambda v: str(v), +# } +# ) + + @router.get( "/routers", status_code=status.HTTP_200_OK, diff --git a/gso/monkeypatches.py b/gso/monkeypatches.py index 2e94f50bdd27288e4ce7d829036ffbc8f022ef20..7c929d9f0b7e3b58a968373b6d51daab36feb36e 100644 --- a/gso/monkeypatches.py +++ b/gso/monkeypatches.py @@ -3,9 +3,12 @@ This adjustment is typically done to extend or modify the functionality of the original oauth2_lib package to meet specific requirements of the gso application. """ +from datetime import datetime +from ipaddress import IPv4Address, IPv6Address import oauth2_lib.fastapi import oauth2_lib.settings +from pydantic import BaseModel from gso.auth.oidc_policy_helper import HTTPX_SSL_CONTEXT, OIDCUser, OIDCUserModel, opa_decision from gso.auth.settings import oauth2lib_settings @@ -15,3 +18,9 @@ oauth2_lib.fastapi.OIDCUserModel = OIDCUserModel # type: ignore[assignment, mis oauth2_lib.fastapi.opa_decision = opa_decision # type: ignore[assignment] oauth2_lib.fastapi.HTTPX_SSL_CONTEXT = HTTPX_SSL_CONTEXT oauth2_lib.settings.oauth2lib_settings = oauth2lib_settings # type: ignore[assignment] + +BaseModel.model_config["json_encoders"] = { + datetime: lambda dt: dt.timestamp(), + IPv4Address: lambda v: str(v), + IPv6Address: lambda v: str(v), +} diff --git a/gso/oss-params-example.json b/gso/oss-params-example.json index ff6eb33dc70bcaad8cf453e5d678483cadb79c5f..38f52ff78425211e1a7de5e4bccd6f5ab9dc3cac 100644 --- a/gso/oss-params-example.json +++ b/gso/oss-params-example.json @@ -45,7 +45,7 @@ }, "LT_IAS": { "V4": {"containers": ["10.255.255.0/24"], "networks": [], "mask": 31}, - "V6": {"containers": ["dead:beef:cc::/48"], "networks": [], "mask": 126}, + "V6": {"containers": [ "2001:798:1::/48"], "networks": [], "mask": 126}, "domain_name": ".geantip", "dns_view": "default", "network_view": "default" @@ -73,6 +73,7 @@ "PROVISIONING_PROXY": { "scheme": "https", "api_base": "localhost:44444", + "auth": "Bearer <token>", "api_version": 1123 }, "CELERY": { diff --git a/gso/products/product_blocks/iptrunk.py b/gso/products/product_blocks/iptrunk.py index b9442fad884d889659853acca92a0ebd542fc512..ab0fb22b200625a2a49cf78d436c4a7971ad699e 100644 --- a/gso/products/product_blocks/iptrunk.py +++ b/gso/products/product_blocks/iptrunk.py @@ -1,7 +1,7 @@ """IP trunk product block that has all parameters of a subscription throughout its lifecycle.""" import ipaddress -from typing import Annotated, TypeVar +from typing import Annotated from annotated_types import Len from orchestrator.domain.base import ProductBlockModel, T @@ -35,8 +35,8 @@ class IptrunkType(strEnum): LEASED = "Leased" -# A list of :term:`LAG` member interfaces. LAGMemberList = Annotated[list[T], AfterValidator(validate_unique_list), Len(min_length=0, max_length=None)] +IptrunkSides = Annotated[list[T], AfterValidator(validate_unique_list), Len(min_length=2, max_length=2)] class IptrunkInterfaceBlockInactive( @@ -64,16 +64,6 @@ class IptrunkInterfaceBlock(IptrunkInterfaceBlockProvisioning, lifecycle=[Subscr interface_description: str -def validate_unique_list(value): - if len(value) != len(set(value)): - raise ValueError("List items must be unique") - return value - - -T_co = TypeVar("T_co", covariant=True) -IptrunkSides = Annotated[list[T_co], AfterValidator(validate_unique_list), Len(min_length=2, max_length=2)] - - class IptrunkSideBlockInactive( ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], diff --git a/gso/products/product_blocks/office_router.py b/gso/products/product_blocks/office_router.py index fec7ad8d16366baf12ec3528748f71aa2fa36d90..c94bc9de79e67c2801122b4d76d7b12befbd6dbe 100644 --- a/gso/products/product_blocks/office_router.py +++ b/gso/products/product_blocks/office_router.py @@ -1,6 +1,5 @@ """Product block for :class:`office router` products.""" -import ipaddress from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle @@ -10,7 +9,7 @@ from gso.products.product_blocks.site import ( SiteBlockInactive, SiteBlockProvisioning, ) -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor class OfficeRouterBlockInactive( @@ -22,8 +21,8 @@ class OfficeRouterBlockInactive( office_router_fqdn: str | None = None office_router_ts_port: PortNumber | None = None - office_router_lo_ipv4_address: ipaddress.IPv4Address | None = None - office_router_lo_ipv6_address: ipaddress.IPv6Address | None = None + office_router_lo_ipv4_address: FancyIPV4Address | None = None + office_router_lo_ipv6_address: FancyIPV6Address | None = None office_router_site: SiteBlockInactive | None vendor: Vendor | None = None @@ -33,8 +32,8 @@ class OfficeRouterBlockProvisioning(OfficeRouterBlockInactive, lifecycle=[Subscr office_router_fqdn: str | None = None office_router_ts_port: PortNumber | None = None - office_router_lo_ipv4_address: ipaddress.IPv4Address | None = None - office_router_lo_ipv6_address: ipaddress.IPv6Address | None = None + office_router_lo_ipv4_address: FancyIPV4Address | None = None + office_router_lo_ipv6_address: FancyIPV6Address | None = None office_router_site: SiteBlockProvisioning | None vendor: Vendor | None = None @@ -47,9 +46,9 @@ class OfficeRouterBlock(OfficeRouterBlockProvisioning, lifecycle=[SubscriptionLi #: The port of the terminal server that this office router is connected to. Used to offer out of band access. office_router_ts_port: PortNumber #: The IPv4 loopback address of the office router. - office_router_lo_ipv4_address: ipaddress.IPv4Address + office_router_lo_ipv4_address: FancyIPV4Address #: The IPv6 loopback address of the office router. - office_router_lo_ipv6_address: ipaddress.IPv6Address + office_router_lo_ipv6_address: FancyIPV6Address #: The :class:`Site` that this office router resides in. Both physically and computationally. office_router_site: SiteBlock #: The vendor of an office router. Defaults to Juniper. diff --git a/gso/products/product_blocks/router.py b/gso/products/product_blocks/router.py index f91bf1c70507a2f7814bfe69643c70489cb0c4c2..64da1e6f94fa830484c1ee70e20565b6da85b3fa 100644 --- a/gso/products/product_blocks/router.py +++ b/gso/products/product_blocks/router.py @@ -1,7 +1,5 @@ """Product block for :class:`Router` products.""" -import ipaddress - from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle, strEnum @@ -10,7 +8,7 @@ from gso.products.product_blocks.site import ( SiteBlockInactive, SiteBlockProvisioning, ) -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor class RouterRole(strEnum): @@ -31,8 +29,8 @@ class RouterBlockInactive( router_fqdn: str | None = None router_ts_port: PortNumber | None = None router_access_via_ts: bool | None = None - router_lo_ipv4_address: ipaddress.IPv4Address | None = None - router_lo_ipv6_address: ipaddress.IPv6Address | None = None + router_lo_ipv4_address: FancyIPV4Address | None = None + router_lo_ipv6_address: FancyIPV6Address | None = None router_lo_iso_address: str | None = None router_role: RouterRole | None = None router_site: SiteBlockInactive | None @@ -45,8 +43,8 @@ class RouterBlockProvisioning(RouterBlockInactive, lifecycle=[SubscriptionLifecy router_fqdn: str router_ts_port: PortNumber router_access_via_ts: bool - router_lo_ipv4_address: ipaddress.IPv4Address - router_lo_ipv6_address: ipaddress.IPv6Address + router_lo_ipv4_address: FancyIPV4Address + router_lo_ipv6_address: FancyIPV6Address router_lo_iso_address: str router_role: RouterRole router_site: SiteBlockProvisioning @@ -63,9 +61,9 @@ class RouterBlock(RouterBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTI #: Whether this router should be accessed through the terminal server, or through its loopback address. router_access_via_ts: bool #: The IPv4 loopback address of the router. - router_lo_ipv4_address: ipaddress.IPv4Address + router_lo_ipv4_address: FancyIPV4Address #: The IPv6 loopback address of the router. - router_lo_ipv6_address: ipaddress.IPv6Address + router_lo_ipv6_address: FancyIPV6Address #: The :term:`ISO` :term:`NET` of the router, used for :term:`ISIS` support. router_lo_iso_address: str #: The role of the router, which can be any of the values defined in :class:`RouterRole`. diff --git a/gso/products/product_blocks/site.py b/gso/products/product_blocks/site.py index 8082abaa29f41ba9c3afd4dfae2fee98930cfb68..de39478cc8e66d4643edd8509bf4fff44ce068a6 100644 --- a/gso/products/product_blocks/site.py +++ b/gso/products/product_blocks/site.py @@ -36,7 +36,7 @@ def validate_longitude(v: float) -> float: return v -LatitudeCoordinate = Annotated[ +LatitudeCoordinate: type[float] = Annotated[ float, Field( ge=-90, diff --git a/gso/products/product_blocks/super_pop_switch.py b/gso/products/product_blocks/super_pop_switch.py index af2f2ba74c98cc41806842d9877e8b0168ec3748..8dc124ce59c4756931051faad4a2aa9f3318a2a0 100644 --- a/gso/products/product_blocks/super_pop_switch.py +++ b/gso/products/product_blocks/super_pop_switch.py @@ -1,6 +1,5 @@ """Product block for :class:`Super PoP Switch` products.""" -import ipaddress from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle @@ -10,7 +9,7 @@ from gso.products.product_blocks.site import ( SiteBlockInactive, SiteBlockProvisioning, ) -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import FancyIPV4Address, PortNumber, Vendor class SuperPopSwitchBlockInactive( @@ -22,7 +21,7 @@ class SuperPopSwitchBlockInactive( super_pop_switch_fqdn: str | None = None super_pop_switch_ts_port: PortNumber | None = None - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address | None = None + super_pop_switch_mgmt_ipv4_address: FancyIPV4Address | None = None super_pop_switch_site: SiteBlockInactive | None vendor: Vendor | None = None @@ -32,7 +31,7 @@ class SuperPopSwitchBlockProvisioning(SuperPopSwitchBlockInactive, lifecycle=[Su super_pop_switch_fqdn: str | None = None super_pop_switch_ts_port: PortNumber | None = None - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address | None = None + super_pop_switch_mgmt_ipv4_address: FancyIPV4Address | None = None super_pop_switch_site: SiteBlockProvisioning | None vendor: Vendor | None = None @@ -45,7 +44,7 @@ class SuperPopSwitchBlock(SuperPopSwitchBlockProvisioning, lifecycle=[Subscripti #: The port of the terminal server that this Super PoP switch is connected to. Used to offer out of band access. super_pop_switch_ts_port: PortNumber #: The IPv4 management address of the Super PoP switch. - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address + super_pop_switch_mgmt_ipv4_address: FancyIPV4Address #: The :class:`Site` that this Super PoP switch resides in. Both physically and computationally. super_pop_switch_site: SiteBlock #: The vendor of a Super PoP switch. Defaults to Juniper. diff --git a/gso/schedules/scheduling.py b/gso/schedules/scheduling.py index 8525956cb7933facebd090d8c34938f66640bb56..2e83fab3613526c9cd3571def4ad1dff101c1443 100644 --- a/gso/schedules/scheduling.py +++ b/gso/schedules/scheduling.py @@ -29,6 +29,7 @@ def scheduler( All time units can be specified with lists of numbers or crontab pattern strings for advanced scheduling. All specified time parts (minute, hour, day, etc.) must align for a task to run. + """ def decorator(task_func: Callable) -> Callable: diff --git a/gso/services/infoblox.py b/gso/services/infoblox.py index ca01bcabb8727a92aa0a2912bcf142a2a550c3cd..5c1ed1e1bf56363556fcdac104ca06e6d8120f69 100644 --- a/gso/services/infoblox.py +++ b/gso/services/infoblox.py @@ -10,6 +10,7 @@ from infoblox_client.exceptions import ( ) from gso.settings import IPAMParams, load_oss_params +from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address logger = getLogger(__name__) NULL_MAC = "00:00:00:00:00:00" @@ -237,8 +238,8 @@ def allocate_host( def create_host_by_ip( hostname: str, - ipv4_address: ipaddress.IPv4Address, - ipv6_address: ipaddress.IPv6Address, + ipv4_address: FancyIPV4Address, + ipv6_address: FancyIPV6Address, service_type: str, comment: str, ) -> None: @@ -271,11 +272,11 @@ def create_host_by_ip( new_host.update() -def find_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> objects.HostRecord | None: +def find_host_by_ip(ip_addr: FancyIPV4Address | ipaddress.IPv6Address) -> objects.HostRecord | None: """Find a host record in Infoblox by its associated IP address. :param ip_addr: The IP address of a host that is searched for. - :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address + :type ip_addr: FancyIPV4Address | ipaddress.IPv6Address """ conn, _ = _setup_connection() if ip_addr.version == 4: # noqa: PLR2004, the 4 in IPv4 is well-known and not a "magic value." @@ -317,14 +318,14 @@ def find_v6_host_by_fqdn(fqdn: str) -> objects.HostRecordV6: ) -def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None: +def delete_host_by_ip(ip_addr: FancyIPV4Address | ipaddress.IPv6Address) -> None: """Delete a host from Infoblox. Delete a host record in Infoblox, by providing the IP address that is associated with the record. Raises a :class:`DeletionError` if no record can be found in Infoblox. :param ip_addr: The IP address of the host record that should get deleted. - :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address + :type ip_addr: FancyIPV4Address | ipaddress.IPv6Address """ host = find_host_by_ip(ip_addr) if host: diff --git a/gso/services/lso_client.py b/gso/services/lso_client.py index 9f7024649b9c4d7a394d18e791003f6a2beaa828..fc8eabd1f0d7f4c36af7a267a2a88cc0bb2bfd93 100644 --- a/gso/services/lso_client.py +++ b/gso/services/lso_client.py @@ -130,8 +130,8 @@ def _show_results(state: State) -> FormGenerator: if "lso_result_extra_label" in state: extra_label: Label = state["lso_result_extra_label"] - run_status: str = ReadOnlyField(state["callback_result"]["status"]) - run_results: LongText = ReadOnlyField(json.dumps(state["callback_result"], indent=4)) + run_status: ReadOnlyField(state["callback_result"]["status"], default_type=str) + run_results: ReadOnlyField(json.dumps(state["callback_result"], indent=4), default_type=LongText) yield ConfirmRunPage [state.pop(key, None) for key in ["run_results", "lso_result_title", "lso_result_extra_label"]] diff --git a/gso/settings.py b/gso/settings.py index 7f601e94d33fef2ebe3b6e426a0cedd92adb00e9..f550c0cc90a3bc749dd581af31500cab30de8729 100644 --- a/gso/settings.py +++ b/gso/settings.py @@ -9,8 +9,9 @@ import json import logging import os from pathlib import Path +from typing import Annotated -from pydantic import NonNegativeInt +from pydantic import Field from pydantic_settings import BaseSettings logger = logging.getLogger(__name__) @@ -45,16 +46,8 @@ class InfoBloxParams(BaseSettings): password: str -class V4Netmask(NonNegativeInt): - """A valid netmask for an IPv4 network or address.""" - - le = 32 - - -class V6Netmask(NonNegativeInt): - """A valid netmask for an IPv6 network or address.""" - - le = 128 +V4Netmask = Annotated[int, Field(ge=0, le=32)] +V6Netmask = Annotated[int, Field(ge=0, le=128)] class V4NetworkParams(BaseSettings): diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index 0a87cd51ec98bef4a3fff0e63c214f0f19900029..c0913a181420216f8280bde7674dd4db437d9042 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -3,7 +3,6 @@ import ipaddress import re from enum import StrEnum -from ipaddress import IPv4Address from uuid import UUID import pycountry @@ -13,11 +12,11 @@ from pydantic_forms.validators import Choice from gso import settings from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock -from gso.products.product_blocks.site import SiteTier +from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate, SiteTier from gso.products.product_types.router import Router from gso.services.netbox_client import NetboxClient from gso.services.subscriptions import get_active_subscriptions_by_field_and_value -from gso.utils.shared_enums import Vendor +from gso.utils.shared_enums import FancyIPV4Address, Vendor class LAGMember(BaseModel): @@ -106,7 +105,7 @@ def get_router_vendor(router_id: UUID) -> Vendor: return Router.from_subscription(router_id).router.vendor -def iso_from_ipv4(ipv4_address: IPv4Address) -> str: +def iso_from_ipv4(ipv4_address: FancyIPV4Address) -> str: """Calculate an :term:`ISO` address, based on an IPv4 address. :param IPv4Address ipv4_address: The address that's to be converted @@ -209,47 +208,47 @@ class BaseSiteValidatorModel(BaseModel): site_internal_id: int site_tier: SiteTier site_ts_address: str + site_country_code: str + site_name: str + site_city: str + site_country: str + site_latitude: LatitudeCoordinate + site_longitude: LongitudeCoordinate - @classmethod - @field_validator("site_ts_address", check_fields=False) + + @field_validator("site_ts_address") def validate_ts_address(cls, site_ts_address: str) -> str: """Validate that a terminal server address is valid.""" validate_ipv4_or_ipv6(site_ts_address) return site_ts_address - @classmethod - @field_validator("site_country_code", check_fields=False) + @field_validator("site_country_code") def country_code_must_exist(cls, country_code: str) -> str: """Validate that the country code exists.""" validate_country_code(country_code) return country_code - @classmethod - @field_validator("site_ts_address", check_fields=False) + @field_validator("site_ts_address") def site_ts_address_must_be_unique(cls, site_ts_address: str) -> str: """Validate that the internal and :term:`BGP` community IDs are unique.""" return validate_site_fields_is_unique("site_ts_address", site_ts_address) - @classmethod - @field_validator("site_internal_id", check_fields=False) + @field_validator("site_internal_id") def site_internal_id_must_be_unique(cls, site_internal_id: int) -> int: """Validate that the internal and :term:`BGP` community IDs are unique.""" return validate_site_fields_is_unique("site_internal_id", site_internal_id) - @classmethod - @field_validator("site_bgp_community_id", check_fields=False) + @field_validator("site_bgp_community_id") def site_bgp_community_id_must_be_unique(cls, site_bgp_community_id: int) -> int: """Validate that the internal and :term:`BGP` community IDs are unique.""" return validate_site_fields_is_unique("site_bgp_community_id", site_bgp_community_id) - @classmethod - @field_validator("site_name", check_fields=False) + @field_validator("site_name") def site_name_must_be_unique(cls, site_name: str) -> str: """Validate that the internal and :term:`BGP` community IDs are unique.""" return validate_site_fields_is_unique("site_name", site_name) - @classmethod - @field_validator("site_name", check_fields=False) + @field_validator("site_name") def site_name_must_be_valid(cls, site_name: str) -> str: """Validate the site name. diff --git a/gso/utils/shared_enums.py b/gso/utils/shared_enums.py index c5d6fa6d9b54c970830c2804e436f4010f9b3dbb..7fe5c1522a83a128b54081eb893086475f111487 100644 --- a/gso/utils/shared_enums.py +++ b/gso/utils/shared_enums.py @@ -1,8 +1,8 @@ """Shared choices for the different models.""" - +import ipaddress from typing import Annotated -from pydantic import Field +from pydantic import Field, PlainSerializer from pydantic_forms.types import strEnum @@ -26,8 +26,18 @@ PortNumber = Annotated[ ] +FancyIPV4Address = Annotated[ + ipaddress.IPv4Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always") +] + +FancyIPV6Address = Annotated[ + ipaddress.IPv6Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always") +] + + class ConnectionStrategy(strEnum): """An enumerator for the connection Strategies.""" IN_BAND = "IN BAND" OUT_OF_BAND = "OUT OF BAND" + diff --git a/gso/worker.py b/gso/worker.py index b2abfe6f5a52192454d3d691ba1715df313fc6ac..b1a3db2c95935e960b699563745f327edc829987 100644 --- a/gso/worker.py +++ b/gso/worker.py @@ -9,7 +9,7 @@ from gso.settings import load_oss_params class OrchestratorCelery(Celery): """A :term:`GSO` instance that functions as a Celery worker.""" - def on_init(self) -> None: # noqa: PLR6301 + def on_init(self) -> None: """Initialise a new Celery worker.""" init_worker_app() diff --git a/gso/workflows/iptrunk/create_iptrunk.py b/gso/workflows/iptrunk/create_iptrunk.py index dd7b070abb7b3c86263c92a91b38e1d5404ddeaa..17152d26c2e03476b35391d7509189d6e343388a 100644 --- a/gso/workflows/iptrunk/create_iptrunk.py +++ b/gso/workflows/iptrunk/create_iptrunk.py @@ -57,7 +57,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: model_config = ConfigDict(title=product_name) tt_number: str - partner: str = ReadOnlyField("GEANT") + partner: ReadOnlyField("GEANT", default_type=str) geant_s_sid: str iptrunk_description: str iptrunk_type: IptrunkType @@ -65,7 +65,6 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: iptrunk_minimum_links: int @field_validator("tt_number") - @classmethod def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) @@ -107,7 +106,6 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: side_a_ae_members: ae_members_side_a # type: ignore[valid-type] @field_validator("side_a_ae_members") - @classmethod def validate_side_a_ae_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]: validate_iptrunk_unique_interface(side_a_ae_members) vendor = get_router_vendor(router_a) @@ -125,7 +123,6 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: side_b_node_id: router_enum_b # type: ignore[valid-type] @field_validator("side_b_node_id") - @classmethod def validate_device_exists_in_netbox(cls, side_b_node_id: UUIDstr) -> str | None: return validate_router_in_netbox(side_b_node_id) @@ -152,7 +149,6 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: side_b_ae_members: ae_members_side_b # type: ignore[valid-type] @field_validator("side_b_ae_members") - @classmethod def validate_side_b_ae_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]: validate_iptrunk_unique_interface(side_b_ae_members) vendor = get_router_vendor(router_b) diff --git a/gso/workflows/iptrunk/deploy_twamp.py b/gso/workflows/iptrunk/deploy_twamp.py index 011b37c85f82c486dccb87c3a2d1627ae852be44..f78a76ad27a679c3ee7ac6f956e8e16196e26b8d 100644 --- a/gso/workflows/iptrunk/deploy_twamp.py +++ b/gso/workflows/iptrunk/deploy_twamp.py @@ -26,7 +26,6 @@ def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: tt_number: str @field_validator("tt_number") - @classmethod def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) diff --git a/gso/workflows/iptrunk/migrate_iptrunk.py b/gso/workflows/iptrunk/migrate_iptrunk.py index fe97d66ecde2d019649cea61bf704225cd578b87..cfb5cf2b0c2d755f1e7082b53077ca4dcef486ff 100644 --- a/gso/workflows/iptrunk/migrate_iptrunk.py +++ b/gso/workflows/iptrunk/migrate_iptrunk.py @@ -130,10 +130,10 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: else 1 ) existing_lag_ae_members = [ - { - "interface_name": iface.interface_name, - "interface_description": iface.interface_description, - } + LAGMember( + interface_name=iface.interface_name, + interface_description=iface.interface_description, + ) for iface in subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_members ] @@ -141,10 +141,10 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: model_config = ConfigDict(title=form_title) new_lag_interface: side_a_ae_iface # type: ignore[valid-type] - existing_lag_interface: list[LAGMember] = ReadOnlyField(existing_lag_ae_members) + existing_lag_interface: ReadOnlyField(existing_lag_ae_members, default_type=list[LAGMember]) new_lag_member_interfaces: ae_members # type: ignore[valid-type] - @field_validator("new_lag_interface", mode="before") + @field_validator("new_lag_interface") def lag_interface_proper_name(cls, new_lag_interface: str) -> str: if get_router_vendor(new_router) == Vendor.JUNIPER: juniper_lag_re = re.compile("^ae\\d{1,2}$") @@ -153,7 +153,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: raise ValueError(msg) return new_lag_interface - @field_validator("new_lag_member_interfaces", mode="before") + @field_validator("new_lag_member_interfaces") def is_interface_names_valid_juniper(cls, new_lag_member_interfaces: list[LAGMember]) -> list[LAGMember]: vendor = get_router_vendor(new_router) return validate_interface_name_list(new_lag_member_interfaces, vendor) diff --git a/gso/workflows/iptrunk/modify_trunk_interface.py b/gso/workflows/iptrunk/modify_trunk_interface.py index 19edbdf60be481b5a1b3a02432920d82cf418c0f..d61b19fbf70d6a3f18a4946ff6fc3bd15424e1f9 100644 --- a/gso/workflows/iptrunk/modify_trunk_interface.py +++ b/gso/workflows/iptrunk/modify_trunk_interface.py @@ -1,6 +1,5 @@ """A modification workflow that updates the :term:`LAG` interfaces that are part of an existing IP trunk.""" -import ipaddress import json from typing import Annotated from uuid import UUID, uuid4 @@ -34,7 +33,7 @@ from gso.utils.helpers import ( validate_iptrunk_unique_interface, validate_tt_number, ) -from gso.utils.shared_enums import Vendor +from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, Vendor def initialize_ae_members(subscription: Iptrunk, initial_user_input: dict, side_index: int) -> type[LAGMember]: @@ -83,12 +82,11 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ) iptrunk_speed: PhysicalPortCapacity = subscription.iptrunk.iptrunk_speed iptrunk_minimum_links: int = subscription.iptrunk.iptrunk_minimum_links - iptrunk_isis_metric: int = ReadOnlyField(subscription.iptrunk.iptrunk_isis_metric) - iptrunk_ipv4_network: ipaddress.IPv4Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv4_network) - iptrunk_ipv6_network: ipaddress.IPv6Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv6_network) + iptrunk_isis_metric: ReadOnlyField(subscription.iptrunk.iptrunk_isis_metric, default_type=int) + iptrunk_ipv4_network: ReadOnlyField(str(subscription.iptrunk.iptrunk_ipv4_network), default_type=FancyIPV4Address) + iptrunk_ipv6_network: ReadOnlyField(str(subscription.iptrunk.iptrunk_ipv6_network), default_type=FancyIPV6Address) @field_validator("tt_number") - @classmethod def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) @@ -98,8 +96,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class ModifyIptrunkSideAForm(FormPage): model_config = ConfigDict(title="Provide subscription details for side A of the trunk.") - side_a_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn) - side_a_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface) + side_a_node: ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn, default_type=str) + side_a_ae_iface: ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface, default_type=str) side_a_ae_geant_a_sid: str = subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_geant_a_sid side_a_ae_members: ae_members_side_a = ( # type: ignore[valid-type] subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members @@ -108,12 +106,10 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ) @field_validator("side_a_ae_members") - @classmethod def validate_iptrunk_unique_interface_side_a(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]: return validate_iptrunk_unique_interface(side_a_ae_members) @field_validator("side_a_ae_members") - @classmethod def validate_interface_name_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]: vendor = subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.vendor return validate_interface_name_list(side_a_ae_members, vendor) @@ -124,8 +120,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class ModifyIptrunkSideBForm(FormPage): model_config = ConfigDict(title="Provide subscription details for side B of the trunk.") - side_b_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn) - side_b_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface) + side_b_node: ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn, default_type=str) + side_b_ae_iface: ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface, default_type=str) side_b_ae_geant_a_sid: str = subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_geant_a_sid side_b_ae_members: ae_members_side_b = ( # type: ignore[valid-type] subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members @@ -134,12 +130,10 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ) @field_validator("side_b_ae_members") - @classmethod def validate_iptrunk_unique_interface_side_b(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]: return validate_iptrunk_unique_interface(side_b_ae_members) @field_validator("side_b_ae_members") - @classmethod def validate_interface_name_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]: vendor = subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.vendor return validate_interface_name_list(side_b_ae_members, vendor) diff --git a/gso/workflows/iptrunk/terminate_iptrunk.py b/gso/workflows/iptrunk/terminate_iptrunk.py index bdd4e54183f411a2e87ebfa09632fa8d7bc14a43..d381be69ce8047482c55b6989987e966c6c91d44 100644 --- a/gso/workflows/iptrunk/terminate_iptrunk.py +++ b/gso/workflows/iptrunk/terminate_iptrunk.py @@ -42,7 +42,6 @@ def initial_input_form_generator() -> FormGenerator: clean_up_netbox: bool = True @field_validator("tt_number") - @classmethod def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) diff --git a/gso/workflows/router/create_router.py b/gso/workflows/router/create_router.py index c10a9c77ee4129df3bfd366b6561afd9daa6274b..fe5260cdec75f74b2bb6afbd9786ebc9492968a7 100644 --- a/gso/workflows/router/create_router.py +++ b/gso/workflows/router/create_router.py @@ -1,6 +1,6 @@ """A creation workflow for adding a new router to the network.""" -from typing import Any +from typing import Self from orchestrator.config.assignee import Assignee from orchestrator.forms import FormPage @@ -10,7 +10,7 @@ from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUID from orchestrator.workflow import StepList, conditional, done, init, inputstep, step, workflow from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.utils import wrap_create_initial_input_form -from pydantic import ConfigDict, field_validator +from pydantic import ConfigDict, model_validator from pydantic_forms.validators import ReadOnlyField from gso.products.product_blocks.router import RouterRole @@ -42,28 +42,27 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: model_config = ConfigDict(title=product_name) tt_number: str - partner: str = ReadOnlyField("GEANT") + partner: ReadOnlyField("GEANT", default_type=str) vendor: Vendor router_site: _site_selector() # type: ignore[valid-type] hostname: str ts_port: PortNumber router_role: RouterRole - @field_validator("hostname") - @classmethod - def hostname_must_be_available(cls, hostname: str, **kwargs: dict[str, Any]) -> str: - router_site = kwargs["values"].get("router_site") - if not router_site: + @model_validator(mode="after") + def hostname_must_be_available(self) -> Self: + router_site = self.router_site + if not router_site: # TODO Test on UI msg = "Please select a site before setting the hostname." raise ValueError(msg) selected_site = Site.from_subscription(router_site).site - input_fqdn = generate_fqdn(hostname, selected_site.site_name, selected_site.site_country_code) + input_fqdn = generate_fqdn(self.hostname, selected_site.site_name, selected_site.site_country_code) if not infoblox.hostname_available(f"lo0.{input_fqdn}"): msg = f'FQDN "{input_fqdn}" is not available.' raise ValueError(msg) - return hostname + return self user_input = yield CreateRouterForm diff --git a/gso/workflows/router/update_ibgp_mesh.py b/gso/workflows/router/update_ibgp_mesh.py index 27085db5f361ddc4248ea2e1de507f30a3a6e03d..9149c026ccb1c0e9afd4178f0691af904af27a2d 100644 --- a/gso/workflows/router/update_ibgp_mesh.py +++ b/gso/workflows/router/update_ibgp_mesh.py @@ -1,6 +1,6 @@ """Update iBGP mesh workflow. Adds a new P router to the mesh of PE routers in the network.""" -from typing import Any +from typing import Any, Self from orchestrator.config.assignee import Assignee from orchestrator.forms import FormPage @@ -33,9 +33,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: tt_number: str - @model_validator() - @classmethod - def router_has_a_trunk(cls, values: dict[str, Any]) -> dict[str, Any]: + @model_validator(mode="before") + def router_has_a_trunk(self ) -> Self: terminating_trunks = get_trunks_that_terminate_on_router( subscription_id, SubscriptionLifecycle.PROVISIONING ) + get_trunks_that_terminate_on_router(subscription_id, SubscriptionLifecycle.ACTIVE) @@ -43,7 +42,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: msg = "Selected router does not terminate any available IP trunks." raise ValueError(msg) - return values + return self user_input = yield AddBGPSessionForm diff --git a/gso/workflows/site/create_site.py b/gso/workflows/site/create_site.py index 6aefaba4b271315441641580621a22f5cc3b29e3..42e4fc0cf5639b02a5455f0e8f4574d70ab1e2af 100644 --- a/gso/workflows/site/create_site.py +++ b/gso/workflows/site/create_site.py @@ -21,18 +21,8 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: class CreateSiteForm(FormPage, BaseSiteValidatorModel): model_config = ConfigDict(title=product_name) + partner: ReadOnlyField("GEANT", default_type=str) - partner: str = ReadOnlyField("GEANT") - site_name: str - site_city: str - site_country: str - site_country_code: str - site_latitude: LatitudeCoordinate - site_longitude: LongitudeCoordinate - site_bgp_community_id: int - site_internal_id: int - site_tier: site_pb.SiteTier - site_ts_address: str user_input = yield CreateSiteForm diff --git a/gso/workflows/site/modify_site.py b/gso/workflows/site/modify_site.py index 5fe150169802ff810359eefbf44cbdefa74d7762..079d4647344d5c85a4038836c78d22b5199b2b88 100644 --- a/gso/workflows/site/modify_site.py +++ b/gso/workflows/site/modify_site.py @@ -14,8 +14,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form from pydantic import ConfigDict, field_validator from pydantic_forms.validators import ReadOnlyField -from gso.products.product_blocks import site as site_pb -from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate +from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate, SiteTier from gso.products.product_types.site import Site from gso.utils.helpers import validate_ipv4_or_ipv6, validate_site_fields_is_unique @@ -27,18 +26,17 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class ModifySiteForm(FormPage): model_config = ConfigDict(title="Modify Site") - site_name: str = ReadOnlyField(subscription.site.site_name) + site_name: ReadOnlyField(subscription.site.site_name, default_type=str) site_city: str = subscription.site.site_city - site_country: str = ReadOnlyField(subscription.site.site_country) - site_country_code: str = ReadOnlyField(subscription.site.site_country_code) + site_country: ReadOnlyField(subscription.site.site_country, default_type=str) + site_country_code: ReadOnlyField(subscription.site.site_country_code, default_type=str) site_latitude: LatitudeCoordinate = subscription.site.site_latitude site_longitude: LongitudeCoordinate = subscription.site.site_longitude site_bgp_community_id: int = subscription.site.site_bgp_community_id site_internal_id: int = subscription.site.site_internal_id - site_tier: site_pb.SiteTier = ReadOnlyField(subscription.site.site_tier) + site_tier: ReadOnlyField(subscription.site.site_tier, default_type=SiteTier) site_ts_address: str | None = subscription.site.site_ts_address - @classmethod @field_validator("site_ts_address") def validate_ts_address(cls, site_ts_address: str) -> str: if site_ts_address and site_ts_address != subscription.site.site_ts_address: @@ -46,14 +44,12 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: validate_ipv4_or_ipv6(site_ts_address) return site_ts_address - @classmethod @field_validator("site_internal_id") def validate_site_internal_id(cls, site_internal_id: int) -> int: if site_internal_id == subscription.site.site_internal_id: return site_internal_id return validate_site_fields_is_unique("site_internal_id", site_internal_id) - @classmethod @field_validator("site_bgp_community_id") def validate_site_bgp_community_id(cls, site_bgp_community_id: int) -> int: if site_bgp_community_id == subscription.site.site_bgp_community_id: diff --git a/gso/workflows/tasks/import_office_router.py b/gso/workflows/tasks/import_office_router.py index 99c2699c2a53c7221c6fca0669380a0f98fe18eb..2abb7ed57c4f6e972cb66c487d37844c79945160 100644 --- a/gso/workflows/tasks/import_office_router.py +++ b/gso/workflows/tasks/import_office_router.py @@ -1,6 +1,5 @@ """A creation workflow that adds existing office routers to the coreDB.""" -import ipaddress from orchestrator import workflow from orchestrator.forms import FormPage @@ -16,7 +15,7 @@ from gso.products.product_types.office_router import OfficeRouterInactive from gso.services import subscriptions from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_site_by_name -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor @step("Create subscription") @@ -42,8 +41,8 @@ def initial_input_form_generator() -> FormGenerator: office_router_site: str office_router_fqdn: str office_router_ts_port: PortNumber - office_router_lo_ipv4_address: ipaddress.IPv4Address - office_router_lo_ipv6_address: ipaddress.IPv6Address + office_router_lo_ipv4_address: FancyIPV4Address + office_router_lo_ipv6_address: FancyIPV6Address user_input = yield ImportOfficeRouter @@ -56,8 +55,8 @@ def initialize_subscription( office_router_fqdn: str, office_router_ts_port: PortNumber, office_router_site: str, - office_router_lo_ipv4_address: ipaddress.IPv4Address | None = None, - office_router_lo_ipv6_address: ipaddress.IPv6Address | None = None, + office_router_lo_ipv4_address: FancyIPV4Address | None = None, + office_router_lo_ipv6_address: FancyIPV6Address | None = None, ) -> State: """Initialise the office router subscription using input data.""" subscription.office_router.office_router_ts_port = office_router_ts_port diff --git a/gso/workflows/tasks/import_router.py b/gso/workflows/tasks/import_router.py index 0e7a46139a76e52e3288d2cf1eb8c4cb7cd7dcce..1a8e67ee06fc19c58d45391b207f404cee9ccfa8 100644 --- a/gso/workflows/tasks/import_router.py +++ b/gso/workflows/tasks/import_router.py @@ -1,6 +1,5 @@ """A creation workflow that adds an existing router to the service database.""" -import ipaddress from orchestrator import workflow from orchestrator.forms import FormPage @@ -19,7 +18,7 @@ from gso.services import subscriptions from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_site_by_name from gso.utils.helpers import generate_fqdn -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor @step("Create subscription") @@ -47,8 +46,8 @@ def initial_input_form_generator() -> FormGenerator: ts_port: int router_vendor: Vendor router_role: RouterRole - router_lo_ipv4_address: ipaddress.IPv4Address - router_lo_ipv6_address: ipaddress.IPv6Address + router_lo_ipv4_address: FancyIPV4Address + router_lo_ipv6_address: FancyIPV6Address router_lo_iso_address: str user_input = yield ImportRouter @@ -64,8 +63,8 @@ def initialize_subscription( router_site: str, router_role: router_pb.RouterRole, router_vendor: Vendor, - router_lo_ipv4_address: ipaddress.IPv4Address | None = None, - router_lo_ipv6_address: ipaddress.IPv6Address | None = None, + router_lo_ipv4_address: FancyIPV4Address | None = None, + router_lo_ipv6_address: FancyIPV6Address | None = None, router_lo_iso_address: str | None = None, ) -> State: """Initialise the router subscription using input data.""" diff --git a/gso/workflows/tasks/import_super_pop_switch.py b/gso/workflows/tasks/import_super_pop_switch.py index dbc59b4c8cf8016076d184b537643cfe249f86c0..58e9ac1af903755aef7bd42b8edbc3a5e9843fb6 100644 --- a/gso/workflows/tasks/import_super_pop_switch.py +++ b/gso/workflows/tasks/import_super_pop_switch.py @@ -1,6 +1,5 @@ """A creation workflow that adds existing Super PoP switches to the coreDB.""" -import ipaddress from orchestrator import workflow from orchestrator.forms import FormPage @@ -17,7 +16,7 @@ from gso.services import subscriptions from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_site_by_name from gso.utils.helpers import generate_fqdn -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import FancyIPV4Address, PortNumber, Vendor @step("Create subscription") @@ -43,7 +42,7 @@ def initial_input_form_generator() -> FormGenerator: super_pop_switch_site: str hostname: str super_pop_switch_ts_port: PortNumber - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address + super_pop_switch_mgmt_ipv4_address: FancyIPV4Address user_input = yield ImportSuperPopSwitch @@ -56,7 +55,7 @@ def initialize_subscription( hostname: str, super_pop_switch_ts_port: PortNumber, super_pop_switch_site: str, - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address | None = None, + super_pop_switch_mgmt_ipv4_address: FancyIPV4Address | None = None, ) -> State: """Initialise the Super PoP switch subscription using input data.""" subscription.super_pop_switch.super_pop_switch_ts_port = super_pop_switch_ts_port diff --git a/requirements.txt b/requirements.txt index 1083430c89b6b7da5d4e6ee2b485397a42349801..e9164c69a1c47d6ccd52d42bcd97b8a22b8bf994 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,22 +1,22 @@ orchestrator-core==2.1.2 requests==2.31.0 infoblox-client~=0.6.0 -pycountry==22.3.5 -pynetbox==7.2.0 -celery-redbeat==2.1.1 -celery==5.3.4 +pycountry==23.12.11 +pynetbox==7.3.3 +celery-redbeat==2.2.0 +celery==5.3.6 # Test and linting dependencies celery-stubs==0.1.3 -types-requests==2.31.0.1 -types-PyYAML==6.0.12.12 -pytest==7.4.3 -faker==19.13.0 -responses==0.24.0 -mypy==1.6.1 -ruff==0.1.5 +types-requests==2.31.0.20240406 +types-PyYAML==6.0.12.20240311 +pytest==8.1.1 +faker==24.8.0 +responses==0.25.0 +mypy==1.9.0 +ruff==0.3.5 sphinx==7.2.6 -sphinx-rtd-theme==1.3.0 +sphinx-rtd-theme==2.0.0 urllib3_mock==0.3.3 -pytest-asyncio==0.23.3 -pre-commit~=3.6.0 +pytest-asyncio==0.23.6 +pre-commit~=3.7.0 diff --git a/test/api/conftest.py b/test/api/conftest.py deleted file mode 100644 index e002fa13c19973dfbe733aa47fba34981558f116..0000000000000000000000000000000000000000 --- a/test/api/conftest.py +++ /dev/null @@ -1,8 +0,0 @@ -from test.fixtures import ( # noqa: F401 - iptrunk_side_subscription_factory, - iptrunk_subscription_factory, - nokia_router_subscription_factory, - office_router_subscription_factory, - site_subscription_factory, - super_pop_switch_subscription_factory, -) diff --git a/test/api/test_imports.py b/test/api/test_imports.py index f7b58f723eff687b92337281e14af11d5688cd8b..45cd62a869d7ff726d164e4a9a2368090fcd7e55 100644 --- a/test/api/test_imports.py +++ b/test/api/test_imports.py @@ -1,422 +1,422 @@ -from unittest.mock import patch -from uuid import uuid4 - -import pytest -from orchestrator.db import SubscriptionTable -from orchestrator.services import subscriptions - -from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity -from gso.products.product_blocks.router import RouterRole -from gso.products.product_blocks.site import SiteTier -from gso.utils.helpers import iso_from_ipv4 -from gso.utils.shared_enums import Vendor - -SITE_IMPORT_ENDPOINT = "/api/v1/imports/sites" -ROUTER_IMPORT_ENDPOINT = "/api/v1/imports/routers" -IPTRUNK_IMPORT_API_URL = "/api/v1/imports/iptrunks" -SUPER_POP_SWITCH_IMPORT_API_URL = "/api/v1/imports/super-pop-switches" -OFFICE_ROUTER_IMPORT_API_URL = "/api/v1/imports/office-routers" - - -@pytest.fixture() -def iptrunk_data(nokia_router_subscription_factory, faker): - router_side_a = nokia_router_subscription_factory() - router_side_b = nokia_router_subscription_factory() - return { - "partner": "GEANT", - "geant_s_sid": faker.geant_sid(), - "iptrunk_type": IptrunkType.DARK_FIBER, - "iptrunk_description": faker.sentence(), - "iptrunk_speed": PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND, - "iptrunk_minimum_links": 5, - "iptrunk_isis_metric": 500, - "side_a_node_id": router_side_a, - "side_a_ae_iface": faker.network_interface(), - "side_a_ae_geant_a_sid": faker.geant_sid(), - "side_a_ae_members": [ - { - "interface_name": faker.network_interface(), - "interface_description": faker.sentence(), - } - for _ in range(5) - ], - "side_b_node_id": router_side_b, - "side_b_ae_iface": faker.network_interface(), - "side_b_ae_geant_a_sid": faker.geant_sid(), - "side_b_ae_members": [ - { - "interface_name": faker.network_interface(), - "interface_description": faker.sentence(), - } - for _ in range(5) - ], - "iptrunk_ipv4_network": str(faker.ipv4(network=True)), - "iptrunk_ipv6_network": str(faker.ipv6(network=True)), - } - - -@pytest.fixture() -def mock_routers(iptrunk_data): - with patch("gso.services.subscriptions.get_active_router_subscriptions") as mock_get_active_router_subscriptions: - - def _active_router_subscriptions(*args, **kwargs): - if kwargs["includes"] == ["subscription_id", "description"]: - return [ - { - "subscription_id": iptrunk_data["side_a_node_id"], - "description": "iptrunk_sideA_node_id description", - }, - { - "subscription_id": iptrunk_data["side_b_node_id"], - "description": "iptrunk_sideB_node_id description", - }, - { - "subscription_id": str(uuid4()), - "description": "random description", - }, - ] - return [ - {"subscription_id": iptrunk_data["side_a_node_id"]}, - {"subscription_id": iptrunk_data["side_b_node_id"]}, - {"subscription_id": str(uuid4())}, - ] - - mock_get_active_router_subscriptions.side_effect = _active_router_subscriptions - yield mock_get_active_router_subscriptions - - -@patch("gso.api.v1.imports._start_process") -def test_import_iptrunk_successful_with_mocked_process(mock_start_process, test_client, mock_routers, iptrunk_data): - mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" - response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) - - assert response.status_code == 201 - assert response.json()["pid"] == "123e4567-e89b-12d3-a456-426655440000" - - -@pytest.fixture() -def site_data(faker): - return { - "site_name": faker.site_name(), - "site_city": faker.city(), - "site_country": faker.country(), - "site_country_code": faker.country_code(), - "site_latitude": float(faker.latitude()), - "site_longitude": float(faker.longitude()), - "site_bgp_community_id": faker.pyint(), - "site_internal_id": faker.pyint(), - "site_tier": SiteTier.TIER1, - "site_ts_address": faker.ipv4(), - "partner": "GEANT", - } - - -@pytest.fixture() -def router_data(faker, site_data): - mock_ipv4 = faker.ipv4() - return { - "hostname": "127.0.0.1", - "router_role": RouterRole.PE, - "router_vendor": Vendor.JUNIPER, - "router_site": site_data["site_name"], - "ts_port": 1234, - "partner": "GEANT", - "router_lo_ipv4_address": mock_ipv4, - "router_lo_ipv6_address": faker.ipv6(), - "router_lo_iso_address": iso_from_ipv4(mock_ipv4), - } - - -@pytest.fixture() -def super_pop_switch_data(faker, site_data): - mock_ipv4 = faker.ipv4() - return { - "hostname": "127.0.0.1", - "super_pop_switch_site": site_data["site_name"], - "super_pop_switch_ts_port": 1234, - "partner": "GEANT", - "super_pop_switch_mgmt_ipv4_address": mock_ipv4, - } - - -@pytest.fixture() -def office_router_data(faker, site_data): - return { - "office_router_fqdn": "127.0.0.1", - "office_router_site": site_data["site_name"], - "office_router_ts_port": 1234, - "partner": "GEANT", - "office_router_lo_ipv4_address": faker.ipv4(), - "office_router_lo_ipv6_address": faker.ipv6(), - } - - -def test_import_site_endpoint(test_client, site_data): - assert SubscriptionTable.query.all() == [] - # Post data to the endpoint - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert response.status_code == 201 - assert "detail" in response.json() - assert "pid" in response.json() - subscription = subscriptions.retrieve_subscription_by_subscription_instance_value( - resource_type="site_name", - value=site_data["site_name"], - ) - assert subscription is not None - - -def test_import_site_endpoint_with_existing_site(test_client, site_data): - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert SubscriptionTable.query.count() == 1 - assert response.status_code == 201 - - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert response.status_code == 422 - assert SubscriptionTable.query.count() == 1 - - -def test_import_site_endpoint_with_invalid_data(test_client, site_data): - # invalid data, missing site_latitude and invalid site_longitude - site_data.pop("site_latitude") - site_data["site_longitude"] = "invalid" - assert SubscriptionTable.query.count() == 0 - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert response.status_code == 422 - assert SubscriptionTable.query.count() == 0 - response = response.json() - assert response["detail"][0]["loc"] == ["body", "site_latitude"] - assert response["detail"][0]["msg"] == "field required" - assert response["detail"][1]["loc"] == ["body", "site_longitude"] - assert response["detail"][1]["msg"] == "value is not a valid float" - - -def test_import_router_endpoint(test_client, site_data, router_data): - # Create a site first - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert response.status_code == 201 - assert SubscriptionTable.query.count() == 1 - - response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data) - assert response.status_code == 201 - assert SubscriptionTable.query.count() == 2 - - -def test_import_router_endpoint_with_invalid_data(test_client, site_data, router_data): - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert response.status_code == 201 - assert SubscriptionTable.query.count() == 1 - - # invalid data, missing hostname and invalid router_lo_ipv6_address - router_data.pop("hostname") - router_data["router_lo_ipv6_address"] = "invalid" - response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data) - assert response.status_code == 422 - assert SubscriptionTable.query.count() == 1 - response = response.json() - assert response["detail"][0]["loc"] == ["body", "hostname"] - assert response["detail"][0]["msg"] == "field required" - assert response["detail"][1]["loc"] == ["body", "router_lo_ipv6_address"] - assert response["detail"][1]["msg"] == "value is not a valid IPv6 address" - - -def test_import_iptrunk_successful_with_real_process(test_client, mock_routers, iptrunk_data): - response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) - assert response.status_code == 201 - - response = response.json() - assert "detail" in response - assert "pid" in response - - subscription = subscriptions.retrieve_subscription_by_subscription_instance_value( - resource_type="geant_s_sid", - value=iptrunk_data["geant_s_sid"], - ) - assert subscription is not None - - -@patch("gso.api.v1.imports._start_process") -def test_import_iptrunk_invalid_partner(mock_start_process, test_client, mock_routers, iptrunk_data): - iptrunk_data["partner"] = "not_existing_partner" - mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" - response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) - - assert response.status_code == 422 - assert response.json() == { - "detail": [ - { - "loc": ["body", "partner"], - "msg": "partner not_existing_partner not found", - "type": "value_error", - }, - ], - } - - -@patch("gso.api.v1.imports._start_process") -def test_import_iptrunk_invalid_router_id_side_a_and_b(mock_start_process, test_client, iptrunk_data): - iptrunk_data["side_a_node_id"] = "NOT FOUND" - iptrunk_data["side_b_node_id"] = "NOT FOUND" - - mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" - response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) - - assert response.status_code == 422 - assert response.json() == { - "detail": [ - { - "loc": ["body", "side_a_node_id"], - "msg": f"Router {iptrunk_data['side_a_node_id']} not found", - "type": "value_error", - }, - { - "loc": ["body", "side_b_node_id"], - "msg": f"Router {iptrunk_data['side_b_node_id']} not found", - "type": "value_error", - }, - ], - } - - -@patch("gso.api.v1.imports._start_process") -def test_import_iptrunk_non_unique_members_side_a(mock_start_process, test_client, mock_routers, iptrunk_data, faker): - mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" - - repeat_interface_a = { - "interface_name": faker.network_interface(), - "interface_description": faker.sentence(), - } - repeat_interface_b = { - "interface_name": faker.network_interface(), - "interface_description": faker.sentence(), - } - iptrunk_data["side_a_ae_members"] = [repeat_interface_a for _ in range(5)] - iptrunk_data["side_b_ae_members"] = [repeat_interface_b for _ in range(5)] - - response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) - - assert response.status_code == 422 - assert response.json() == { - "detail": [ - { - "loc": ["body", "side_a_ae_members"], - "msg": "Items must be unique", - "type": "value_error", - }, - { - "loc": ["body", "side_b_ae_members"], - "msg": "Items must be unique", - "type": "value_error", - }, - { - "loc": ["body", "__root__"], - "msg": "Side A members should be at least 5 (iptrunk_minimum_links)", - "type": "value_error", - }, - ], - } - - -@patch("gso.api.v1.imports._start_process") -def test_import_iptrunk_fails_on_side_a_member_count_mismatch( - mock_start_process, - test_client, - mock_routers, - iptrunk_data, -): - mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" - - iptrunk_data["side_a_ae_members"].remove(iptrunk_data["side_a_ae_members"][0]) - - response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) - - assert response.status_code == 422 - assert response.json() == { - "detail": [ - { - "loc": ["body", "__root__"], - "msg": "Side A members should be at least 5 (iptrunk_minimum_links)", - "type": "value_error", - }, - ], - } - - -@patch("gso.api.v1.imports._start_process") -def test_import_iptrunk_fails_on_side_a_and_b_members_mismatch( - mock_start_process, - test_client, - iptrunk_data, - mock_routers, -): - mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" - - iptrunk_data["side_b_ae_members"].remove(iptrunk_data["side_b_ae_members"][0]) - - response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) - - assert response.status_code == 422 - assert response.json() == { - "detail": [ - { - "loc": ["body", "__root__"], - "msg": "Mismatch between Side A and B members", - "type": "value_error", - }, - ], - } - - -def test_import_super_pop_switch_endpoint(test_client, site_data, super_pop_switch_data): - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert response.status_code == 201 - assert SubscriptionTable.query.count() == 1 - - response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data) - assert response.status_code == 201 - assert SubscriptionTable.query.count() == 2 - - -def test_import_super_pop_switch_endpoint_with_invalid_data(test_client, site_data, super_pop_switch_data): - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert response.status_code == 201 - assert SubscriptionTable.query.count() == 1 - - # invalid data, missing hostname and invalid mgmt_ipv4_address - super_pop_switch_data.pop("hostname") - super_pop_switch_data["super_pop_switch_mgmt_ipv4_address"] = "invalid" - response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data) - assert response.status_code == 422 - assert SubscriptionTable.query.count() == 1 - response = response.json() - assert response["detail"][0]["loc"] == ["body", "hostname"] - assert response["detail"][0]["msg"] == "field required" - assert response["detail"][1]["loc"] == ["body", "super_pop_switch_mgmt_ipv4_address"] - assert response["detail"][1]["msg"] == "value is not a valid IPv4 address" - - -def test_import_office_router_endpoint(test_client, site_data, office_router_data): - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert response.status_code == 201 - assert SubscriptionTable.query.count() == 1 - - response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data) - assert response.status_code == 201 - assert SubscriptionTable.query.count() == 2 - - -def test_import_office_router_endpoint_with_invalid_data(test_client, site_data, office_router_data): - response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) - assert response.status_code == 201 - assert SubscriptionTable.query.count() == 1 - - # invalid data, missing FQDN and invalid lo_ipv6_address - office_router_data.pop("office_router_fqdn") - office_router_data["office_router_lo_ipv6_address"] = "invalid" - response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data) - assert response.status_code == 422 - assert SubscriptionTable.query.count() == 1 - response = response.json() - assert response["detail"][0]["loc"] == ["body", "office_router_fqdn"] - assert response["detail"][0]["msg"] == "field required" - assert response["detail"][1]["loc"] == ["body", "office_router_lo_ipv6_address"] - assert response["detail"][1]["msg"] == "value is not a valid IPv6 address" +# from unittest.mock import patch +# from uuid import uuid4 +# +# import pytest +# from orchestrator.db import SubscriptionTable +# from orchestrator.services import subscriptions +# +# from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity +# from gso.products.product_blocks.router import RouterRole +# from gso.products.product_blocks.site import SiteTier +# from gso.utils.helpers import iso_from_ipv4 +# from gso.utils.shared_enums import Vendor +# +# SITE_IMPORT_ENDPOINT = "/api/v1/imports/sites" +# ROUTER_IMPORT_ENDPOINT = "/api/v1/imports/routers" +# IPTRUNK_IMPORT_API_URL = "/api/v1/imports/iptrunks" +# SUPER_POP_SWITCH_IMPORT_API_URL = "/api/v1/imports/super-pop-switches" +# OFFICE_ROUTER_IMPORT_API_URL = "/api/v1/imports/office-routers" +# +# +# @pytest.fixture() +# def iptrunk_data(nokia_router_subscription_factory, faker): +# router_side_a = nokia_router_subscription_factory() +# router_side_b = nokia_router_subscription_factory() +# return { +# "partner": "GEANT", +# "geant_s_sid": faker.geant_sid(), +# "iptrunk_type": IptrunkType.DARK_FIBER, +# "iptrunk_description": faker.sentence(), +# "iptrunk_speed": PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND, +# "iptrunk_minimum_links": 5, +# "iptrunk_isis_metric": 500, +# "side_a_node_id": router_side_a, +# "side_a_ae_iface": faker.network_interface(), +# "side_a_ae_geant_a_sid": faker.geant_sid(), +# "side_a_ae_members": [ +# { +# "interface_name": faker.network_interface(), +# "interface_description": faker.sentence(), +# } +# for _ in range(5) +# ], +# "side_b_node_id": router_side_b, +# "side_b_ae_iface": faker.network_interface(), +# "side_b_ae_geant_a_sid": faker.geant_sid(), +# "side_b_ae_members": [ +# { +# "interface_name": faker.network_interface(), +# "interface_description": faker.sentence(), +# } +# for _ in range(5) +# ], +# "iptrunk_ipv4_network": str(faker.ipv4(network=True)), +# "iptrunk_ipv6_network": str(faker.ipv6(network=True)), +# } +# +# +# @pytest.fixture() +# def mock_routers(iptrunk_data): +# with patch("gso.services.subscriptions.get_active_router_subscriptions") as mock_get_active_router_subscriptions: +# +# def _active_router_subscriptions(*args, **kwargs): +# if kwargs["includes"] == ["subscription_id", "description"]: +# return [ +# { +# "subscription_id": iptrunk_data["side_a_node_id"], +# "description": "iptrunk_sideA_node_id description", +# }, +# { +# "subscription_id": iptrunk_data["side_b_node_id"], +# "description": "iptrunk_sideB_node_id description", +# }, +# { +# "subscription_id": str(uuid4()), +# "description": "random description", +# }, +# ] +# return [ +# {"subscription_id": iptrunk_data["side_a_node_id"]}, +# {"subscription_id": iptrunk_data["side_b_node_id"]}, +# {"subscription_id": str(uuid4())}, +# ] +# +# mock_get_active_router_subscriptions.side_effect = _active_router_subscriptions +# yield mock_get_active_router_subscriptions +# +# +# @patch("gso.api.v1.imports._start_process") +# def test_import_iptrunk_successful_with_mocked_process(mock_start_process, test_client, mock_routers, iptrunk_data): +# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" +# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) +# +# assert response.status_code == 201 +# assert response.json()["pid"] == "123e4567-e89b-12d3-a456-426655440000" +# +# +# @pytest.fixture() +# def site_data(faker): +# return { +# "site_name": faker.site_name(), +# "site_city": faker.city(), +# "site_country": faker.country(), +# "site_country_code": faker.country_code(), +# "site_latitude": float(faker.latitude()), +# "site_longitude": float(faker.longitude()), +# "site_bgp_community_id": faker.pyint(), +# "site_internal_id": faker.pyint(), +# "site_tier": SiteTier.TIER1, +# "site_ts_address": faker.ipv4(), +# "partner": "GEANT", +# } +# +# +# @pytest.fixture() +# def router_data(faker, site_data): +# mock_ipv4 = faker.ipv4() +# return { +# "hostname": "127.0.0.1", +# "router_role": RouterRole.PE, +# "router_vendor": Vendor.JUNIPER, +# "router_site": site_data["site_name"], +# "ts_port": 1234, +# "partner": "GEANT", +# "router_lo_ipv4_address": mock_ipv4, +# "router_lo_ipv6_address": faker.ipv6(), +# "router_lo_iso_address": iso_from_ipv4(mock_ipv4), +# } +# +# +# @pytest.fixture() +# def super_pop_switch_data(faker, site_data): +# mock_ipv4 = faker.ipv4() +# return { +# "hostname": "127.0.0.1", +# "super_pop_switch_site": site_data["site_name"], +# "super_pop_switch_ts_port": 1234, +# "partner": "GEANT", +# "super_pop_switch_mgmt_ipv4_address": mock_ipv4, +# } +# +# +# @pytest.fixture() +# def office_router_data(faker, site_data): +# return { +# "office_router_fqdn": "127.0.0.1", +# "office_router_site": site_data["site_name"], +# "office_router_ts_port": 1234, +# "partner": "GEANT", +# "office_router_lo_ipv4_address": faker.ipv4(), +# "office_router_lo_ipv6_address": faker.ipv6(), +# } +# +# +# def test_import_site_endpoint(test_client, site_data): +# assert SubscriptionTable.query.all() == [] +# # Post data to the endpoint +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert response.status_code == 201 +# assert "detail" in response.json() +# assert "pid" in response.json() +# subscription = subscriptions.retrieve_subscription_by_subscription_instance_value( +# resource_type="site_name", +# value=site_data["site_name"], +# ) +# assert subscription is not None +# +# +# def test_import_site_endpoint_with_existing_site(test_client, site_data): +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert SubscriptionTable.query.count() == 1 +# assert response.status_code == 201 +# +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert response.status_code == 422 +# assert SubscriptionTable.query.count() == 1 +# +# +# def test_import_site_endpoint_with_invalid_data(test_client, site_data): +# # invalid data, missing site_latitude and invalid site_longitude +# site_data.pop("site_latitude") +# site_data["site_longitude"] = "invalid" +# assert SubscriptionTable.query.count() == 0 +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert response.status_code == 422 +# assert SubscriptionTable.query.count() == 0 +# response = response.json() +# assert response["detail"][0]["loc"] == ["body", "site_latitude"] +# assert response["detail"][0]["msg"] == "field required" +# assert response["detail"][1]["loc"] == ["body", "site_longitude"] +# assert response["detail"][1]["msg"] == "value is not a valid float" +# +# +# def test_import_router_endpoint(test_client, site_data, router_data): +# # Create a site first +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert response.status_code == 201 +# assert SubscriptionTable.query.count() == 1 +# +# response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data) +# assert response.status_code == 201 +# assert SubscriptionTable.query.count() == 2 +# +# +# def test_import_router_endpoint_with_invalid_data(test_client, site_data, router_data): +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert response.status_code == 201 +# assert SubscriptionTable.query.count() == 1 +# +# # invalid data, missing hostname and invalid router_lo_ipv6_address +# router_data.pop("hostname") +# router_data["router_lo_ipv6_address"] = "invalid" +# response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data) +# assert response.status_code == 422 +# assert SubscriptionTable.query.count() == 1 +# response = response.json() +# assert response["detail"][0]["loc"] == ["body", "hostname"] +# assert response["detail"][0]["msg"] == "field required" +# assert response["detail"][1]["loc"] == ["body", "router_lo_ipv6_address"] +# assert response["detail"][1]["msg"] == "value is not a valid IPv6 address" +# +# +# def test_import_iptrunk_successful_with_real_process(test_client, mock_routers, iptrunk_data): +# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) +# assert response.status_code == 201 +# +# response = response.json() +# assert "detail" in response +# assert "pid" in response +# +# subscription = subscriptions.retrieve_subscription_by_subscription_instance_value( +# resource_type="geant_s_sid", +# value=iptrunk_data["geant_s_sid"], +# ) +# assert subscription is not None +# +# +# @patch("gso.api.v1.imports._start_process") +# def test_import_iptrunk_invalid_partner(mock_start_process, test_client, mock_routers, iptrunk_data): +# iptrunk_data["partner"] = "not_existing_partner" +# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" +# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) +# +# assert response.status_code == 422 +# assert response.json() == { +# "detail": [ +# { +# "loc": ["body", "partner"], +# "msg": "partner not_existing_partner not found", +# "type": "value_error", +# }, +# ], +# } +# +# +# @patch("gso.api.v1.imports._start_process") +# def test_import_iptrunk_invalid_router_id_side_a_and_b(mock_start_process, test_client, iptrunk_data): +# iptrunk_data["side_a_node_id"] = "NOT FOUND" +# iptrunk_data["side_b_node_id"] = "NOT FOUND" +# +# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" +# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) +# +# assert response.status_code == 422 +# assert response.json() == { +# "detail": [ +# { +# "loc": ["body", "side_a_node_id"], +# "msg": f"Router {iptrunk_data['side_a_node_id']} not found", +# "type": "value_error", +# }, +# { +# "loc": ["body", "side_b_node_id"], +# "msg": f"Router {iptrunk_data['side_b_node_id']} not found", +# "type": "value_error", +# }, +# ], +# } +# +# +# @patch("gso.api.v1.imports._start_process") +# def test_import_iptrunk_non_unique_members_side_a(mock_start_process, test_client, mock_routers, iptrunk_data, faker): +# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" +# +# repeat_interface_a = { +# "interface_name": faker.network_interface(), +# "interface_description": faker.sentence(), +# } +# repeat_interface_b = { +# "interface_name": faker.network_interface(), +# "interface_description": faker.sentence(), +# } +# iptrunk_data["side_a_ae_members"] = [repeat_interface_a for _ in range(5)] +# iptrunk_data["side_b_ae_members"] = [repeat_interface_b for _ in range(5)] +# +# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) +# +# assert response.status_code == 422 +# assert response.json() == { +# "detail": [ +# { +# "loc": ["body", "side_a_ae_members"], +# "msg": "Items must be unique", +# "type": "value_error", +# }, +# { +# "loc": ["body", "side_b_ae_members"], +# "msg": "Items must be unique", +# "type": "value_error", +# }, +# { +# "loc": ["body", "__root__"], +# "msg": "Side A members should be at least 5 (iptrunk_minimum_links)", +# "type": "value_error", +# }, +# ], +# } +# +# +# @patch("gso.api.v1.imports._start_process") +# def test_import_iptrunk_fails_on_side_a_member_count_mismatch( +# mock_start_process, +# test_client, +# mock_routers, +# iptrunk_data, +# ): +# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" +# +# iptrunk_data["side_a_ae_members"].remove(iptrunk_data["side_a_ae_members"][0]) +# +# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) +# +# assert response.status_code == 422 +# assert response.json() == { +# "detail": [ +# { +# "loc": ["body", "__root__"], +# "msg": "Side A members should be at least 5 (iptrunk_minimum_links)", +# "type": "value_error", +# }, +# ], +# } +# +# +# @patch("gso.api.v1.imports._start_process") +# def test_import_iptrunk_fails_on_side_a_and_b_members_mismatch( +# mock_start_process, +# test_client, +# iptrunk_data, +# mock_routers, +# ): +# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" +# +# iptrunk_data["side_b_ae_members"].remove(iptrunk_data["side_b_ae_members"][0]) +# +# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) +# +# assert response.status_code == 422 +# assert response.json() == { +# "detail": [ +# { +# "loc": ["body", "__root__"], +# "msg": "Mismatch between Side A and B members", +# "type": "value_error", +# }, +# ], +# } +# +# +# def test_import_super_pop_switch_endpoint(test_client, site_data, super_pop_switch_data): +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert response.status_code == 201 +# assert SubscriptionTable.query.count() == 1 +# +# response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data) +# assert response.status_code == 201 +# assert SubscriptionTable.query.count() == 2 +# +# +# def test_import_super_pop_switch_endpoint_with_invalid_data(test_client, site_data, super_pop_switch_data): +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert response.status_code == 201 +# assert SubscriptionTable.query.count() == 1 +# +# # invalid data, missing hostname and invalid mgmt_ipv4_address +# super_pop_switch_data.pop("hostname") +# super_pop_switch_data["super_pop_switch_mgmt_ipv4_address"] = "invalid" +# response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data) +# assert response.status_code == 422 +# assert SubscriptionTable.query.count() == 1 +# response = response.json() +# assert response["detail"][0]["loc"] == ["body", "hostname"] +# assert response["detail"][0]["msg"] == "field required" +# assert response["detail"][1]["loc"] == ["body", "super_pop_switch_mgmt_ipv4_address"] +# assert response["detail"][1]["msg"] == "value is not a valid IPv4 address" +# +# +# def test_import_office_router_endpoint(test_client, site_data, office_router_data): +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert response.status_code == 201 +# assert SubscriptionTable.query.count() == 1 +# +# response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data) +# assert response.status_code == 201 +# assert SubscriptionTable.query.count() == 2 +# +# +# def test_import_office_router_endpoint_with_invalid_data(test_client, site_data, office_router_data): +# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) +# assert response.status_code == 201 +# assert SubscriptionTable.query.count() == 1 +# +# # invalid data, missing FQDN and invalid lo_ipv6_address +# office_router_data.pop("office_router_fqdn") +# office_router_data["office_router_lo_ipv6_address"] = "invalid" +# response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data) +# assert response.status_code == 422 +# assert SubscriptionTable.query.count() == 1 +# response = response.json() +# assert response["detail"][0]["loc"] == ["body", "office_router_fqdn"] +# assert response["detail"][0]["msg"] == "field required" +# assert response["detail"][1]["loc"] == ["body", "office_router_lo_ipv6_address"] +# assert response["detail"][1]["msg"] == "value is not a valid IPv6 address" diff --git a/test/api/test_processes.py b/test/api/test_processes.py index 671218400c022a96eaa1e119be60db4fa5ec0d7b..4800d7f0876001d139f34b7e798870fda3c4537a 100644 --- a/test/api/test_processes.py +++ b/test/api/test_processes.py @@ -11,9 +11,9 @@ from orchestrator.workflow import ProcessStatus @pytest.fixture() -def create_process(faker, nokia_router_subscription_factory): +def create_process(test_workflow, nokia_router_subscription_factory): process_id = uuid4() - process = ProcessTable(process_id=process_id, workflow_name=faker.sentence(), last_status=ProcessStatus.SUSPENDED) + process = ProcessTable(process_id=process_id, workflow_id=test_workflow.workflow_id , last_status=ProcessStatus.SUSPENDED) subscription = nokia_router_subscription_factory() process_subscription = ProcessSubscriptionTable(process_id=process_id, subscription_id=subscription) diff --git a/test/auth/test_oidc_policy_helper.py b/test/auth/test_oidc_policy_helper.py index f51b2dcfa8c0f1d0c715ff46cdef733c437edd5a..b1259d5dacf9aae4f8f53d58647bb1d842319451 100644 --- a/test/auth/test_oidc_policy_helper.py +++ b/test/auth/test_oidc_policy_helper.py @@ -7,7 +7,13 @@ from httpx import AsyncClient, NetworkError, Response from gso.auth.oidc_policy_helper import ( OIDCConfig, - OIDCUser, OIDCUserModel, OPAResult, opa_decision, _get_decision, _evaluate_decision, _is_callback_step_endpoint, + OIDCUser, + OIDCUserModel, + OPAResult, + _evaluate_decision, + _get_decision, + _is_callback_step_endpoint, + opa_decision, ) from gso.auth.settings import oauth2lib_settings @@ -260,7 +266,7 @@ async def test_oidc_user_call_no_token(oidc_user, mock_request): patch("httpx.AsyncClient.get", new_callable=MagicMock) as mock_get, ): mock_post.return_value = MagicMock(status_code=200, json=lambda: {"active": False}) - mock_get.return_value = MagicMock(status_code=200, json=lambda: {}) + mock_get.return_value = MagicMock(status_code=200, json=dict) result = await oidc_user.__call__(mock_request) diff --git a/test/conftest.py b/test/conftest.py index d0bfebfed7b8bf9e04e2d086fc5bd568550dc321..bbca59a72bc725564a117301068be64750de27df 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,4 +1,5 @@ import contextlib +import datetime import ipaddress import logging import os @@ -11,10 +12,20 @@ from alembic.config import Config from faker import Faker from faker.providers import BaseProvider from orchestrator import app_settings -from orchestrator.db import Database, db +from orchestrator.db import ( + Database, + ProductBlockTable, + ProductTable, + ResourceTypeTable, + SubscriptionMetadataTable, + WorkflowTable, + db, +) from orchestrator.db.database import ENGINE_ARGUMENTS, SESSION_ARGUMENTS, BaseModel -from orchestrator.types import strEnum -from sqlalchemy import create_engine, text +from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY, SubscriptionModel +from orchestrator.domain.base import ProductBlockModel +from orchestrator.types import SubscriptionLifecycle, strEnum +from sqlalchemy import create_engine, select, text from sqlalchemy.engine import make_url from sqlalchemy.orm import scoped_session, sessionmaker from starlette.testclient import TestClient @@ -25,10 +36,19 @@ from gso.main import init_gso_app from gso.schema.partner import PartnerCreate from gso.services.partners import create_partner from gso.utils.helpers import LAGMember +from test.fixtures import ( # noqa: F401 + iptrunk_side_subscription_factory, + iptrunk_subscription_factory, + juniper_router_subscription_factory, + nokia_router_subscription_factory, + office_router_subscription_factory, + site_subscription_factory, + super_pop_switch_subscription_factory, + test_workflow, +) logging.getLogger("faker.factory").setLevel(logging.WARNING) - def pytest_collection_modifyitems(config, items): if bool(os.environ.get("SKIP_ALL_TESTS")): for item in items: @@ -247,3 +267,249 @@ def test_client(fastapi_app): @pytest.fixture(scope="session") def geant_partner(): return create_partner(PartnerCreate(name="GEANT-TEST", partner_type=PartnerType.GEANT, email="goat-test@geant.org")) + +@pytest.fixture() +def generic_resource_type_1(): + rt = ResourceTypeTable(description="Resource Type one", resource_type="rt_1") + db.session.add(rt) + db.session.commit() + + return rt + + +@pytest.fixture() +def generic_resource_type_2(): + rt = ResourceTypeTable(description="Resource Type two", resource_type="rt_2") + db.session.add(rt) + db.session.commit() + return rt + + +@pytest.fixture() +def generic_resource_type_3(): + rt = ResourceTypeTable(description="Resource Type three", resource_type="rt_3") + db.session.add(rt) + db.session.commit() + + return rt + + +@pytest.fixture() +def generic_product_block_1(generic_resource_type_1): + pb = ProductBlockTable( + name="PB_1", + description="Generic Product Block 1", + tag="PB1", + status="active", + resource_types=[generic_resource_type_1], + created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"), + ) + db.session.add(pb) + db.session.commit() + return pb + + +@pytest.fixture() +def generic_product_block_2(generic_resource_type_2, generic_resource_type_3): + pb = ProductBlockTable( + name="PB_2", + description="Generic Product Block 2", + tag="PB2", + status="active", + resource_types=[generic_resource_type_2, generic_resource_type_3], + created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"), + ) + db.session.add(pb) + db.session.commit() + return pb + + +@pytest.fixture() +def generic_product_block_3(generic_resource_type_2): + pb = ProductBlockTable( + name="PB_3", + description="Generic Product Block 3", + tag="PB3", + status="active", + resource_types=[generic_resource_type_2], + created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"), + ) + db.session.add(pb) + db.session.commit() + return pb + + +@pytest.fixture() +def generic_product_1(generic_product_block_1, generic_product_block_2): + workflow = db.session.scalar(select(WorkflowTable).where(WorkflowTable.name == "modify_note")) + p = ProductTable( + name="Product 1", + description="Generic Product One", + product_type="Generic", + status="active", + tag="GEN1", + product_blocks=[generic_product_block_1, generic_product_block_2], + workflows=[workflow], + ) + db.session.add(p) + db.session.commit() + return p + + +@pytest.fixture() +def generic_product_2(generic_product_block_3): + workflow = db.session.scalar(select(WorkflowTable).where(WorkflowTable.name == "modify_note")) + + p = ProductTable( + name="Product 2", + description="Generic Product Two", + product_type="Generic", + status="active", + tag="GEN2", + product_blocks=[generic_product_block_3], + workflows=[workflow], + ) + db.session.add(p) + db.session.commit() + return p + + +@pytest.fixture() +def generic_product_3(generic_product_block_2): + p = ProductTable( + name="Product 3", + description="Generic Product Three", + product_type="Generic", + status="active", + tag="GEN3", + product_blocks=[generic_product_block_2], + ) + db.session.add(p) + db.session.commit() + return p + + +@pytest.fixture() +def generic_product_block_type_1(generic_product_block_1): + class GenericProductBlockOneInactive(ProductBlockModel, product_block_name="PB_1"): + rt_1: str | None = None + + class GenericProductBlockOne(GenericProductBlockOneInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + rt_1: str + + return GenericProductBlockOneInactive, GenericProductBlockOne + + +@pytest.fixture() +def generic_product_block_type_2(generic_product_block_2): + class GenericProductBlockTwoInactive(ProductBlockModel, product_block_name="PB_2"): + rt_2: int | None = None + rt_3: str | None = None + + class GenericProductBlockTwo(GenericProductBlockTwoInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + rt_2: int + rt_3: str + + return GenericProductBlockTwoInactive, GenericProductBlockTwo + + +@pytest.fixture() +def generic_product_block_type_3(generic_product_block_3): + class GenericProductBlockThreeInactive(ProductBlockModel, product_block_name="PB_3"): + rt_2: int | None = None + + class GenericProductBlockThree(GenericProductBlockThreeInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + rt_2: int + + return GenericProductBlockThreeInactive, GenericProductBlockThree + + +@pytest.fixture() +def generic_product_type_1(generic_product_1, generic_product_block_type_1, generic_product_block_type_2): + GenericProductBlockOneInactive, GenericProductBlockOne = generic_product_block_type_1 + GenericProductBlockTwoInactive, GenericProductBlockTwo = generic_product_block_type_2 + + # Test Product domain models + + class GenericProductOneInactive(SubscriptionModel, is_base=True): + pb_1: GenericProductBlockOneInactive + pb_2: GenericProductBlockTwoInactive + + class GenericProductOne(GenericProductOneInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + pb_1: GenericProductBlockOne + pb_2: GenericProductBlockTwo + + SUBSCRIPTION_MODEL_REGISTRY["Product 1"] = GenericProductOne + + yield GenericProductOneInactive, GenericProductOne + + del SUBSCRIPTION_MODEL_REGISTRY["Product 1"] + + +@pytest.fixture() +def generic_product_type_2(generic_product_2, generic_product_block_type_3): + GenericProductBlockThreeInactive, GenericProductBlockThree = generic_product_block_type_3 + + class GenericProductTwoInactive(SubscriptionModel, is_base=True): + pb_3: GenericProductBlockThreeInactive + + class GenericProductTwo(GenericProductTwoInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + pb_3: GenericProductBlockThree + + SUBSCRIPTION_MODEL_REGISTRY["Product 2"] = GenericProductTwo + + yield GenericProductTwoInactive, GenericProductTwo + + del SUBSCRIPTION_MODEL_REGISTRY["Product 2"] + + +@pytest.fixture() +def product_type_1_subscription_factory(generic_product_1, generic_product_type_1, geant_partner): + def subscription_create( + description="Generic Subscription One", + start_date="2023-05-24T00:00:00+00:00", + rt_1="Value1", + rt_2=42, + rt_3="Value2", + ): + GenericProductOneInactive, _ = generic_product_type_1 + gen_subscription = GenericProductOneInactive.from_product_id( + generic_product_1.product_id, customer_id=geant_partner["partner_id"], insync=True + ) + gen_subscription.pb_1.rt_1 = rt_1 + gen_subscription.pb_2.rt_2 = rt_2 + gen_subscription.pb_2.rt_3 = rt_3 + gen_subscription = SubscriptionModel.from_other_lifecycle(gen_subscription, SubscriptionLifecycle.ACTIVE) + gen_subscription.description = description + gen_subscription.start_date = start_date + gen_subscription.save() + + gen_subscription_metadata = SubscriptionMetadataTable() + gen_subscription_metadata.subscription_id = gen_subscription.subscription_id + gen_subscription_metadata.metadata_ = {"description": "Some metadata description"} + db.session.add(gen_subscription_metadata) + db.session.commit() + return str(gen_subscription.subscription_id) + + return subscription_create + + +@pytest.fixture() +def product_type_1_subscriptions_factory(product_type_1_subscription_factory): + def subscriptions_create(amount=1): + return [ + product_type_1_subscription_factory( + description=f"Subscription {i}", + start_date=( + datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00") + datetime.timedelta(days=i) + ).replace(tzinfo=datetime.UTC), + ) + for i in range(amount) + ] + + return subscriptions_create + + +@pytest.fixture() +def generic_subscription_1(product_type_1_subscription_factory): + return product_type_1_subscription_factory() diff --git a/test/fixtures.py b/test/fixtures.py index 2a7eab3dea34e4625beba4816741154db2d4f2a3..698ddd4eca6826c418b4de7ea3e46e5320eecf71 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -1,9 +1,18 @@ import ipaddress +from collections.abc import Generator +from typing import Any +from uuid import uuid4 import pytest +from orchestrator import step, workflow +from orchestrator.config.assignee import Assignee from orchestrator.db import db from orchestrator.domain import SubscriptionModel from orchestrator.types import SubscriptionLifecycle, UUIDstr +from orchestrator.workflow import done, init, inputstep +from pydantic_forms.core import FormPage +from pydantic_forms.types import FormGenerator +from pydantic_forms.validators import Choice from gso.products import ProductName from gso.products.product_blocks.iptrunk import ( @@ -21,25 +30,26 @@ from gso.products.product_types.site import Site, SiteInactive from gso.products.product_types.super_pop_switch import SuperPopSwitchInactive from gso.services import subscriptions from gso.utils.shared_enums import Vendor +from test.workflows import WorkflowInstanceForTests @pytest.fixture() def site_subscription_factory(faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - site_name=None, - site_city=None, - site_country=None, - site_country_code=None, - site_latitude=None, - site_longitude=None, - site_bgp_community_id=None, - site_internal_id=None, - site_tier=SiteTier.TIER1, - site_ts_address=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + site_name=None, + site_city=None, + site_country=None, + site_country_code=None, + site_latitude=None, + site_longitude=None, + site_bgp_community_id=None, + site_internal_id=None, + site_tier=SiteTier.TIER1, + site_ts_address=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -85,18 +95,18 @@ def site_subscription_factory(faker, geant_partner): @pytest.fixture() def nokia_router_subscription_factory(site_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - router_fqdn=None, - router_ts_port=None, - router_access_via_ts=None, - router_lo_ipv4_address=None, - router_lo_ipv6_address=None, - router_lo_iso_address=None, - router_role=RouterRole.PE, - router_site=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + router_fqdn=None, + router_ts_port=None, + router_access_via_ts=None, + router_lo_ipv4_address=None, + router_lo_ipv6_address=None, + router_lo_iso_address=None, + router_role=RouterRole.PE, + router_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -140,18 +150,18 @@ def nokia_router_subscription_factory(site_subscription_factory, faker, geant_pa @pytest.fixture() def juniper_router_subscription_factory(site_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - router_fqdn=None, - router_ts_port=None, - router_access_via_ts=None, - router_lo_ipv4_address=None, - router_lo_ipv6_address=None, - router_lo_iso_address=None, - router_role=RouterRole.PE, - router_site=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + router_fqdn=None, + router_ts_port=None, + router_access_via_ts=None, + router_lo_ipv4_address=None, + router_lo_ipv6_address=None, + router_lo_iso_address=None, + router_role=RouterRole.PE, + router_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -196,11 +206,11 @@ def juniper_router_subscription_factory(site_subscription_factory, faker, geant_ @pytest.fixture() def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker): def subscription_create( - iptrunk_side_node=None, - iptrunk_side_ae_iface=None, - iptrunk_side_ae_geant_a_sid=None, - iptrunk_side_ae_members=None, - iptrunk_side_ae_members_description=None, + iptrunk_side_node=None, + iptrunk_side_ae_iface=None, + iptrunk_side_ae_geant_a_sid=None, + iptrunk_side_ae_members=None, + iptrunk_side_ae_members_description=None, ) -> IptrunkSideBlock: iptrunk_side_node_id = iptrunk_side_node or nokia_router_subscription_factory() iptrunk_side_node = Router.from_subscription(iptrunk_side_node_id).router @@ -234,18 +244,18 @@ def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker): @pytest.fixture() def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - geant_s_sid=None, - iptrunk_description=None, - iptrunk_type=IptrunkType.DARK_FIBER, - iptrunk_speed=PhysicalPortCapacity.ONE_GIGABIT_PER_SECOND, - iptrunk_isis_metric=None, - iptrunk_ipv4_network=None, - iptrunk_ipv6_network=None, - iptrunk_sides=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + geant_s_sid=None, + iptrunk_description=None, + iptrunk_type=IptrunkType.DARK_FIBER, + iptrunk_speed=PhysicalPortCapacity.ONE_GIGABIT_PER_SECOND, + iptrunk_isis_metric=None, + iptrunk_ipv4_network=None, + iptrunk_ipv6_network=None, + iptrunk_sides=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -297,15 +307,15 @@ def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant @pytest.fixture() def office_router_subscription_factory(site_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - office_router_fqdn=None, - office_router_ts_port=None, - office_router_lo_ipv4_address=None, - office_router_lo_ipv6_address=None, - office_router_site=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + office_router_fqdn=None, + office_router_ts_port=None, + office_router_lo_ipv4_address=None, + office_router_lo_ipv6_address=None, + office_router_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -348,14 +358,14 @@ def office_router_subscription_factory(site_subscription_factory, faker, geant_p @pytest.fixture() def super_pop_switch_subscription_factory(site_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - super_pop_switch_fqdn=None, - super_pop_switch_ts_port=None, - super_pop_switch_mgmt_ipv4_address=None, - super_pop_switch_site=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + super_pop_switch_fqdn=None, + super_pop_switch_ts_port=None, + super_pop_switch_mgmt_ipv4_address=None, + super_pop_switch_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -395,3 +405,39 @@ def super_pop_switch_subscription_factory(site_subscription_factory, faker, gean return str(super_pop_switch_subscription.subscription_id) return subscription_create + +@pytest.fixture() +def test_workflow(generic_subscription_1: UUIDstr, generic_product_type_1) -> Generator: + _, GenericProductOne = generic_product_type_1 + + @step("Insert UUID in state") + def insert_object(): + return {"subscription_id": str(uuid4()), "model": GenericProductOne.from_subscription(generic_subscription_1)} + + @step("Test that it is a string now") + def check_object(subscription_id: Any, model: dict) -> None: + # This is actually a test. It would be nicer to have this in a proper test but that takes to much setup that + # already happens here. So we hijack this fixture and run this test for all tests that use this fixture + # (which should not be an issue) + assert isinstance(subscription_id, str) + assert isinstance(model, dict) + + @inputstep("Modify", assignee=Assignee.CHANGES) + def modify(subscription_id: UUIDstr) -> FormGenerator: + class TestChoice(Choice): + A = "A" + B = "B" + C = "C" + + class TestForm(FormPage): + generic_select: TestChoice + + user_input = yield TestForm + return user_input.model_dump() + + @workflow("Workflow") + def workflow_for_testing_processes_py(): + return init >> insert_object >> check_object >> modify >> done + + with WorkflowInstanceForTests(workflow_for_testing_processes_py, "workflow_for_testing_processes_py") as wf: + yield wf diff --git a/test/schedules/test_scheduling.py b/test/schedules/test_scheduling.py index 5ed2ad01e14a00e9e0785e9ee9a31518325f4bea..82168eb4375f2bdb50ed2c4de34fe9e0f65cd8cb 100644 --- a/test/schedules/test_scheduling.py +++ b/test/schedules/test_scheduling.py @@ -8,7 +8,7 @@ from gso.schedules.scheduling import scheduler @pytest.fixture(scope="module") def validate_subscriptions(): - from gso.schedules.validate_subscriptions import validate_subscriptions as vs # noqa: PLC0415 + from gso.schedules.validate_subscriptions import validate_subscriptions as vs return vs diff --git a/test/schemas/test_types.py b/test/schemas/test_types.py index 2e90123f3d96f3c0e5c86294780ba4539a9660c1..6f43bb10bb87d8e8b634a4249eba768d5f5af246 100644 --- a/test/schemas/test_types.py +++ b/test/schemas/test_types.py @@ -1,53 +1,62 @@ import pytest +from pydantic import BaseModel, ValidationError from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate +class LatitudeModel(BaseModel): + latitude: LatitudeCoordinate + + +class LongitudeModel(BaseModel): + longitude: LongitudeCoordinate + + @pytest.mark.parametrize( ("input_value", "is_valid"), [ - ("40.7128", True), - ("-74.0060", True), - ("90", True), - ("-90", True), - ("0", True), - ("45.6", True), - ("91", False), - ("-91", False), - ("180", False), - ("-180", False), + (40.7128, True), + (-74.0060, True), + (90, True), + (-90, True), + (0, True), + (45.6, True), + (91, False), + (-91, False), + (180, False), + (-180, False), ("abc", False), - ("90.1", False), + (90.1, False), ], ) def test_latitude(input_value, is_valid): if is_valid: - assert LatitudeCoordinate.validate(input_value) == input_value + assert LatitudeModel(latitude=input_value).latitude == input_value else: - with pytest.raises(ValueError, match="Invalid latitude coordinate"): - LatitudeCoordinate.validate(input_value) + with pytest.raises(ValidationError): + LatitudeModel(latitude=input_value) @pytest.mark.parametrize( ("input_value", "is_valid"), [ - ("40.7128", True), - ("-74.0060", True), - ("180", True), - ("-180", True), - ("0", True), - ("90.1", True), - ("181", False), - ("-181", False), - ("200", False), - ("-200", False), + (40.7128, True), + (-74.0060, True), + (180, True), + (-180, True), + (0, True), + (90.1, True), + (181, False), + (-181, False), + (200, False), + (-200, False), ("abc", False), ("90a", False), ], ) def test_longitude(input_value, is_valid): if is_valid: - assert LongitudeCoordinate.validate(input_value) == input_value + assert LongitudeModel(longitude=input_value).longitude == input_value else: - with pytest.raises(ValueError, match="Invalid longitude coordinate"): - LongitudeCoordinate.validate(input_value) + with pytest.raises(ValidationError): + LongitudeModel(longitude=input_value) diff --git a/test/workflows/__init__.py b/test/workflows/__init__.py index a8144aacac102f807324458c14ad0cd04c69c892..246d2b767b253699ce6b56cea36aeaa9ca156fea 100644 --- a/test/workflows/__init__.py +++ b/test/workflows/__init__.py @@ -2,16 +2,16 @@ import difflib import pprint from collections.abc import Callable from copy import deepcopy -from itertools import chain, repeat from typing import cast from uuid import uuid4 import structlog -from orchestrator.db import ProcessTable +from orchestrator.db import ProcessTable, WorkflowTable, db from orchestrator.services.processes import StateMerger, _db_create_process -from orchestrator.types import FormGenerator, InputForm, State +from orchestrator.types import State from orchestrator.utils.json import json_dumps, json_loads from orchestrator.workflow import Process, ProcessStat, Step, Success, Workflow, runwf +from orchestrator.workflow import Process as WFProcess from orchestrator.workflows import ALL_WORKFLOWS, LazyWorkflowInstance, get_workflow from pydantic_forms.core import post_form @@ -113,10 +113,22 @@ def extract_error(result): return extract_state(result).get("error") +def store_workflow(wf: Workflow, name: str | None = None) -> WorkflowTable: + wf_table = WorkflowTable(name=name or wf.name, target=wf.target, description=wf.description) + db.session.add(wf_table) + db.session.commit() + return wf_table + + +def delete_workflow(wf: WorkflowTable) -> None: + db.session.delete(wf) + db.session.commit() + + class WorkflowInstanceForTests(LazyWorkflowInstance): """Register Test workflows. - Similar to ``LazyWorkflowInstance`` but does not require an import during instantiate + Similar to `LazyWorkflowInstance` but does not require an import during instantiate Used for creating test workflows """ @@ -125,14 +137,19 @@ class WorkflowInstanceForTests(LazyWorkflowInstance): is_callable: bool def __init__(self, workflow: Workflow, name: str) -> None: + super().__init__("orchestrator.test", name) self.workflow = workflow self.name = name def __enter__(self): ALL_WORKFLOWS[self.name] = self + self.workflow_instance = store_workflow(self.workflow, name=self.name) + return self.workflow_instance def __exit__(self, _exc_type, _exc_value, _traceback): del ALL_WORKFLOWS[self.name] + delete_workflow(self.workflow_instance) + del self.workflow_instance def instantiate(self) -> Workflow: """Import and instantiate a workflow and return it. @@ -140,7 +157,9 @@ class WorkflowInstanceForTests(LazyWorkflowInstance): This can be as simple as merely importing a workflow function. However, if it concerns a workflow generating function, that function will be called with or without arguments as specified. - :return Workflow: A workflow function. + Returns: + A workflow function. + """ self.workflow.name = self.name return self.workflow @@ -172,13 +191,23 @@ def _store_step(step_log: list[tuple[Step, Process]]) -> Callable[[ProcessStat, return __store_step -def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[Process, ProcessStat, list]: - # ATTENTION!! This code needs to be as similar as possible to ``server.services.processes.start_process`` +def _sanitize_input(input_data: State | list[State]) -> list[State]: + # To be backwards compatible convert single dict to list + if not isinstance(input_data, list): + input_data = [input_data] + + # We need a copy here and we want to mimic the actual code that returns a serialized version of the state + return cast(list[State], json_loads(json_dumps(input_data))) + + +def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[WFProcess, ProcessStat, list]: + # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.start_process` # The main differences are: we use a different step log function, and we don't run in # a separate thread + user_data = _sanitize_input(input_data) user = "john.doe" - step_log: list[tuple[Step, Process]] = [] + step_log: list[tuple[Step, WFProcess]] = [] process_id = uuid4() workflow = get_workflow(workflow_key) @@ -190,7 +219,7 @@ def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[Pr "workflow_target": workflow.target, } - user_input = post_form(workflow.initial_input_form, initial_state, input_data) + user_input = post_form(workflow.initial_input_form, initial_state, user_data) pstat = ProcessStat( process_id, @@ -244,72 +273,6 @@ def resume_workflow( return result, step_log -def run_form_generator( - form_generator: FormGenerator, - extra_inputs: list[State] | None = None, -) -> tuple[list[dict], State]: - """Run a form generator to get the resulting forms and result. - - Warning! This does not run the actual pydantic validation on purpose. However, you should - make sure that anything in extra_inputs matched the values and types as if the pydantic validation has - been run. - - :param FormGenerator form_generator: The form generator that will be run. - :param list[State] | None extra_inputs: list of user input dicts for each page in the generator. - If no input is given for a page, an empty dict is used. - The default value from the form is used as the default value for a field. - - :return tuple[list[dict], State]: A list of generated forms and the result state for the whole generator. - - Example: - ------- - Given the following form generator: - - >>> from pydantic_forms.core import FormPage - >>> def form_generator(state): - ... class TestForm(FormPage): - ... field: str = "foo" - ... user_input = yield TestForm - ... return {**user_input.dict(), "bar": 42} - - You can run this without extra_inputs - >>> forms, result = run_form_generator(form_generator({"state_field": 1})) - >>> forms - [{'title': 'unknown', 'type': 'object', 'properties': { - 'field': {'title': 'Field', 'default': 'foo', 'type': 'string'}}, 'additionalProperties': False}] - >>> result - {'field': 'foo', 'bar': 42} - - - Or with extra_inputs: - >>> forms, result = run_form_generator(form_generator({'state_field': 1}), [{'field':'baz'}]) - >>> forms - [{'title': 'unknown', 'type': 'object', 'properties': { - 'field': {'title': 'Field', 'default': 'foo', 'type': 'string'}}, 'additionalProperties': False}] - >>> result - {'field': 'baz', 'bar': 42} - - """ - forms: list[dict] = [] - result: State = {"s": 3} - if extra_inputs is None: - extra_inputs = [] - - try: - form = cast(InputForm, next(form_generator)) - forms.append(form.schema()) - for extra_input in chain(extra_inputs, repeat(cast(State, {}))): - user_input_data = {field_name: field.default for field_name, field in form.__fields__.items()} - user_input_data.update(extra_input) - user_input = form.construct(**user_input_data) - form = form_generator.send(user_input) - forms.append(form.schema()) - except StopIteration as stop: - result = stop.value - - return forms, result - - def user_accept_and_assert_suspended(process_stat, step_log, extra_data=None): extra_data = extra_data or {} result, step_log = resume_workflow(process_stat, step_log, extra_data) diff --git a/test/workflows/conftest.py b/test/workflows/conftest.py index 0665829aee73ae9cd3b9d1129a2781a98c2e210d..9d298a779f3e4f190e009973caa321658eb2433b 100644 --- a/test/workflows/conftest.py +++ b/test/workflows/conftest.py @@ -1,14 +1,6 @@ import pytest from urllib3_mock import Responses -from test.fixtures import ( # noqa: F401 - iptrunk_side_subscription_factory, - iptrunk_subscription_factory, - juniper_router_subscription_factory, - nokia_router_subscription_factory, - site_subscription_factory, -) - @pytest.fixture(autouse=True) def responses(): diff --git a/test/workflows/iptrunk/test_create_iptrunk.py b/test/workflows/iptrunk/test_create_iptrunk.py index 34a79604ef532c41cf141214c88b5790b810aeef..04f8b41560ce6c57b5b63f570fabf6ded80647ee 100644 --- a/test/workflows/iptrunk/test_create_iptrunk.py +++ b/test/workflows/iptrunk/test_create_iptrunk.py @@ -6,7 +6,6 @@ import pytest from gso.products import Iptrunk, ProductName from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity from gso.services.subscriptions import get_product_id_by_name -from gso.utils.helpers import LAGMember from gso.utils.shared_enums import Vendor from test import USER_CONFIRM_EMPTY_FORM from test.services.conftest import MockedNetboxClient @@ -56,7 +55,7 @@ def input_form_wizard_data(request, juniper_router_subscription_factory, nokia_r else: router_side_b = nokia_router_subscription_factory() side_b_members = [ - LAGMember(interface_name=f"Interface{interface}", interface_description=faker.sentence()) + dict(interface_name=f"Interface{interface}", interface_description=faker.sentence()) for interface in range(5) ] @@ -73,7 +72,7 @@ def input_form_wizard_data(request, juniper_router_subscription_factory, nokia_r "side_a_ae_iface": "lag-1", "side_a_ae_geant_a_sid": faker.geant_sid(), "side_a_ae_members": [ - LAGMember( + dict( interface_name=f"Interface{interface}", interface_description=faker.sentence(), ) diff --git a/test/workflows/iptrunk/test_migrate_iptrunk.py b/test/workflows/iptrunk/test_migrate_iptrunk.py index 5640cd646b75083f44d5bfbe37e21d1bfa9115a9..cd46d72100d3432cc868cdd8bb6cb710d0e74c44 100644 --- a/test/workflows/iptrunk/test_migrate_iptrunk.py +++ b/test/workflows/iptrunk/test_migrate_iptrunk.py @@ -29,6 +29,8 @@ def migrate_form_input( iptrunk_side_subscription_factory, ): use_juniper = getattr(request, "param", UseJuniperSide.NONE) + new_side_ae_members_nokia = faker.link_members_nokia()[0:2] + new_side_ae_members_juniper = faker.link_members_juniper()[0:2] if use_juniper == UseJuniperSide.SIDE_A: # Nokia -> Juniper @@ -36,7 +38,7 @@ def migrate_form_input( old_subscription = Iptrunk.from_subscription(product_id) new_router = juniper_router_subscription_factory() replace_side = str(old_subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id) - new_side_ae_members = faker.link_members_juniper()[0:2] + new_side_ae_members = new_side_ae_members_juniper lag_name = "ae1" elif use_juniper == UseJuniperSide.SIDE_B: # Juniper -> Nokia @@ -48,7 +50,7 @@ def migrate_form_input( old_subscription = Iptrunk.from_subscription(product_id) new_router = nokia_router_subscription_factory() replace_side = str(old_subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id) - new_side_ae_members = faker.link_members_nokia()[0:2] + new_side_ae_members = new_side_ae_members_nokia lag_name = "lag-1" elif use_juniper == UseJuniperSide.SIDE_BOTH: # Juniper -> Juniper @@ -60,7 +62,7 @@ def migrate_form_input( old_subscription = Iptrunk.from_subscription(product_id) new_router = juniper_router_subscription_factory() replace_side = str(old_subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id) - new_side_ae_members = faker.link_members_juniper()[0:2] + new_side_ae_members = new_side_ae_members_juniper lag_name = "ae1" else: # Nokia -> Nokia @@ -68,7 +70,7 @@ def migrate_form_input( old_subscription = Iptrunk.from_subscription(product_id) new_router = nokia_router_subscription_factory() replace_side = str(old_subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id) - new_side_ae_members = faker.link_members_nokia()[0:2] + new_side_ae_members = new_side_ae_members_nokia lag_name = "lag-1" return [ diff --git a/test/workflows/site/test_create_site.py b/test/workflows/site/test_create_site.py index e31576152634045e9efe57b864a51785495a41d1..f6c196da217320506521e9d993b7e7609f8e05db 100644 --- a/test/workflows/site/test_create_site.py +++ b/test/workflows/site/test_create_site.py @@ -4,7 +4,6 @@ from pydantic_forms.exceptions import FormValidationError from gso.products import ProductName from gso.products.product_blocks.site import SiteTier from gso.products.product_types.site import Site -from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_product_id_by_name from test.workflows import assert_complete, extract_state, run_workflow @@ -65,7 +64,7 @@ def test_site_name_is_incorrect(responses, faker): "site_internal_id": faker.pyint(), "site_tier": SiteTier.TIER1, "site_ts_address": faker.ipv4(), - "partner": get_partner_by_name("GEANT")["partner_id"], + "partner": "GEANT", }, ]