diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 61d37c99aa91a89f8c195261fd647924c051007e..6cc76019e6555e40b0c8a4aca3db42defbad859c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -12,7 +12,7 @@ run-tox-pipeline: stage: tox tags: - docker-executor - image: python:3.11 + image: python:3.12 services: - postgres:15.4 diff --git a/Dockerfile b/Dockerfile index 5be8cb5440ee78244ae72f0d8df2073660e576f6..b2802dd07a91be30f25831fc71f97361e2ac46d1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.11-alpine +FROM python:3.12-alpine WORKDIR /app ARG ARTIFACT_VERSION diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index a7cc2d9058a220cfd6db4e43bb3a1cb2a4ad9181..d82a04c1816b3ebb75e3f05213d5163df3ad6fd3 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -4,15 +4,15 @@ Quickstart Development environment and dependencies ---------------------------------------- -- Install python 3.11 if you do not have it already: +- Install python 3.12 if you do not have it already: - ``add-apt-repository ppa:deadsnakes/ppa`` - - ``apt install python3.11 python3.11-distutils`` + - ``apt install python3.12 python3.12-distutils`` - Follow Steps 1 and 2 from here to install dependencies and setup DB: `<https://workfloworchestrator.org/orchestrator-core/workshops/beginner/debian/>`_ - To install the orchestrator GUI, you can follow the steps 5 and 6 from the previous link. - Create a virtual environment: - ``source /usr/share/virtualenvwrapper/virtualenvwrapper.sh`` - - ``mkvirtualenv --python python3.11 gso`` + - ``mkvirtualenv --python python3.12 gso`` - To use the virtual environment: - ``source /usr/share/virtualenvwrapper/virtualenvwrapper.sh`` - ``workon gso`` @@ -25,7 +25,7 @@ Do all this inside the virtual environment. - Clone this repository - ``pip install -r requirements.txt`` - If you get an error because you pip version is too old, run this: - ``curl -sS https://bootstrap.pypa.io/get-pip.py | python3.11`` + ``curl -sS https://bootstrap.pypa.io/get-pip.py | python3.12`` - ``pip install -e .`` - Create an ``oss-params.json`` based on the ``oss-params-example.json`` file inside ``/gso``. - Export the oss-params file: ``export OSS_PARAMS_FILENAME="/path/to/oss-params.json"`` diff --git a/gso/__init__.py b/gso/__init__.py index ecdfd940ffefe85df1613e4a6cbbc74f56bf80dc..cf1b39a29faa0d570fd822bbd783433ab45dbe27 100644 --- a/gso/__init__.py +++ b/gso/__init__.py @@ -16,6 +16,7 @@ from gso.middlewares import ModifyProcessEndpointResponse def init_gso_app() -> OrchestratorCore: """Initialise the :term:`GSO` app.""" app = OrchestratorCore(base_settings=app_settings) + # app.register_graphql() # TODO: uncomment this line when the GUI V2 is ready app.include_router(api_router, prefix="/api") app.add_middleware(ModifyProcessEndpointResponse) return app @@ -28,7 +29,7 @@ def init_worker_app() -> OrchestratorCore: def init_cli_app() -> typer.Typer: """Initialise :term:`GSO` as a CLI application.""" - from gso.cli import imports, netbox # noqa: PLC0415 + from gso.cli import imports, netbox cli_app.add_typer(imports.app, name="import-cli") cli_app.add_typer(netbox.app, name="netbox-cli") diff --git a/gso/api/v1/imports.py b/gso/api/v1/imports.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/gso/auth/oidc_policy_helper.py b/gso/auth/oidc_policy_helper.py index 1ba2eb3b8cdb3db63babe30beabf2c6186e3ae3c..96edfbf3e08b21951a320dc40dfb6e1c8012a10e 100644 --- a/gso/auth/oidc_policy_helper.py +++ b/gso/auth/oidc_policy_helper.py @@ -252,7 +252,7 @@ class OIDCUser(HTTPBearer): return response = await async_request.get(self.openid_url + "/.well-known/openid-configuration") - self.openid_config = OIDCConfig.parse_obj(response.json()) + self.openid_config = OIDCConfig.model_validate(response.json()) async def userinfo(self, async_request: AsyncClient, token: str) -> OIDCUserModel: """Get the userinfo from the openid server. diff --git a/gso/auth/settings.py b/gso/auth/settings.py index 29c1fc806a8589b38158a3f95dddf3f10cb8bdf3..b3ab1a6a569e2e594e181c23c231366e212f4905 100644 --- a/gso/auth/settings.py +++ b/gso/auth/settings.py @@ -6,7 +6,8 @@ with external authentication providers for enhanced security management. Todo: Remove token and sensitive data from OPA console and API. """ -from pydantic import BaseSettings, Field +from pydantic import Field +from pydantic_settings import BaseSettings class Oauth2LibSettings(BaseSettings): diff --git a/gso/middlewares.py b/gso/middlewares.py index 58106502b70a794cde29cfb714ce61101d056dbb..5ffca88ef1caf559eafd10a9a3d6700767929661 100644 --- a/gso/middlewares.py +++ b/gso/middlewares.py @@ -93,9 +93,9 @@ class ModifyProcessEndpointResponse(BaseHTTPMiddleware): if callback_result and isinstance(callback_result, str): callback_result = json.loads(callback_result) if callback_result.get("output") and len(callback_result["output"]) > max_output_length: - callback_result[ - "output" - ] = f'{request.base_url}api/v1/processes/steps/{step["step_id"]}/callback-results{token}' + callback_result["output"] = ( + f'{request.base_url}api/v1/processes/steps/{step["step_id"]}/callback-results{token}' + ) step["state"]["callback_result"] = callback_result except (AttributeError, KeyError, TypeError): pass diff --git a/gso/migrations/env.py b/gso/migrations/env.py index 45dc109d4786205b3359743edf3681283ca58797..968abeb94a1145de0c923cdc8d27dd2030a55df7 100644 --- a/gso/migrations/env.py +++ b/gso/migrations/env.py @@ -15,7 +15,7 @@ config = context.config # This line sets up loggers basically. logger = logging.getLogger("alembic.env") -config.set_main_option("sqlalchemy.url", app_settings.DATABASE_URI) +config.set_main_option("sqlalchemy.url", str(app_settings.DATABASE_URI)) target_metadata = BaseModel.metadata diff --git a/gso/migrations/versions/2024-04-20_1ec810b289c0_add_orchestrator_2_1_2_migrations.py b/gso/migrations/versions/2024-04-20_1ec810b289c0_add_orchestrator_2_1_2_migrations.py new file mode 100644 index 0000000000000000000000000000000000000000..89fcbfcd056850a331ca92e379bf392a51a0cdc4 --- /dev/null +++ b/gso/migrations/versions/2024-04-20_1ec810b289c0_add_orchestrator_2_1_2_migrations.py @@ -0,0 +1,61 @@ +"""remove subscription cancellation workflow. + +Revision ID: 1ec810b289c0 +Revises: +Create Date: 2024-04-02 10:21:08.539591 + +""" +from alembic import op +from orchestrator.migrations.helpers import create_workflow, delete_workflow + +# revision identifiers, used by Alembic. +revision = '1ec810b289c0' +down_revision = '393acfa175c0' +branch_labels = None +# TODO: check it carefuly +depends_on = '048219045729' # in this revision, SURF has added a new columns to the workflow table like delted_at, so we need to add a dependency on the revision that added the columns to the workflow table. + +new_workflows = [ + { + "name": "import_site", + "target": "SYSTEM", + "description": "Import a site without provisioning it.", + "product_type": "Site" + }, + { + "name": "import_router", + "target": "SYSTEM", + "description": "Import a router without provisioning it.", + "product_type": "Router" + }, + { + "name": "import_iptrunk", + "target": "SYSTEM", + "description": "Import an IP trunk without provisioning it.", + "product_type": "Iptrunk" + }, + { + "name": "import_super_pop_switch", + "target": "SYSTEM", + "description": "Import a Super PoP switch without provisioning it.", + "product_type": "SuperPopSwitch" + }, + { + "name": "import_office_router", + "target": "SYSTEM", + "description": "Import an office router without provisioning it.", + "product_type": "OfficeRouter" + }, +] + + +def upgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + create_workflow(conn, workflow) + + +def downgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + delete_workflow(conn, workflow["name"]) diff --git a/gso/oss-params-example.json b/gso/oss-params-example.json index ff6eb33dc70bcaad8cf453e5d678483cadb79c5f..38f52ff78425211e1a7de5e4bccd6f5ab9dc3cac 100644 --- a/gso/oss-params-example.json +++ b/gso/oss-params-example.json @@ -45,7 +45,7 @@ }, "LT_IAS": { "V4": {"containers": ["10.255.255.0/24"], "networks": [], "mask": 31}, - "V6": {"containers": ["dead:beef:cc::/48"], "networks": [], "mask": 126}, + "V6": {"containers": [ "2001:798:1::/48"], "networks": [], "mask": 126}, "domain_name": ".geantip", "dns_view": "default", "network_view": "default" @@ -73,6 +73,7 @@ "PROVISIONING_PROXY": { "scheme": "https", "api_base": "localhost:44444", + "auth": "Bearer <token>", "api_version": 1123 }, "CELERY": { diff --git a/gso/products/product_blocks/iptrunk.py b/gso/products/product_blocks/iptrunk.py index cac186262f641fc52fdc62a4e4c556c7d924f22b..2d12b9c55924844338ce68d2af0a4a975e7b7c89 100644 --- a/gso/products/product_blocks/iptrunk.py +++ b/gso/products/product_blocks/iptrunk.py @@ -1,11 +1,13 @@ """IP trunk product block that has all parameters of a subscription throughout its lifecycle.""" import ipaddress -from typing import TypeVar +from typing import Annotated -from orchestrator.domain.base import ProductBlockModel -from orchestrator.forms.validators import UniqueConstrainedList +from annotated_types import Len +from orchestrator.domain.base import ProductBlockModel, T from orchestrator.types import SubscriptionLifecycle, strEnum +from pydantic import AfterValidator +from pydantic_forms.validators import validate_unique_list from gso.products.product_blocks.router import ( RouterBlock, @@ -33,11 +35,8 @@ class IptrunkType(strEnum): LEASED = "Leased" -T_co = TypeVar("T_co", covariant=True) - - -class LAGMemberList(UniqueConstrainedList[T_co]): # type: ignore[type-var] - """A list of :term:`LAG` member interfaces.""" +LAGMemberList = Annotated[list[T], AfterValidator(validate_unique_list), Len(min_length=0)] +IptrunkSides = Annotated[list[T], AfterValidator(validate_unique_list), Len(min_length=2, max_length=2)] class IptrunkInterfaceBlockInactive( @@ -65,13 +64,6 @@ class IptrunkInterfaceBlock(IptrunkInterfaceBlockProvisioning, lifecycle=[Subscr interface_description: str | None = None -class IptrunkSides(UniqueConstrainedList[T_co]): # type: ignore[type-var] - """A list of IP trunk interfaces that make up one side of a link.""" - - min_items = 2 - max_items = 2 - - class IptrunkSideBlockInactive( ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], @@ -91,7 +83,7 @@ class IptrunkSideBlockProvisioning(IptrunkSideBlockInactive, lifecycle=[Subscrip iptrunk_side_node: RouterBlockProvisioning iptrunk_side_ae_iface: str | None = None iptrunk_side_ae_geant_a_sid: str | None = None - iptrunk_side_ae_members: LAGMemberList[IptrunkInterfaceBlockProvisioning] + iptrunk_side_ae_members: LAGMemberList[IptrunkInterfaceBlockProvisioning] # type: ignore[assignment] class IptrunkSideBlock(IptrunkSideBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): @@ -100,7 +92,7 @@ class IptrunkSideBlock(IptrunkSideBlockProvisioning, lifecycle=[SubscriptionLife iptrunk_side_node: RouterBlock iptrunk_side_ae_iface: str | None = None iptrunk_side_ae_geant_a_sid: str | None = None - iptrunk_side_ae_members: LAGMemberList[IptrunkInterfaceBlock] + iptrunk_side_ae_members: LAGMemberList[IptrunkInterfaceBlock] # type: ignore[assignment] class IptrunkBlockInactive( @@ -132,7 +124,7 @@ class IptrunkBlockProvisioning(IptrunkBlockInactive, lifecycle=[SubscriptionLife iptrunk_isis_metric: int | None = None iptrunk_ipv4_network: ipaddress.IPv4Network | None = None iptrunk_ipv6_network: ipaddress.IPv6Network | None = None - iptrunk_sides: IptrunkSides[IptrunkSideBlockProvisioning] + iptrunk_sides: IptrunkSides[IptrunkSideBlockProvisioning] # type: ignore[assignment] class IptrunkBlock(IptrunkBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): @@ -155,4 +147,4 @@ class IptrunkBlock(IptrunkBlockProvisioning, lifecycle=[SubscriptionLifecycle.AC #: The IPv6 network used for this trunk. iptrunk_ipv6_network: ipaddress.IPv6Network #: The two sides that the trunk is connected to. - iptrunk_sides: IptrunkSides[IptrunkSideBlock] + iptrunk_sides: IptrunkSides[IptrunkSideBlock] # type: ignore[assignment] diff --git a/gso/products/product_blocks/lan_switch_interconnect.py b/gso/products/product_blocks/lan_switch_interconnect.py index 55a7b97a3ca788dbe3e61f5ddd9c8a5344e5976f..a9b1c77366662e24499f936f9762d515b882b934 100644 --- a/gso/products/product_blocks/lan_switch_interconnect.py +++ b/gso/products/product_blocks/lan_switch_interconnect.py @@ -63,7 +63,7 @@ class LanSwitchInterconnectRouterSideBlockProvisioning( node: RouterBlockProvisioning ae_iface: str | None = None - ae_members: LAGMemberList[LanSwitchInterconnectInterfaceBlockProvisioning] + ae_members: LAGMemberList[LanSwitchInterconnectInterfaceBlockProvisioning] # type: ignore[assignment] class LanSwitchInterconnectRouterSideBlock( @@ -73,7 +73,7 @@ class LanSwitchInterconnectRouterSideBlock( node: RouterBlock ae_iface: str - ae_members: LAGMemberList[LanSwitchInterconnectInterfaceBlock] + ae_members: LAGMemberList[LanSwitchInterconnectInterfaceBlock] # type: ignore[assignment] class LanSwitchInterconnectSwitchSideBlockInactive( @@ -95,7 +95,7 @@ class LanSwitchInterconnectSwitchSideBlockProvisioning( node: SwitchBlockProvisioning ae_iface: str | None = None - ae_members: LAGMemberList[LanSwitchInterconnectInterfaceBlockProvisioning] + ae_members: LAGMemberList[LanSwitchInterconnectInterfaceBlockProvisioning] # type: ignore[assignment] class LanSwitchInterconnectSwitchSideBlock( @@ -105,7 +105,7 @@ class LanSwitchInterconnectSwitchSideBlock( node: SwitchBlock ae_iface: str - ae_members: LAGMemberList[LanSwitchInterconnectInterfaceBlock] + ae_members: LAGMemberList[LanSwitchInterconnectInterfaceBlock] # type: ignore[assignment] class LanSwitchInterconnectBlockInactive( diff --git a/gso/products/product_blocks/office_router.py b/gso/products/product_blocks/office_router.py index fec7ad8d16366baf12ec3528748f71aa2fa36d90..65eab0256a073c699f3ea2ef84d96e3352096722 100644 --- a/gso/products/product_blocks/office_router.py +++ b/gso/products/product_blocks/office_router.py @@ -1,7 +1,5 @@ """Product block for :class:`office router` products.""" -import ipaddress - from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle @@ -10,7 +8,7 @@ from gso.products.product_blocks.site import ( SiteBlockInactive, SiteBlockProvisioning, ) -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import IPv4AddressType, IPv6AddressType, PortNumber, Vendor class OfficeRouterBlockInactive( @@ -22,8 +20,8 @@ class OfficeRouterBlockInactive( office_router_fqdn: str | None = None office_router_ts_port: PortNumber | None = None - office_router_lo_ipv4_address: ipaddress.IPv4Address | None = None - office_router_lo_ipv6_address: ipaddress.IPv6Address | None = None + office_router_lo_ipv4_address: IPv4AddressType | None = None + office_router_lo_ipv6_address: IPv6AddressType | None = None office_router_site: SiteBlockInactive | None vendor: Vendor | None = None @@ -33,8 +31,8 @@ class OfficeRouterBlockProvisioning(OfficeRouterBlockInactive, lifecycle=[Subscr office_router_fqdn: str | None = None office_router_ts_port: PortNumber | None = None - office_router_lo_ipv4_address: ipaddress.IPv4Address | None = None - office_router_lo_ipv6_address: ipaddress.IPv6Address | None = None + office_router_lo_ipv4_address: IPv4AddressType | None = None + office_router_lo_ipv6_address: IPv6AddressType | None = None office_router_site: SiteBlockProvisioning | None vendor: Vendor | None = None @@ -47,9 +45,9 @@ class OfficeRouterBlock(OfficeRouterBlockProvisioning, lifecycle=[SubscriptionLi #: The port of the terminal server that this office router is connected to. Used to offer out of band access. office_router_ts_port: PortNumber #: The IPv4 loopback address of the office router. - office_router_lo_ipv4_address: ipaddress.IPv4Address + office_router_lo_ipv4_address: IPv4AddressType #: The IPv6 loopback address of the office router. - office_router_lo_ipv6_address: ipaddress.IPv6Address + office_router_lo_ipv6_address: IPv6AddressType #: The :class:`Site` that this office router resides in. Both physically and computationally. office_router_site: SiteBlock #: The vendor of an office router. Defaults to Juniper. diff --git a/gso/products/product_blocks/pop_vlan.py b/gso/products/product_blocks/pop_vlan.py index 4935c2f69966874c686489c7e06e3061d5509365..9942a97991ed4ea0e72f3ea2ad5f95b5fc580e83 100644 --- a/gso/products/product_blocks/pop_vlan.py +++ b/gso/products/product_blocks/pop_vlan.py @@ -1,12 +1,13 @@ """Pop VLAN product block that has all parameters of a subscription throughout its lifecycle.""" from ipaddress import IPv4Network, IPv6Network -from typing import TypeVar +from typing import Annotated, TypeVar from orchestrator.domain.base import ProductBlockModel -from orchestrator.forms.validators import UniqueConstrainedList from orchestrator.types import SubscriptionLifecycle +from pydantic import AfterValidator from pydantic_forms.types import strEnum +from pydantic_forms.validators import validate_unique_list from gso.products.product_blocks.lan_switch_interconnect import ( LanSwitchInterconnectBlock, @@ -14,7 +15,7 @@ from gso.products.product_blocks.lan_switch_interconnect import ( LanSwitchInterconnectBlockProvisioning, ) -T_co = TypeVar("T_co", covariant=True) +T = TypeVar("T") class LayerPreference(strEnum): @@ -24,8 +25,7 @@ class LayerPreference(strEnum): L3 = "L3" -class PortList(UniqueConstrainedList[T_co]): # type: ignore[type-var] - """A list of ports.""" +PortList = Annotated[list[T], AfterValidator(validate_unique_list)] class PopVlanPortBlockInactive( @@ -92,7 +92,7 @@ class PopVlanBlock(PopVlanBlockProvisioning, lifecycle=[SubscriptionLifecycle.AC #: The LAN Switch Interconnect that this Pop VLAN is connected to. lan_switch_interconnect: LanSwitchInterconnectBlock #: The ports of the Pop VLAN. - ports: PortList[PopVlanPortBlock] + ports: PortList[PopVlanPortBlock] # type: ignore[assignment] #: The level of the layer preference for the Pop VLAN (L2 or L3). layer_preference: LayerPreference #: IPv4 network for the Pop VLAN if layer preference is L3. diff --git a/gso/products/product_blocks/router.py b/gso/products/product_blocks/router.py index f91bf1c70507a2f7814bfe69643c70489cb0c4c2..17deeccb1ac8a5ee9bcfaa14fa25f27360881e7c 100644 --- a/gso/products/product_blocks/router.py +++ b/gso/products/product_blocks/router.py @@ -1,7 +1,5 @@ """Product block for :class:`Router` products.""" -import ipaddress - from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle, strEnum @@ -10,7 +8,7 @@ from gso.products.product_blocks.site import ( SiteBlockInactive, SiteBlockProvisioning, ) -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import IPv4AddressType, IPv6AddressType, PortNumber, Vendor class RouterRole(strEnum): @@ -31,8 +29,8 @@ class RouterBlockInactive( router_fqdn: str | None = None router_ts_port: PortNumber | None = None router_access_via_ts: bool | None = None - router_lo_ipv4_address: ipaddress.IPv4Address | None = None - router_lo_ipv6_address: ipaddress.IPv6Address | None = None + router_lo_ipv4_address: IPv4AddressType | None = None + router_lo_ipv6_address: IPv6AddressType | None = None router_lo_iso_address: str | None = None router_role: RouterRole | None = None router_site: SiteBlockInactive | None @@ -45,8 +43,8 @@ class RouterBlockProvisioning(RouterBlockInactive, lifecycle=[SubscriptionLifecy router_fqdn: str router_ts_port: PortNumber router_access_via_ts: bool - router_lo_ipv4_address: ipaddress.IPv4Address - router_lo_ipv6_address: ipaddress.IPv6Address + router_lo_ipv4_address: IPv4AddressType + router_lo_ipv6_address: IPv6AddressType router_lo_iso_address: str router_role: RouterRole router_site: SiteBlockProvisioning @@ -63,9 +61,9 @@ class RouterBlock(RouterBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTI #: Whether this router should be accessed through the terminal server, or through its loopback address. router_access_via_ts: bool #: The IPv4 loopback address of the router. - router_lo_ipv4_address: ipaddress.IPv4Address + router_lo_ipv4_address: IPv4AddressType #: The IPv6 loopback address of the router. - router_lo_ipv6_address: ipaddress.IPv6Address + router_lo_ipv6_address: IPv6AddressType #: The :term:`ISO` :term:`NET` of the router, used for :term:`ISIS` support. router_lo_iso_address: str #: The role of the router, which can be any of the values defined in :class:`RouterRole`. diff --git a/gso/products/product_blocks/site.py b/gso/products/product_blocks/site.py index 1852b24615076b2d76dde41db71a9e5d5fcc535f..fc34c4d2b33fad86fe7a3068d0307aacc7ce09c1 100644 --- a/gso/products/product_blocks/site.py +++ b/gso/products/product_blocks/site.py @@ -1,10 +1,12 @@ """The product block that describes a site subscription.""" import re +from typing import Annotated +from annotated_types import doc from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle, strEnum -from pydantic import ConstrainedStr +from pydantic import AfterValidator, Field class SiteTier(strEnum): @@ -20,44 +22,53 @@ class SiteTier(strEnum): TIER4 = 4 -class LatitudeCoordinate(ConstrainedStr): - """A latitude coordinate, modeled as a constrained string. - - The coordinate must match the format conforming to the latitude range of -90 to +90 degrees. It can be a - floating-point number or an integer. - Valid examples: 40.7128, -74.0060, 90, -90, 0 - """ - +def validate_latitude(v: float) -> float: + """Validate a latitude coordinate.""" regex = re.compile(r"^-?([1-8]?\d(\.\d+)?|90(\.0+)?)$") - - @classmethod - def validate(cls, value: str) -> str: - """Validate that a latitude coordinate is valid.""" - if not cls.regex.match(value): - msg = "Invalid latitude coordinate. Valid examples: '40.7128', '-74.0060', '90', '-90', '0'." - raise ValueError(msg) - - return value + if not regex.match(str(v)): + msg = "Invalid latitude coordinate. Valid examples: '40.7128', '-74.0060', '90', '-90', '0'." + raise ValueError(msg) + return v -class LongitudeCoordinate(ConstrainedStr): - """A longitude coordinate, modeled as a constrained string. - - The coordinate must match the format conforming to the longitude - range of -180 to +180 degrees. It can be a floating point number or an integer. - Valid examples: 40.7128, -74.0060, 180, -180, 0 - """ - +def validate_longitude(v: float) -> float: + """Validate a longitude coordinate.""" regex = re.compile(r"^-?(180(\.0+)?|((1[0-7]\d)|([1-9]?\d))(\.\d+)?)$") - - @classmethod - def validate(cls, value: str) -> str: - """Validate that a longitude coordinate is valid.""" - if not cls.regex.match(value): - msg = "Invalid longitude coordinate. Valid examples: '40.7128', '-74.0060', '180', '-180'" - raise ValueError(msg) - - return value + if not regex.match(str(v)): + msg = "Invalid longitude coordinate. Valid examples: '40.7128', '-74.0060', '180', '-180', '0'." + raise ValueError(msg) + + return v + + +LatitudeCoordinate = Annotated[ + float, + Field( + ge=-90, + le=90, + ), + AfterValidator(validate_latitude), + doc( + "A latitude coordinate, modeled as a string. " + "The coordinate must match the format conforming to the latitude range of -90 to +90 degrees. " + "It can be a floating-point number or an integer. Valid examples: 40.7128, -74.0060, 90, -90, 0." + ), +] + +LongitudeCoordinate = Annotated[ + float, + Field( + ge=-180, + le=180, + ), + AfterValidator(validate_longitude), + doc( + "A longitude coordinate, modeled as a string. " + "The coordinate must match the format conforming to the longitude " + "range of -180 to +180 degrees. It can be a floating-point number or an integer. " + "Valid examples: 40.7128, -74.0060, 180, -180, 0." + ), +] class SiteBlockInactive( diff --git a/gso/products/product_blocks/super_pop_switch.py b/gso/products/product_blocks/super_pop_switch.py index af2f2ba74c98cc41806842d9877e8b0168ec3748..3335b28cf90ee9d55abe59be528f404d44d905b8 100644 --- a/gso/products/product_blocks/super_pop_switch.py +++ b/gso/products/product_blocks/super_pop_switch.py @@ -1,7 +1,5 @@ """Product block for :class:`Super PoP Switch` products.""" -import ipaddress - from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle @@ -10,7 +8,7 @@ from gso.products.product_blocks.site import ( SiteBlockInactive, SiteBlockProvisioning, ) -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import IPv4AddressType, PortNumber, Vendor class SuperPopSwitchBlockInactive( @@ -22,7 +20,7 @@ class SuperPopSwitchBlockInactive( super_pop_switch_fqdn: str | None = None super_pop_switch_ts_port: PortNumber | None = None - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address | None = None + super_pop_switch_mgmt_ipv4_address: IPv4AddressType | None = None super_pop_switch_site: SiteBlockInactive | None vendor: Vendor | None = None @@ -32,7 +30,7 @@ class SuperPopSwitchBlockProvisioning(SuperPopSwitchBlockInactive, lifecycle=[Su super_pop_switch_fqdn: str | None = None super_pop_switch_ts_port: PortNumber | None = None - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address | None = None + super_pop_switch_mgmt_ipv4_address: IPv4AddressType | None = None super_pop_switch_site: SiteBlockProvisioning | None vendor: Vendor | None = None @@ -45,7 +43,7 @@ class SuperPopSwitchBlock(SuperPopSwitchBlockProvisioning, lifecycle=[Subscripti #: The port of the terminal server that this Super PoP switch is connected to. Used to offer out of band access. super_pop_switch_ts_port: PortNumber #: The IPv4 management address of the Super PoP switch. - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address + super_pop_switch_mgmt_ipv4_address: IPv4AddressType #: The :class:`Site` that this Super PoP switch resides in. Both physically and computationally. super_pop_switch_site: SiteBlock #: The vendor of a Super PoP switch. Defaults to Juniper. diff --git a/gso/schedules/scheduling.py b/gso/schedules/scheduling.py index 8525956cb7933facebd090d8c34938f66640bb56..2e83fab3613526c9cd3571def4ad1dff101c1443 100644 --- a/gso/schedules/scheduling.py +++ b/gso/schedules/scheduling.py @@ -29,6 +29,7 @@ def scheduler( All time units can be specified with lists of numbers or crontab pattern strings for advanced scheduling. All specified time parts (minute, hour, day, etc.) must align for a task to run. + """ def decorator(task_func: Callable) -> Callable: diff --git a/gso/schema/partner.py b/gso/schema/partner.py index 890adcb9b20b08f6c244e8986ad20eaf4def83fc..b1c58c2cf91bf544501f6b2e316117b8b83a70c9 100644 --- a/gso/schema/partner.py +++ b/gso/schema/partner.py @@ -3,7 +3,7 @@ from datetime import datetime from uuid import uuid4 -from pydantic import BaseModel, EmailStr, Field +from pydantic import BaseModel, ConfigDict, EmailStr, Field from gso.db.models import PartnerType @@ -14,7 +14,7 @@ class PartnerCreate(BaseModel): partner_id: str = Field(default_factory=lambda: str(uuid4())) name: str email: EmailStr | None = None - as_number: str | None = Field(None, unique=True) + as_number: str | None = None as_set: str | None = None route_set: str | None = None black_listed_as_sets: list[str] | None = None @@ -23,8 +23,4 @@ class PartnerCreate(BaseModel): partner_type: PartnerType created_at: datetime = Field(default_factory=lambda: datetime.now().astimezone()) updated_at: datetime = Field(default_factory=lambda: datetime.now().astimezone()) - - class Config: - """Pydantic model configuration.""" - - orm_mode = True + model_config = ConfigDict(from_attributes=True) diff --git a/gso/services/infoblox.py b/gso/services/infoblox.py index 06ee5719a48010c2cb175200abc3b8276312303d..140b4c851b29b649a05243e9f12c44e729debc03 100644 --- a/gso/services/infoblox.py +++ b/gso/services/infoblox.py @@ -10,6 +10,7 @@ from infoblox_client.exceptions import ( ) from gso.settings import IPAMParams, load_oss_params +from gso.utils.shared_enums import IPv4AddressType, IPv6AddressType logger = getLogger(__name__) NULL_MAC = "00:00:00:00:00:00" @@ -233,8 +234,8 @@ def allocate_host( def create_host_by_ip( hostname: str, - ipv4_address: ipaddress.IPv4Address, - ipv6_address: ipaddress.IPv6Address, + ipv4_address: IPv4AddressType, + ipv6_address: IPv6AddressType, service_type: str, comment: str, ) -> None: @@ -268,11 +269,11 @@ def create_host_by_ip( new_host.update() -def find_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> objects.HostRecord | None: +def find_host_by_ip(ip_addr: IPv4AddressType | ipaddress.IPv6Address) -> objects.HostRecord | None: """Find a host record in Infoblox by its associated IP address. :param ip_addr: The IP address of a host that is searched for. - :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address + :type ip_addr: FancyIPV4Address | ipaddress.IPv6Address """ conn, _ = _setup_connection() if ip_addr.version == 4: # noqa: PLR2004, the 4 in IPv4 is well-known and not a "magic value." @@ -314,14 +315,14 @@ def find_v6_host_by_fqdn(fqdn: str) -> objects.HostRecordV6: ) -def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None: +def delete_host_by_ip(ip_addr: IPv4AddressType | ipaddress.IPv6Address) -> None: """Delete a host from Infoblox. Delete a host record in Infoblox, by providing the IP address that is associated with the record. Raises a :class:`DeletionError` if no record can be found in Infoblox. :param ip_addr: The IP address of the host record that should get deleted. - :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address + :type ip_addr: FancyIPV4Address | ipaddress.IPv6Address """ host = find_host_by_ip(ip_addr) if host: diff --git a/gso/services/lso_client.py b/gso/services/lso_client.py index 67e4b77cce1e4c03f1ba200b6633079cb2476437..92898940edfdb7a2981156a1fd5e0f5c5a44f786 100644 --- a/gso/services/lso_client.py +++ b/gso/services/lso_client.py @@ -13,9 +13,10 @@ from orchestrator.config.assignee import Assignee from orchestrator.types import State from orchestrator.utils.errors import ProcessFailureError from orchestrator.workflow import Step, StepList, begin, callback_step, inputstep -from pydantic_forms.core import FormPage, ReadOnlyField +from pydantic import ConfigDict +from pydantic_forms.core import FormPage from pydantic_forms.types import FormGenerator -from pydantic_forms.validators import Label, LongText +from pydantic_forms.validators import Label, LongText, ReadOnlyField from gso import settings @@ -125,13 +126,12 @@ def _show_results(state: State) -> FormGenerator: return state class ConfirmRunPage(FormPage): - class Config: - title: str = state["lso_result_title"] + model_config = ConfigDict() if "lso_result_extra_label" in state: extra_label: Label = state["lso_result_extra_label"] - run_status: str = ReadOnlyField(state["callback_result"]["status"]) - run_results: LongText = ReadOnlyField(json.dumps(state["callback_result"], indent=4)) + run_status: ReadOnlyField(state["callback_result"]["status"], default_type=str) # type: ignore[valid-type] + run_results: ReadOnlyField(json.dumps(state["callback_result"], indent=4), default_type=LongText) # type: ignore[valid-type] yield ConfirmRunPage [state.pop(key, None) for key in ["run_results", "lso_result_title", "lso_result_extra_label"]] diff --git a/gso/settings.py b/gso/settings.py index ced74ba5c01f59555976396dc43ad268bc786c0b..f550c0cc90a3bc749dd581af31500cab30de8729 100644 --- a/gso/settings.py +++ b/gso/settings.py @@ -9,8 +9,10 @@ import json import logging import os from pathlib import Path +from typing import Annotated -from pydantic import BaseSettings, NonNegativeInt +from pydantic import Field +from pydantic_settings import BaseSettings logger = logging.getLogger(__name__) @@ -44,16 +46,8 @@ class InfoBloxParams(BaseSettings): password: str -class V4Netmask(NonNegativeInt): - """A valid netmask for an IPv4 network or address.""" - - le = 32 - - -class V6Netmask(NonNegativeInt): - """A valid netmask for an IPv6 network or address.""" - - le = 128 +V4Netmask = Annotated[int, Field(ge=0, le=32)] +V6Netmask = Annotated[int, Field(ge=0, le=128)] class V4NetworkParams(BaseSettings): diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index 0241e836ef8c9db3bcc750b357e03c65d291914d..1daa2d81a1f0c66debfca6bc4fc35fefa33192bb 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -3,22 +3,20 @@ import ipaddress import re from enum import StrEnum -from ipaddress import IPv4Address from uuid import UUID import pycountry from orchestrator.types import UUIDstr -from pydantic import BaseModel, validator -from pydantic.fields import ModelField +from pydantic import BaseModel, field_validator from pydantic_forms.validators import Choice from gso import settings from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock -from gso.products.product_blocks.site import SiteTier +from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate, SiteTier from gso.products.product_types.router import Router from gso.services.netbox_client import NetboxClient from gso.services.subscriptions import get_active_subscriptions_by_field_and_value -from gso.utils.shared_enums import Vendor +from gso.utils.shared_enums import IPv4AddressType, Vendor class LAGMember(BaseModel): @@ -107,7 +105,7 @@ def get_router_vendor(router_id: UUID) -> Vendor: return Router.from_subscription(router_id).router.vendor -def iso_from_ipv4(ipv4_address: IPv4Address) -> str: +def iso_from_ipv4(ipv4_address: IPv4AddressType) -> str: """Calculate an :term:`ISO` address, based on an IPv4 address. :param IPv4Address ipv4_address: The address that's to be converted @@ -157,12 +155,11 @@ def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMe return interfaces -def validate_site_fields_is_unique(field_name: str, value: str | int) -> str | int: +def validate_site_fields_is_unique(field_name: str, value: str | int) -> None: """Validate that a site field is unique.""" if len(get_active_subscriptions_by_field_and_value(field_name, str(value))) > 0: msg = f"{field_name} must be unique" raise ValueError(msg) - return value def validate_ipv4_or_ipv6(value: str) -> str: @@ -188,7 +185,7 @@ def validate_country_code(country_code: str) -> str: return country_code -def validate_site_name(site_name: str) -> str: +def validate_site_name(site_name: str) -> None: """Validate the site name. The site name must consist of three uppercase letters, optionally followed by a single digit. @@ -200,7 +197,6 @@ def validate_site_name(site_name: str) -> str: f"digit (0-9). Received: {site_name}" ) raise ValueError(msg) - return site_name class BaseSiteValidatorModel(BaseModel): @@ -210,32 +206,50 @@ class BaseSiteValidatorModel(BaseModel): site_internal_id: int site_tier: SiteTier site_ts_address: str - - @validator("site_ts_address", check_fields=False, allow_reuse=True) + site_country_code: str + site_name: str + site_city: str + site_country: str + site_latitude: LatitudeCoordinate + site_longitude: LongitudeCoordinate + + @field_validator("site_ts_address") def validate_ts_address(cls, site_ts_address: str) -> str: """Validate that a terminal server address is valid.""" validate_ipv4_or_ipv6(site_ts_address) return site_ts_address - @validator("site_country_code", check_fields=False, allow_reuse=True) + @field_validator("site_country_code") def country_code_must_exist(cls, country_code: str) -> str: """Validate that the country code exists.""" validate_country_code(country_code) return country_code - @validator( - "site_ts_address", - "site_internal_id", - "site_bgp_community_id", - "site_name", - check_fields=False, - allow_reuse=True, - ) - def validate_unique_fields(cls, value: str, field: ModelField) -> str | int: + @field_validator("site_ts_address") + def site_ts_address_must_be_unique(cls, site_ts_address: str) -> str: + """Validate that the internal and :term:`BGP` community IDs are unique.""" + validate_site_fields_is_unique("site_ts_address", site_ts_address) + return site_ts_address + + @field_validator("site_internal_id") + def site_internal_id_must_be_unique(cls, site_internal_id: int) -> int: """Validate that the internal and :term:`BGP` community IDs are unique.""" - return validate_site_fields_is_unique(field.name, value) + validate_site_fields_is_unique("site_internal_id", site_internal_id) + return site_internal_id + + @field_validator("site_bgp_community_id") + def site_bgp_community_id_must_be_unique(cls, site_bgp_community_id: int) -> int: + """Validate that the internal and :term:`BGP` community IDs are unique.""" + validate_site_fields_is_unique("site_bgp_community_id", site_bgp_community_id) + return site_bgp_community_id + + @field_validator("site_name") + def site_name_must_be_unique(cls, site_name: str) -> str: + """Validate that the internal and :term:`BGP` community IDs are unique.""" + validate_site_fields_is_unique("site_name", site_name) + return site_name - @validator("site_name", check_fields=False, allow_reuse=True) + @field_validator("site_name") def site_name_must_be_valid(cls, site_name: str) -> str: """Validate the site name. diff --git a/gso/utils/shared_enums.py b/gso/utils/shared_enums.py index c0116e1690d6384cabd9ce16cf1ee79201a0d6b8..07dbe34641ce5abac50105dbb00387f4e12f08e7 100644 --- a/gso/utils/shared_enums.py +++ b/gso/utils/shared_enums.py @@ -1,6 +1,10 @@ """Shared choices for the different models.""" -from pydantic import ConstrainedInt +import ipaddress +from typing import Annotated + +from annotated_types import doc +from pydantic import Field, PlainSerializer from pydantic_forms.types import strEnum @@ -11,14 +15,26 @@ class Vendor(strEnum): NOKIA = "nokia" -class PortNumber(ConstrainedInt): - """Constrained integer for valid port numbers. +PortNumber = Annotated[ + int, + Field( + gt=0, + le=49151, + ), + doc( + "Constrained integer for valid port numbers. The range from 49152 to 65535 is marked as ephemeral, " + "and can therefore not be selected for permanent allocation." +), +] + - The range from 49152 to 65535 is marked as ephemeral, and can therefore not be selected for permanent allocation. - """ +IPv4AddressType = Annotated[ + ipaddress.IPv4Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always") +] - gt = 0 - le = 49151 +IPv6AddressType = Annotated[ + ipaddress.IPv6Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always") +] class ConnectionStrategy(strEnum): diff --git a/gso/worker.py b/gso/worker.py index b2abfe6f5a52192454d3d691ba1715df313fc6ac..b1a3db2c95935e960b699563745f327edc829987 100644 --- a/gso/worker.py +++ b/gso/worker.py @@ -9,7 +9,7 @@ from gso.settings import load_oss_params class OrchestratorCelery(Celery): """A :term:`GSO` instance that functions as a Celery worker.""" - def on_init(self) -> None: # noqa: PLR6301 + def on_init(self) -> None: """Initialise a new Celery worker.""" init_worker_app() diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py index 7fd75ed90ce883cee989be5aadd1179a08309982..e10c8b7f415f22cfdc0348ea42b445ba198f3f35 100644 --- a/gso/workflows/__init__.py +++ b/gso/workflows/__init__.py @@ -10,19 +10,17 @@ ALL_ALIVE_STATES: list[str] = [ SubscriptionLifecycle.ACTIVE, ] -WF_USABLE_MAP.update( - { - "redeploy_base_config": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE], - "update_ibgp_mesh": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE], - "activate_router": [SubscriptionLifecycle.PROVISIONING], - "deploy_twamp": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE], - "modify_trunk_interface": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE], - "activate_iptrunk": [SubscriptionLifecycle.PROVISIONING], - "terminate_site": ALL_ALIVE_STATES, - "terminate_router": ALL_ALIVE_STATES, - "terminate_iptrunk": ALL_ALIVE_STATES, - } -) +WF_USABLE_MAP.update({ + "redeploy_base_config": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE], + "update_ibgp_mesh": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE], + "activate_router": [SubscriptionLifecycle.PROVISIONING], + "deploy_twamp": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE], + "modify_trunk_interface": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE], + "activate_iptrunk": [SubscriptionLifecycle.PROVISIONING], + "terminate_site": ALL_ALIVE_STATES, + "terminate_router": ALL_ALIVE_STATES, + "terminate_iptrunk": ALL_ALIVE_STATES, +}) # IP trunk workflows LazyWorkflowInstance("gso.workflows.iptrunk.activate_iptrunk", "activate_iptrunk") diff --git a/gso/workflows/iptrunk/activate_iptrunk.py b/gso/workflows/iptrunk/activate_iptrunk.py index f686a8cb7e3c825dceffeb876c644a37342ce3d8..a98a0446c50a4de14b8f55502b633babb7d027af 100644 --- a/gso/workflows/iptrunk/activate_iptrunk.py +++ b/gso/workflows/iptrunk/activate_iptrunk.py @@ -16,7 +16,7 @@ def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator: trunk = Iptrunk.from_subscription(subscription_id) class ActivateTrunkForm(FormPage): - info_label: Label = "Start approval process for IP trunk activation." # type:ignore[assignment] + info_label: Label = "Start approval process for IP trunk activation." user_input = yield ActivateTrunkForm @@ -28,7 +28,7 @@ def verify_complete_checklist() -> FormGenerator: """Show a form for the operator to input a link to the completed checklist.""" class VerifyCompleteForm(FormPage): - info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." # type: ignore[assignment] + info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." checklist_url: str = "" user_input = yield VerifyCompleteForm diff --git a/gso/workflows/iptrunk/create_imported_iptrunk.py b/gso/workflows/iptrunk/create_imported_iptrunk.py index 7e28fe64f4f717ad496914d477a8f902f193d9a6..4ce38227266c4431a80113c91802bb2a67977b21 100644 --- a/gso/workflows/iptrunk/create_imported_iptrunk.py +++ b/gso/workflows/iptrunk/create_imported_iptrunk.py @@ -1,15 +1,18 @@ """A creation workflow for adding an existing IP trunk to the service database.""" import ipaddress +from typing import Annotated from uuid import uuid4 from orchestrator import workflow from orchestrator.forms import FormPage -from orchestrator.forms.validators import Choice, UniqueConstrainedList +from orchestrator.forms.validators import Choice from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, SubscriptionLifecycle from orchestrator.workflow import StepList, done, init, step from orchestrator.workflows.steps import resync, set_status, store_process_subscription +from pydantic import AfterValidator, ConfigDict +from pydantic_forms.validators import validate_unique_list from gso.products import ProductName from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlockInactive, IptrunkType, PhysicalPortCapacity @@ -29,14 +32,16 @@ def _generate_routers() -> dict[str, str]: return routers +LAGMemberList = Annotated[list[LAGMember], AfterValidator(validate_unique_list)] + + def initial_input_form_generator() -> FormGenerator: """Take all information passed to this workflow by the :term:`API` endpoint that was called.""" routers = _generate_routers() router_enum = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type] class CreateIptrunkForm(FormPage): - class Config: - title = "Import Iptrunk" + model_config = ConfigDict(title="Import Iptrunk") partner: str geant_s_sid: str | None @@ -49,12 +54,12 @@ def initial_input_form_generator() -> FormGenerator: side_a_node_id: router_enum # type: ignore[valid-type] side_a_ae_iface: str side_a_ae_geant_a_sid: str | None - side_a_ae_members: UniqueConstrainedList[LAGMember] + side_a_ae_members: LAGMemberList side_b_node_id: router_enum # type: ignore[valid-type] side_b_ae_iface: str side_b_ae_geant_a_sid: str | None - side_b_ae_members: UniqueConstrainedList[LAGMember] + side_b_ae_members: LAGMemberList iptrunk_ipv4_network: ipaddress.IPv4Network iptrunk_ipv6_network: ipaddress.IPv6Network @@ -139,7 +144,7 @@ def update_ipam_stub_for_subscription( @workflow( "Import iptrunk", initial_input_form=initial_input_form_generator, - target=Target.CREATE, + target=Target.SYSTEM, ) def create_imported_iptrunk() -> StepList: """Import an IP trunk without provisioning it.""" diff --git a/gso/workflows/iptrunk/create_iptrunk.py b/gso/workflows/iptrunk/create_iptrunk.py index fed3ab25b20b27a205481ca9fc685528670284d3..99e9f24879ea22c2261ac0fe369859269994710e 100644 --- a/gso/workflows/iptrunk/create_iptrunk.py +++ b/gso/workflows/iptrunk/create_iptrunk.py @@ -1,19 +1,21 @@ """A creation workflow that deploys a new IP trunk service.""" import json +from typing import Annotated from uuid import uuid4 +from annotated_types import Len from orchestrator.config.assignee import Assignee from orchestrator.forms import FormPage -from orchestrator.forms.validators import Choice, Label, UniqueConstrainedList +from orchestrator.forms.validators import Choice, Label from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr from orchestrator.utils.json import json_dumps from orchestrator.workflow import StepList, conditional, done, init, inputstep, step, workflow from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.utils import wrap_create_initial_input_form -from pydantic import validator -from pydantic_forms.core import ReadOnlyField +from pydantic import AfterValidator, ConfigDict, field_validator +from pydantic_forms.validators import ReadOnlyField, validate_unique_list from pynetbox.models.dcim import Interfaces from gso.products.product_blocks.iptrunk import ( @@ -52,18 +54,17 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: routers[str(router["subscription_id"])] = router["description"] class CreateIptrunkForm(FormPage): - class Config: - title = product_name + model_config = ConfigDict(title=product_name) tt_number: str - partner: str = ReadOnlyField("GEANT") + partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type] geant_s_sid: str | None iptrunk_description: str iptrunk_type: IptrunkType iptrunk_speed: PhysicalPortCapacity iptrunk_number_of_members: int - @validator("tt_number", allow_reuse=True) + @field_validator("tt_number") def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) @@ -71,20 +72,19 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: class VerifyMinimumLinksForm(FormPage): info_label: Label = ( - f"This is the calculated minimum-links for this LAG: " f"{initial_user_input.iptrunk_number_of_members - 1}" # type: ignore[assignment] + f"This is the calculated minimum-links for this LAG: " f"{initial_user_input.iptrunk_number_of_members - 1}" ) - info_label2: Label = "Please confirm or modify." # type: ignore[assignment] + info_label2: Label = "Please confirm or modify." yield VerifyMinimumLinksForm router_enum_a = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type] class SelectRouterSideA(FormPage): - class Config: - title = "Select a router for side A of the trunk." + model_config = ConfigDict(title="Select a router for side A of the trunk.") side_a_node_id: router_enum_a # type: ignore[valid-type] - @validator("side_a_node_id", allow_reuse=True) + @field_validator("side_a_node_id") def validate_device_exists_in_netbox(cls, side_a_node_id: UUIDstr) -> str | None: return validate_router_in_netbox(side_a_node_id) @@ -92,9 +92,14 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: router_a = user_input_router_side_a.side_a_node_id.name router_a_fqdn = Router.from_subscription(router_a).router.router_fqdn - class JuniperAeMembers(UniqueConstrainedList[LAGMember]): - min_items = initial_user_input.iptrunk_number_of_members - max_items = initial_user_input.iptrunk_number_of_members + juniper_ae_members = Annotated[ + list[LAGMember], + AfterValidator(validate_unique_list), + Len( + min_length=initial_user_input.iptrunk_number_of_members, + max_length=initial_user_input.iptrunk_number_of_members, + ), + ] if get_router_vendor(router_a) == Vendor.NOKIA: @@ -104,23 +109,25 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: initial_user_input.iptrunk_speed, ) - class NokiaAeMembersA(UniqueConstrainedList[NokiaLAGMemberA]): - min_items = initial_user_input.iptrunk_number_of_members - max_items = initial_user_input.iptrunk_number_of_members - - ae_members_side_a = NokiaAeMembersA + ae_members_side_a_type = Annotated[ + list[NokiaLAGMemberA], + AfterValidator(validate_unique_list), + Len( + min_length=initial_user_input.iptrunk_number_of_members, + max_length=initial_user_input.iptrunk_number_of_members, + ), + ] else: - ae_members_side_a = JuniperAeMembers # type: ignore[assignment] + ae_members_side_a_type = juniper_ae_members # type: ignore[assignment, misc] class CreateIptrunkSideAForm(FormPage): - class Config: - title = f"Provide subscription details for side A of the trunk.({router_a_fqdn})" + model_config = ConfigDict(title=f"Provide subscription details for side A of the trunk.({router_a_fqdn})") side_a_ae_iface: available_lags_choices(router_a) or str # type: ignore[valid-type] side_a_ae_geant_a_sid: str | None - side_a_ae_members: ae_members_side_a # type: ignore[valid-type] + side_a_ae_members: ae_members_side_a_type - @validator("side_a_ae_members", allow_reuse=True) + @field_validator("side_a_ae_members") def validate_side_a_ae_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]: validate_iptrunk_unique_interface(side_a_ae_members) vendor = get_router_vendor(router_a) @@ -133,12 +140,11 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: router_enum_b = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type] class SelectRouterSideB(FormPage): - class Config: - title = "Select a router for side B of the trunk." + model_config = ConfigDict(title="Select a router for side B of the trunk.") side_b_node_id: router_enum_b # type: ignore[valid-type] - @validator("side_b_node_id", allow_reuse=True) + @field_validator("side_b_node_id") def validate_device_exists_in_netbox(cls, side_b_node_id: UUIDstr) -> str | None: return validate_router_in_netbox(side_b_node_id) @@ -154,24 +160,24 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: initial_user_input.iptrunk_speed, ) - class NokiaAeMembersB(UniqueConstrainedList): - min_items = len(user_input_side_a.side_a_ae_members) - max_items = len(user_input_side_a.side_a_ae_members) - item_type = NokiaLAGMemberB - - ae_members_side_b = NokiaAeMembersB + ae_members_side_b = Annotated[ + list[NokiaLAGMemberB], + AfterValidator(validate_unique_list), + Len( + min_length=len(user_input_side_a.side_a_ae_members), max_length=len(user_input_side_a.side_a_ae_members) + ), + ] else: - ae_members_side_b = JuniperAeMembers # type: ignore[assignment] + ae_members_side_b = juniper_ae_members # type: ignore[assignment, misc] class CreateIptrunkSideBForm(FormPage): - class Config: - title = f"Provide subscription details for side B of the trunk.({router_b_fqdn})" + model_config = ConfigDict(title=f"Provide subscription details for side B of the trunk.({router_b_fqdn})") side_b_ae_iface: available_lags_choices(router_b) or str # type: ignore[valid-type] side_b_ae_geant_a_sid: str | None - side_b_ae_members: ae_members_side_b # type: ignore[valid-type] + side_b_ae_members: ae_members_side_b - @validator("side_b_ae_members", allow_reuse=True) + @field_validator("side_b_ae_members") def validate_side_b_ae_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]: validate_iptrunk_unique_interface(side_b_ae_members) vendor = get_router_vendor(router_b) @@ -331,7 +337,7 @@ def check_ip_trunk_connectivity( execute_playbook( playbook_name="iptrunks_checks.yaml", callback_route=callback_route, - inventory=subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn, + inventory=subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn, # type: ignore[arg-type] extra_vars=extra_vars, ) @@ -405,7 +411,7 @@ def check_ip_trunk_isis( execute_playbook( playbook_name="iptrunks_checks.yaml", callback_route=callback_route, - inventory=subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn, + inventory=subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn, # type: ignore[arg-type] extra_vars=extra_vars, ) @@ -436,9 +442,9 @@ def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State: if get_router_vendor(trunk_side.iptrunk_side_node.owner_subscription_id) == Vendor.NOKIA: # Create :term:`LAG` interfaces lag_interface: Interfaces = nbclient.create_interface( - iface_name=trunk_side.iptrunk_side_ae_iface, + iface_name=trunk_side.iptrunk_side_ae_iface, # type: ignore[arg-type] interface_type="lag", - device_name=trunk_side.iptrunk_side_node.router_fqdn, + device_name=trunk_side.iptrunk_side_node.router_fqdn, # type: ignore[arg-type] description=str(subscription.subscription_id), enabled=True, ) @@ -447,14 +453,14 @@ def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State: # Reserve interfaces for interface in trunk_side.iptrunk_side_ae_members: nbclient.attach_interface_to_lag( - device_name=trunk_side.iptrunk_side_node.router_fqdn, + device_name=trunk_side.iptrunk_side_node.router_fqdn, # type: ignore[arg-type] lag_name=lag_interface.name, - iface_name=interface.interface_name, + iface_name=interface.interface_name, # type: ignore[arg-type] description=str(subscription.subscription_id), ) nbclient.reserve_interface( - device_name=trunk_side.iptrunk_side_node.router_fqdn, - iface_name=interface.interface_name, + device_name=trunk_side.iptrunk_side_node.router_fqdn, # type: ignore[arg-type] + iface_name=interface.interface_name, # type: ignore[arg-type] ) return { "subscription": subscription, @@ -490,15 +496,14 @@ def prompt_start_new_checklist(subscription: IptrunkProvisioning) -> FormGenerat oss_params = load_oss_params() class SharepointPrompt(FormPage): - class Config: - title = "Start new checklist" + model_config = ConfigDict(title="Start new checklist") info_label_1: Label = ( - f"Visit {oss_params.SHAREPOINT.checklist_site_url} and start a new Sharepoint checklist for an IPtrunk " # type: ignore[assignment] + f"Visit {oss_params.SHAREPOINT.checklist_site_url} and start a new Sharepoint checklist for an IPtrunk " f"from {subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to " f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}." ) - info_label_2: Label = "Once this is done, click proceed to finish the workflow." # type: ignore[assignment] + info_label_2: Label = "Once this is done, click proceed to finish the workflow." yield SharepointPrompt diff --git a/gso/workflows/iptrunk/deploy_twamp.py b/gso/workflows/iptrunk/deploy_twamp.py index 64483e60b8cae8717ad553c3f03bb03651fa6299..c8342f9a44e996ede5a30de69b06faabf846bc14 100644 --- a/gso/workflows/iptrunk/deploy_twamp.py +++ b/gso/workflows/iptrunk/deploy_twamp.py @@ -10,7 +10,7 @@ from orchestrator.utils.json import json_dumps from orchestrator.workflow import StepList, done, init, step, workflow from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form -from pydantic import validator +from pydantic import field_validator from gso.products.product_types.iptrunk import Iptrunk from gso.services.lso_client import execute_playbook, lso_interaction @@ -24,11 +24,11 @@ def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: info_label: Label = ( "Please confirm deployment of TWAMP on IP trunk from " f"{trunk.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to " - f"{trunk.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" # type: ignore[assignment] + f"{trunk.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) tt_number: str - @validator("tt_number", allow_reuse=True) + @field_validator("tt_number") def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) diff --git a/gso/workflows/iptrunk/migrate_iptrunk.py b/gso/workflows/iptrunk/migrate_iptrunk.py index 96ee4abb3ea30356ae701c90c22f2db27669c949..16dc3f74a1eec22146481872a7627ad0fede7d32 100644 --- a/gso/workflows/iptrunk/migrate_iptrunk.py +++ b/gso/workflows/iptrunk/migrate_iptrunk.py @@ -7,20 +7,22 @@ configured to run from A to C. B is then no longer associated with this IP trunk import copy import json import re +from typing import Annotated from uuid import uuid4 +from annotated_types import Len from orchestrator import step, workflow from orchestrator.config.assignee import Assignee from orchestrator.forms import FormPage -from orchestrator.forms.validators import Choice, Label, UniqueConstrainedList +from orchestrator.forms.validators import Choice, Label from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, UUIDstr from orchestrator.utils.json import json_dumps from orchestrator.workflow import StepList, conditional, done, init, inputstep from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form -from pydantic import validator -from pydantic_forms.core import ReadOnlyField +from pydantic import AfterValidator, ConfigDict, field_validator +from pydantic_forms.validators import ReadOnlyField, validate_unique_list from pynetbox.models.dcim import Interfaces from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock @@ -61,16 +63,15 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ) class IPTrunkMigrateForm(FormPage): - class Config: - title = form_title + model_config = ConfigDict(title=form_title) tt_number: str replace_side: replaced_side_enum # type: ignore[valid-type] - warning_label: Label = "Are we moving to a different Site?" # type: ignore[assignment] + warning_label: Label = "Are we moving to a different Site?" migrate_to_different_site: bool = False restore_isis_metric: bool = True - @validator("tt_number", allow_reuse=True, always=True) + @field_validator("tt_number", mode="before") def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) @@ -98,8 +99,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: new_router_enum = Choice("Select a new router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type] class NewSideIPTrunkRouterForm(FormPage): - class Config: - title = form_title + model_config = ConfigDict(title=form_title) new_node: new_router_enum # type: ignore[valid-type] @@ -116,18 +116,23 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: subscription.iptrunk.iptrunk_speed, ) - class NokiaAeMembers(UniqueConstrainedList[NokiaLAGMember]): - min_items = len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members) - max_items = len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members) - - ae_members = NokiaAeMembers + ae_members = Annotated[ + list[NokiaLAGMember], + AfterValidator(validate_unique_list), + Len( + min_length=len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members), + max_length=len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members), + ), + ] else: - - class JuniperLagMember(UniqueConstrainedList[LAGMember]): - min_items = len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members) - max_items = len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members) - - ae_members = JuniperLagMember # type: ignore[assignment] + ae_members = Annotated[ # type: ignore[assignment, misc] + list[LAGMember], + AfterValidator(validate_unique_list), + Len( + min_length=len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members), + max_length=len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members), + ), + ] replace_index = ( 0 @@ -136,22 +141,21 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: else 1 ) existing_lag_ae_members = [ - { - "interface_name": iface.interface_name, - "interface_description": iface.interface_description, - } + LAGMember( + interface_name=iface.interface_name, + interface_description=iface.interface_description, + ) for iface in subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_members ] class NewSideIPTrunkForm(FormPage): - class Config: - title = form_title + model_config = ConfigDict(title=form_title) new_lag_interface: side_a_ae_iface # type: ignore[valid-type] - existing_lag_interface: list[LAGMember] = ReadOnlyField(existing_lag_ae_members) - new_lag_member_interfaces: ae_members # type: ignore[valid-type] + existing_lag_interface: ReadOnlyField(existing_lag_ae_members, default_type=list[LAGMember]) # type: ignore[valid-type] + new_lag_member_interfaces: ae_members - @validator("new_lag_interface", allow_reuse=True, pre=True, always=True) + @field_validator("new_lag_interface") def lag_interface_proper_name(cls, new_lag_interface: str) -> str: if get_router_vendor(new_router) == Vendor.JUNIPER: juniper_lag_re = re.compile("^ae\\d{1,2}$") @@ -160,7 +164,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: raise ValueError(msg) return new_lag_interface - @validator("new_lag_member_interfaces", allow_reuse=True) + @field_validator("new_lag_member_interfaces") def is_interface_names_valid_juniper(cls, new_lag_member_interfaces: list[LAGMember]) -> list[LAGMember]: vendor = get_router_vendor(new_router) return validate_interface_name_list(new_lag_member_interfaces, vendor) @@ -392,10 +396,9 @@ def confirm_continue_move_fiber() -> FormGenerator: """Wait for confirmation from an operator that the physical fiber has been moved.""" class ProvisioningResultPage(FormPage): - class Config: - title = "Please confirm before continuing" + model_config = ConfigDict(title="Please confirm before continuing") - info_label: Label = "New trunk interface has been deployed, wait for the physical connection to be moved." # type: ignore[assignment] + info_label: Label = "New trunk interface has been deployed, wait for the physical connection to be moved." yield ProvisioningResultPage @@ -482,10 +485,9 @@ def confirm_continue_restore_isis() -> FormGenerator: """Wait for an operator to confirm that the old :term:`ISIS` metric should be restored.""" class ProvisioningResultPage(FormPage): - class Config: - title = "Please confirm before continuing" + model_config = ConfigDict(title="Please confirm before continuing") - info_label: Label = "ISIS config has been deployed, confirm if you want to restore the old metric." # type: ignore[assignment] + info_label: Label = "ISIS config has been deployed, confirm if you want to restore the old metric." yield ProvisioningResultPage diff --git a/gso/workflows/iptrunk/modify_trunk_interface.py b/gso/workflows/iptrunk/modify_trunk_interface.py index 27111c62cbba9a1a92c173a035a3df163ac29d3f..234a9dfc22f5f8ca56e70022eba234517ef6a861 100644 --- a/gso/workflows/iptrunk/modify_trunk_interface.py +++ b/gso/workflows/iptrunk/modify_trunk_interface.py @@ -1,19 +1,19 @@ """A modification workflow that updates the :term:`LAG` interfaces that are part of an existing IP trunk.""" -import ipaddress import json +from typing import Annotated, TypeVar from uuid import UUID, uuid4 -from orchestrator.forms import FormPage, ReadOnlyField -from orchestrator.forms.validators import UniqueConstrainedList +from annotated_types import Len +from orchestrator.forms import FormPage from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, UUIDstr from orchestrator.utils.json import json_dumps from orchestrator.workflow import StepList, conditional, done, init, step, workflow from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form -from pydantic import validator -from pydantic_forms.validators import Label +from pydantic import AfterValidator, ConfigDict, field_validator +from pydantic_forms.validators import Label, ReadOnlyField, validate_unique_list from gso.products.product_blocks.iptrunk import ( IptrunkInterfaceBlock, @@ -33,10 +33,14 @@ from gso.utils.helpers import ( validate_iptrunk_unique_interface, validate_tt_number, ) -from gso.utils.shared_enums import Vendor +from gso.utils.shared_enums import IPv4AddressType, IPv6AddressType, Vendor +T = TypeVar("T", bound=LAGMember) -def initialize_ae_members(subscription: Iptrunk, initial_user_input: dict, side_index: int) -> type[LAGMember]: + +def initialize_ae_members( + subscription: Iptrunk, initial_user_input: dict, side_index: int +) -> Annotated[list[LAGMember], ""]: """Initialize the list of AE members.""" router = subscription.iptrunk.iptrunk_sides[side_index].iptrunk_side_node router_vendor = get_router_vendor(router.owner_subscription_id) @@ -60,19 +64,17 @@ def initialize_ae_members(subscription: Iptrunk, initial_user_input: dict, side_ ) ) - class NokiaAeMembers(UniqueConstrainedList[NokiaLAGMember]): - min_items = iptrunk_number_of_members - max_items = iptrunk_number_of_members - - ae_members = NokiaAeMembers - else: + return Annotated[ + list[NokiaLAGMember], + AfterValidator(validate_unique_list), + Len(min_length=iptrunk_number_of_members, max_length=iptrunk_number_of_members), + ] # type: ignore[return-value] - class JuniperAeMembers(UniqueConstrainedList[LAGMember]): - min_items = iptrunk_number_of_members - max_items = iptrunk_number_of_members - - ae_members = JuniperAeMembers # type: ignore[assignment] - return ae_members # type: ignore[return-value] + return Annotated[ + list[LAGMember], + AfterValidator(validate_unique_list), + Len(min_length=iptrunk_number_of_members, max_length=iptrunk_number_of_members), + ] # type: ignore[return-value] def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: @@ -86,15 +88,19 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: iptrunk_type: IptrunkType = subscription.iptrunk.iptrunk_type warning_label: Label = ( "Changing the PhyPortCapacity will result in the deletion of all AE members. " - "You will need to add the new AE members in the next steps." # type: ignore[assignment] + "You will need to add the new AE members in the next steps." ) iptrunk_speed: PhysicalPortCapacity = subscription.iptrunk.iptrunk_speed iptrunk_number_of_members: int = subscription.iptrunk.iptrunk_minimum_links + 1 - iptrunk_isis_metric: int = ReadOnlyField(subscription.iptrunk.iptrunk_isis_metric) - iptrunk_ipv4_network: ipaddress.IPv4Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv4_network) - iptrunk_ipv6_network: ipaddress.IPv6Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv6_network) + iptrunk_isis_metric: ReadOnlyField(subscription.iptrunk.iptrunk_isis_metric, default_type=int) # type: ignore[valid-type] + iptrunk_ipv4_network: ReadOnlyField( # type: ignore[valid-type] + str(subscription.iptrunk.iptrunk_ipv4_network), default_type=IPv4AddressType + ) + iptrunk_ipv6_network: ReadOnlyField( # type: ignore[valid-type] + str(subscription.iptrunk.iptrunk_ipv6_network), default_type=IPv6AddressType + ) - @validator("tt_number", allow_reuse=True) + @field_validator("tt_number") def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) @@ -102,19 +108,20 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class VerifyMinimumLinksForm(FormPage): info_label: Label = ( - f"This is the calculated minimum-links for this LAG: " f"{initial_user_input.iptrunk_number_of_members - 1}" # type: ignore[assignment] + f"This is the calculated minimum-links for this LAG: " f"{initial_user_input.iptrunk_number_of_members - 1}" ) - info_label2: Label = "Please confirm or modify." # type: ignore[assignment] + info_label2: Label = "Please confirm or modify." yield VerifyMinimumLinksForm ae_members_side_a = initialize_ae_members(subscription, initial_user_input.dict(), 0) class ModifyIptrunkSideAForm(FormPage): - class Config: - title = "Provide subscription details for side A of the trunk." + model_config = ConfigDict(title="Provide subscription details for side A of the trunk.") - side_a_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn) - side_a_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface) + side_a_node: ReadOnlyField( # type: ignore[valid-type] + subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn, default_type=str + ) + side_a_ae_iface: ReadOnlyField(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_iface, default_type=str) # type: ignore[valid-type] side_a_ae_geant_a_sid: str | None = subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_geant_a_sid side_a_ae_members: ae_members_side_a = ( # type: ignore[valid-type] subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members @@ -122,11 +129,11 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: else [] ) - @validator("side_a_ae_members", allow_reuse=True) + @field_validator("side_a_ae_members") def validate_iptrunk_unique_interface_side_a(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]: return validate_iptrunk_unique_interface(side_a_ae_members) - @validator("side_a_ae_members", allow_reuse=True) + @field_validator("side_a_ae_members") def validate_interface_name_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]: vendor = subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.vendor return validate_interface_name_list(side_a_ae_members, vendor) @@ -135,11 +142,12 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ae_members_side_b = initialize_ae_members(subscription, initial_user_input.dict(), 1) class ModifyIptrunkSideBForm(FormPage): - class Config: - title = "Provide subscription details for side B of the trunk." + model_config = ConfigDict(title="Provide subscription details for side B of the trunk.") - side_b_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn) - side_b_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface) + side_b_node: ReadOnlyField( # type: ignore[valid-type] + subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn, default_type=str + ) + side_b_ae_iface: ReadOnlyField(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_iface, default_type=str) # type: ignore[valid-type] side_b_ae_geant_a_sid: str | None = subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_geant_a_sid side_b_ae_members: ae_members_side_b = ( # type: ignore[valid-type] subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members @@ -147,11 +155,11 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: else [] ) - @validator("side_b_ae_members", allow_reuse=True) + @field_validator("side_b_ae_members") def validate_iptrunk_unique_interface_side_b(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]: return validate_iptrunk_unique_interface(side_b_ae_members) - @validator("side_b_ae_members", allow_reuse=True) + @field_validator("side_b_ae_members") def validate_interface_name_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]: vendor = subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.vendor return validate_interface_name_list(side_b_ae_members, vendor) diff --git a/gso/workflows/iptrunk/terminate_iptrunk.py b/gso/workflows/iptrunk/terminate_iptrunk.py index a2cb6727215a9f6184859c548dba7c3162127559..1ae61b80cd486d46f0f0fbea139868b01dc96d15 100644 --- a/gso/workflows/iptrunk/terminate_iptrunk.py +++ b/gso/workflows/iptrunk/terminate_iptrunk.py @@ -16,7 +16,7 @@ from orchestrator.workflows.steps import ( unsync, ) from orchestrator.workflows.utils import wrap_modify_initial_input_form -from pydantic import validator +from pydantic import field_validator from gso.products.product_blocks.iptrunk import IptrunkSideBlock from gso.products.product_types.iptrunk import Iptrunk @@ -35,18 +35,18 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class TerminateForm(FormPage): if iptrunk.status == SubscriptionLifecycle.INITIAL: info_label_2: Label = ( - "This will immediately mark the subscription as terminated, preventing any other workflows from " # type:ignore[assignment] + "This will immediately mark the subscription as terminated, preventing any other workflows from " "interacting with this product subscription." ) - info_label_3: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING." # type:ignore[assignment] + info_label_3: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING." tt_number: str termination_label: Label = ( - "Please confirm whether configuration should get removed from the A and B sides of the trunk." # type: ignore[assignment] + "Please confirm whether configuration should get removed from the A and B sides of the trunk." ) remove_configuration: bool = True - @validator("tt_number", allow_reuse=True) + @field_validator("tt_number") def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) diff --git a/gso/workflows/office_router/create_imported_office_router.py b/gso/workflows/office_router/create_imported_office_router.py index 7c1a1f19378d65263c93e23d5755574d26434b98..e1cda4cdd924563c5eb3495997dce651fd1fd724 100644 --- a/gso/workflows/office_router/create_imported_office_router.py +++ b/gso/workflows/office_router/create_imported_office_router.py @@ -1,20 +1,19 @@ """A creation workflow that adds existing office routers to the coreDB.""" -import ipaddress - from orchestrator import workflow from orchestrator.forms import FormPage from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, SubscriptionLifecycle from orchestrator.workflow import StepList, done, init, step from orchestrator.workflows.steps import resync, set_status, store_process_subscription +from pydantic import ConfigDict from gso.products import ProductName from gso.products.product_types.office_router import ImportedOfficeRouterInactive from gso.services import subscriptions from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_site_by_name -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import IPv4AddressType, IPv6AddressType, PortNumber, Vendor @step("Create subscription") @@ -34,15 +33,14 @@ def initial_input_form_generator() -> FormGenerator: """Generate a form that is filled in using information passed through the :term:`API` endpoint.""" class ImportOfficeRouter(FormPage): - class Config: - title = "Import an office router" + model_config = ConfigDict(title="Import an office router") partner: str office_router_site: str office_router_fqdn: str office_router_ts_port: PortNumber - office_router_lo_ipv4_address: ipaddress.IPv4Address - office_router_lo_ipv6_address: ipaddress.IPv6Address + office_router_lo_ipv4_address: IPv4AddressType + office_router_lo_ipv6_address: IPv6AddressType user_input = yield ImportOfficeRouter @@ -55,8 +53,8 @@ def initialize_subscription( office_router_fqdn: str, office_router_ts_port: PortNumber, office_router_site: str, - office_router_lo_ipv4_address: ipaddress.IPv4Address | None = None, - office_router_lo_ipv6_address: ipaddress.IPv6Address | None = None, + office_router_lo_ipv4_address: IPv4AddressType | None = None, + office_router_lo_ipv6_address: IPv6AddressType | None = None, ) -> State: """Initialise the office router subscription using input data.""" subscription.office_router.office_router_ts_port = office_router_ts_port diff --git a/gso/workflows/router/activate_router.py b/gso/workflows/router/activate_router.py index 4de880f4e2b8a9cc13b7c1c80315fef634e577c9..b742c58cd5476685fc6ea8199fbcfa96cf865527 100644 --- a/gso/workflows/router/activate_router.py +++ b/gso/workflows/router/activate_router.py @@ -16,7 +16,7 @@ def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator: router = Router.from_subscription(subscription_id) class ActivateRouterForm(FormPage): - info_label: Label = "Start approval process for router activation." # type:ignore[assignment] + info_label: Label = "Start approval process for router activation." user_input = yield ActivateRouterForm @@ -28,7 +28,7 @@ def verify_complete_checklist() -> FormGenerator: """Show a form for the operator to input a link to the completed checklist.""" class VerifyCompleteForm(FormPage): - info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." # type: ignore[assignment] + info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." checklist_url: str = "" user_input = yield VerifyCompleteForm diff --git a/gso/workflows/router/create_imported_router.py b/gso/workflows/router/create_imported_router.py index a71a7a18a6ca45909d2ffe80c410f3672e0771a7..d7c7e45c9a69bbef6cea90dc18a46a8136d641c8 100644 --- a/gso/workflows/router/create_imported_router.py +++ b/gso/workflows/router/create_imported_router.py @@ -1,13 +1,12 @@ """A creation workflow that adds an existing router to the service database.""" -import ipaddress - from orchestrator import workflow from orchestrator.forms import FormPage from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, SubscriptionLifecycle from orchestrator.workflow import StepList, done, init, step from orchestrator.workflows.steps import resync, set_status, store_process_subscription +from pydantic import ConfigDict from gso.products import ProductName from gso.products.product_blocks.router import RouterRole @@ -15,7 +14,7 @@ from gso.products.product_types.router import ImportedRouterInactive from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_product_id_by_name, get_site_by_name from gso.utils.helpers import generate_fqdn -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import IPv4AddressType, IPv6AddressType, PortNumber, Vendor @step("Create subscription") @@ -35,8 +34,7 @@ def initial_input_form_generator() -> FormGenerator: """Generate a form that is filled in using information passed through the :term:`API` endpoint.""" class ImportRouter(FormPage): - class Config: - title = "Import Router" + model_config = ConfigDict(title="Import Router") partner: str router_site: str @@ -44,8 +42,8 @@ def initial_input_form_generator() -> FormGenerator: ts_port: int router_vendor: Vendor router_role: RouterRole - router_lo_ipv4_address: ipaddress.IPv4Address - router_lo_ipv6_address: ipaddress.IPv6Address + router_lo_ipv4_address: IPv4AddressType + router_lo_ipv6_address: IPv6AddressType router_lo_iso_address: str user_input = yield ImportRouter @@ -61,8 +59,8 @@ def initialize_subscription( router_site: str, router_role: RouterRole, router_vendor: Vendor, - router_lo_ipv4_address: ipaddress.IPv4Address | None = None, - router_lo_ipv6_address: ipaddress.IPv6Address | None = None, + router_lo_ipv4_address: IPv4AddressType | None = None, + router_lo_ipv6_address: IPv6AddressType | None = None, router_lo_iso_address: str | None = None, ) -> State: """Initialise the router subscription using input data.""" @@ -85,7 +83,7 @@ def initialize_subscription( @workflow( "Import router", initial_input_form=initial_input_form_generator, - target=Target.CREATE, + target=Target.SYSTEM, ) def create_imported_router() -> StepList: """Import a router without provisioning it.""" diff --git a/gso/workflows/router/create_router.py b/gso/workflows/router/create_router.py index 289f201bd45b3064ad352c733a7c81aa1b7b7462..dd9b2ee9f4b877361e09dc56a8742e0efff69a96 100644 --- a/gso/workflows/router/create_router.py +++ b/gso/workflows/router/create_router.py @@ -1,6 +1,6 @@ """A creation workflow for adding a new router to the network.""" -from typing import Any +from typing import Self from orchestrator.config.assignee import Assignee from orchestrator.forms import FormPage @@ -10,8 +10,8 @@ from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUID from orchestrator.workflow import StepList, conditional, done, init, inputstep, step, workflow from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.utils import wrap_create_initial_input_form -from pydantic import validator -from pydantic_forms.core import ReadOnlyField +from pydantic import ConfigDict, model_validator +from pydantic_forms.validators import ReadOnlyField from gso.products.product_blocks.router import RouterRole from gso.products.product_types.router import RouterInactive, RouterProvisioning @@ -39,31 +39,30 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: """Gather information about the new router from the operator.""" class CreateRouterForm(FormPage): - class Config: - title = product_name + model_config = ConfigDict(title=product_name) tt_number: str - partner: str = ReadOnlyField("GEANT") + partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type] vendor: Vendor router_site: _site_selector() # type: ignore[valid-type] hostname: str ts_port: PortNumber router_role: RouterRole - @validator("hostname", allow_reuse=True) - def hostname_must_be_available(cls, hostname: str, **kwargs: dict[str, Any]) -> str: - router_site = kwargs["values"].get("router_site") - if not router_site: + @model_validator(mode="after") + def hostname_must_be_available(self) -> Self: + router_site = self.router_site + if not router_site: # TODO Test on UI msg = "Please select a site before setting the hostname." raise ValueError(msg) selected_site = Site.from_subscription(router_site).site - input_fqdn = generate_fqdn(hostname, selected_site.site_name, selected_site.site_country_code) + input_fqdn = generate_fqdn(self.hostname, selected_site.site_name, selected_site.site_country_code) if not infoblox.hostname_available(f"lo0.{input_fqdn}"): msg = f'FQDN "{input_fqdn}" is not available.' raise ValueError(msg) - return hostname + return self user_input = yield CreateRouterForm @@ -152,18 +151,17 @@ def prompt_reboot_router(subscription: RouterInactive) -> FormGenerator: """Wait for confirmation from an operator that the router has been rebooted.""" class RebootPrompt(FormPage): - class Config: - title = "Please reboot before continuing" + model_config = ConfigDict(title="Please reboot before continuing") if subscription.router.router_site and subscription.router.router_site.site_ts_address: info_label_1: Label = ( - f"Base config has been deployed. Please log in via the console using https://" # type: ignore[assignment] + f"Base config has been deployed. Please log in via the console using https://" f"{subscription.router.router_site.site_ts_address}." ) else: - info_label_1 = "Base config has been deployed. Please log in via the console." # type: ignore[assignment] + info_label_1 = "Base config has been deployed. Please log in via the console." - info_label_2: Label = "Reboot the router, and once it is up again, press submit to continue the workflow." # type: ignore[assignment] + info_label_2: Label = "Reboot the router, and once it is up again, press submit to continue the workflow." yield RebootPrompt @@ -175,13 +173,12 @@ def prompt_console_login() -> FormGenerator: """Wait for confirmation from an operator that the router can be logged into.""" class ConsolePrompt(FormPage): - class Config: - title = "Verify local authentication" + model_config = ConfigDict(title="Verify local authentication") info_label_1: Label = ( - "Verify that you are able to log in to the router via the console using the admin account." # type: ignore[assignment] + "Verify that you are able to log in to the router via the console using the admin account." ) - info_label_2: Label = "Once this is done, press submit to continue the workflow." # type: ignore[assignment] + info_label_2: Label = "Once this is done, press submit to continue the workflow." yield ConsolePrompt @@ -193,11 +190,10 @@ def prompt_insert_in_ims() -> FormGenerator: """Wait for confirmation from an operator that the router has been inserted in IMS.""" class IMSPrompt(FormPage): - class Config: - title = "Update IMS mediation server" + model_config = ConfigDict(title="Update IMS mediation server") - info_label_1: Label = "Insert the router into IMS." # type: ignore[assignment] - info_label_2: Label = "Once this is done, press submit to continue the workflow." # type: ignore[assignment] + info_label_1: Label = "Insert the router into IMS." + info_label_2: Label = "Once this is done, press submit to continue the workflow." yield IMSPrompt @@ -209,14 +205,13 @@ def prompt_insert_in_radius(subscription: RouterInactive) -> FormGenerator: """Wait for confirmation from an operator that the router has been inserted in RADIUS.""" class RadiusPrompt(FormPage): - class Config: - title = "Update RADIUS clients" + model_config = ConfigDict(title="Update RADIUS clients") info_label_1: Label = ( - f"Please go to https://kratos.geant.org/add_radius_client and add the {subscription.router.router_fqdn}" # type: ignore[assignment] + f"Please go to https://kratos.geant.org/add_radius_client and add the {subscription.router.router_fqdn}" f" - {subscription.router.router_lo_ipv4_address} to radius authentication" ) - info_label_2: Label = "This will be functionally checked later during verification work." # type: ignore[assignment] + info_label_2: Label = "This will be functionally checked later during verification work." yield RadiusPrompt @@ -229,14 +224,13 @@ def prompt_start_new_checklist(subscription: RouterProvisioning) -> FormGenerato oss_params = load_oss_params() class SharepointPrompt(FormPage): - class Config: - title = "Start new checklist" + model_config = ConfigDict(title="Start new checklist") info_label_1: Label = ( f"Visit {oss_params.SHAREPOINT.checklist_site_url} and start a new Sharepoint checklist for " - f"{subscription.router.router_fqdn}." # type: ignore[assignment] + f"{subscription.router.router_fqdn}." ) - info_label_2: Label = "Once this is done, click proceed to finish the workflow." # type: ignore[assignment] + info_label_2: Label = "Once this is done, click proceed to finish the workflow." yield SharepointPrompt diff --git a/gso/workflows/router/modify_connection_strategy.py b/gso/workflows/router/modify_connection_strategy.py index a3f5b5ae2f1f8cd0aa58d1d407d2daf28662c8a0..bc6be3ecc4b4a08605de17151ec7a14e740daf01 100644 --- a/gso/workflows/router/modify_connection_strategy.py +++ b/gso/workflows/router/modify_connection_strategy.py @@ -6,6 +6,7 @@ from orchestrator.types import FormGenerator, State, UUIDstr from orchestrator.workflow import StepList, done, init, step, workflow from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form +from pydantic import ConfigDict from gso.products.product_types.router import Router from gso.utils.shared_enums import ConnectionStrategy @@ -20,8 +21,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ) class ModifyConnectionStrategyForm(FormPage): - class Config: - title = f"Modify the connection strategy of {subscription.router.router_fqdn}." + model_config = ConfigDict(title=f"Modify the connection strategy of {subscription.router.router_fqdn}.") connection_strategy: ConnectionStrategy = current_connection_strategy diff --git a/gso/workflows/router/redeploy_base_config.py b/gso/workflows/router/redeploy_base_config.py index ffacc0ce7f1e383ac258971a78601bf39ac6af89..0393a377abe9c5351e55f0181ee2e6a8ab30f229 100644 --- a/gso/workflows/router/redeploy_base_config.py +++ b/gso/workflows/router/redeploy_base_config.py @@ -17,7 +17,7 @@ def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator: router = Router.from_subscription(subscription_id) class RedeployBaseConfigForm(FormPage): - info_label: Label = f"Redeploy base config on {router.router.router_fqdn}?" # type: ignore[assignment] + info_label: Label = f"Redeploy base config on {router.router.router_fqdn}?" tt_number: str user_input = yield RedeployBaseConfigForm diff --git a/gso/workflows/router/terminate_router.py b/gso/workflows/router/terminate_router.py index 0d46f9abbd7a32e2f141894a09751659d24d52fa..781e75813843db8dd4330fa746cc8a5c45015206 100644 --- a/gso/workflows/router/terminate_router.py +++ b/gso/workflows/router/terminate_router.py @@ -34,13 +34,13 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class TerminateForm(FormPage): if router.status == SubscriptionLifecycle.INITIAL: info_label_2: Label = ( - "This will immediately mark the subscription as terminated, preventing any other workflows from " # type:ignore[assignment] + "This will immediately mark the subscription as terminated, preventing any other workflows from " "interacting with this product subscription." ) - info_label_3: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING." # type:ignore[assignment] + info_label_3: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING." tt_number: str - termination_label: Label = "Please confirm whether configuration should get removed from the router." # type: ignore[assignment] + termination_label: Label = "Please confirm whether configuration should get removed from the router." remove_configuration: bool = True user_input = yield TerminateForm diff --git a/gso/workflows/router/update_ibgp_mesh.py b/gso/workflows/router/update_ibgp_mesh.py index efd4dd8370258b761a37d8437e16787b07e0a1a1..0aa61cb65a09b64c5d4821b92c4fd85c92a2ef0a 100644 --- a/gso/workflows/router/update_ibgp_mesh.py +++ b/gso/workflows/router/update_ibgp_mesh.py @@ -10,7 +10,7 @@ from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUID from orchestrator.workflow import StepList, done, init, inputstep, step, workflow from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form -from pydantic import root_validator +from pydantic import ConfigDict, model_validator from gso.products.product_blocks.router import RouterRole from gso.products.product_types.router import Router @@ -29,13 +29,12 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: subscription = Router.from_subscription(subscription_id) class AddBGPSessionForm(FormPage): - class Config: - title = f"Add {subscription.router.router_fqdn} to the iBGP mesh?" + model_config = ConfigDict(title=f"Add {subscription.router.router_fqdn} to the iBGP mesh?") tt_number: str - @root_validator(allow_reuse=True) - def router_has_a_trunk(cls, values: dict[str, Any]) -> dict[str, Any]: + @model_validator(mode="before") + def router_has_a_trunk(cls, data: Any) -> Any: terminating_trunks = get_trunks_that_terminate_on_router( subscription_id, SubscriptionLifecycle.PROVISIONING ) + get_trunks_that_terminate_on_router(subscription_id, SubscriptionLifecycle.ACTIVE) @@ -43,7 +42,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: msg = "Selected router does not terminate any available IP trunks." raise ValueError(msg) - return values + return data user_input = yield AddBGPSessionForm @@ -203,10 +202,9 @@ def prompt_insert_in_radius() -> FormGenerator: """Wait for confirmation from an operator that the router has been inserted in RADIUS.""" class RADIUSPrompt(FormPage): - class Config: - title = "Please update RADIUS before continuing" + model_config = ConfigDict(title="Please update RADIUS before continuing") - info_label: Label = "Insert the router into RADIUS, and continue the workflow once this has been completed." # type: ignore[assignment] + info_label: Label = "Insert the router into RADIUS, and continue the workflow once this has been completed." yield RADIUSPrompt @@ -218,10 +216,9 @@ def prompt_radius_login() -> FormGenerator: """Wait for confirmation from an operator that the router can be logged into using RADIUS.""" class RADIUSPrompt(FormPage): - class Config: - title = "Please check RADIUS before continuing" + model_config = ConfigDict(title="Please check RADIUS before continuing") - info_label: Label = "Log in to the router using RADIUS, and continue the workflow when this was successful." # type: ignore[assignment] + info_label: Label = "Log in to the router using RADIUS, and continue the workflow when this was successful." yield RADIUSPrompt diff --git a/gso/workflows/site/create_imported_site.py b/gso/workflows/site/create_imported_site.py index e149ee22f0e21d46f598df0e1ab8d1b1f4382ba8..033893f0e39e21a78fad87fa6643932ddea8672f 100644 --- a/gso/workflows/site/create_imported_site.py +++ b/gso/workflows/site/create_imported_site.py @@ -7,6 +7,7 @@ from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, SubscriptionLifecycle from orchestrator.workflow import StepList, done, init, step, workflow from orchestrator.workflows.steps import resync, set_status, store_process_subscription +from pydantic import ConfigDict from gso.products import ProductName from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate, SiteTier @@ -32,8 +33,7 @@ def generate_initial_input_form() -> FormGenerator: """Generate a form that is filled in using information passed through the :term:`API` endpoint.""" class ImportSite(FormPage): - class Config: - title = "Import Site" + model_config = ConfigDict(title="Import Site") site_name: str site_city: str @@ -84,7 +84,7 @@ def initialize_subscription( @workflow( "Import Site", - target=Target.CREATE, + target=Target.SYSTEM, initial_input_form=generate_initial_input_form, ) def create_imported_site() -> StepList: diff --git a/gso/workflows/site/create_site.py b/gso/workflows/site/create_site.py index be9aab537c7fec01550b7f8009925b799a8c1fd9..c4290d7072e517abd2276e7369663b3685af0e61 100644 --- a/gso/workflows/site/create_site.py +++ b/gso/workflows/site/create_site.py @@ -6,7 +6,8 @@ from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUID from orchestrator.workflow import StepList, done, init, step, workflow from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.utils import wrap_create_initial_input_form -from pydantic_forms.core import ReadOnlyField +from pydantic import ConfigDict +from pydantic_forms.validators import ReadOnlyField from gso.products.product_blocks import site as site_pb from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate @@ -19,20 +20,8 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: """Get input from the operator about the new site subscription.""" class CreateSiteForm(FormPage, BaseSiteValidatorModel): - class Config: - title = product_name - - partner: str = ReadOnlyField("GEANT") - site_name: str - site_city: str - site_country: str - site_country_code: str - site_latitude: LatitudeCoordinate - site_longitude: LongitudeCoordinate - site_bgp_community_id: int - site_internal_id: int - site_tier: site_pb.SiteTier - site_ts_address: str + model_config = ConfigDict(title=product_name) + partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type] user_input = yield CreateSiteForm diff --git a/gso/workflows/site/modify_site.py b/gso/workflows/site/modify_site.py index 15b549dbbcf7f357b5aebc28b885a998a18d9daa..93ac6f5156344d9c47db476197dbe66b252d972a 100644 --- a/gso/workflows/site/modify_site.py +++ b/gso/workflows/site/modify_site.py @@ -11,12 +11,10 @@ from orchestrator.workflows.steps import ( unsync, ) from orchestrator.workflows.utils import wrap_modify_initial_input_form -from pydantic import validator -from pydantic.fields import ModelField -from pydantic_forms.core import ReadOnlyField +from pydantic import ConfigDict, field_validator +from pydantic_forms.validators import ReadOnlyField -from gso.products.product_blocks import site as site_pb -from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate +from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate, SiteTier from gso.products.product_types.site import Site from gso.utils.helpers import validate_ipv4_or_ipv6, validate_site_fields_is_unique @@ -26,32 +24,42 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: subscription = Site.from_subscription(subscription_id) class ModifySiteForm(FormPage): - class Config: - title = "Modify Site" + model_config = ConfigDict(title="Modify Site") - site_name: str = ReadOnlyField(subscription.site.site_name) + site_name: ReadOnlyField(subscription.site.site_name, default_type=str) # type: ignore[valid-type] site_city: str = subscription.site.site_city - site_country: str = ReadOnlyField(subscription.site.site_country) - site_country_code: str = ReadOnlyField(subscription.site.site_country_code) + site_country: ReadOnlyField(subscription.site.site_country, default_type=str) # type: ignore[valid-type] + site_country_code: ReadOnlyField(subscription.site.site_country_code, default_type=str) # type: ignore[valid-type] site_latitude: LatitudeCoordinate = subscription.site.site_latitude site_longitude: LongitudeCoordinate = subscription.site.site_longitude site_bgp_community_id: int = subscription.site.site_bgp_community_id site_internal_id: int = subscription.site.site_internal_id - site_tier: site_pb.SiteTier = ReadOnlyField(subscription.site.site_tier) + site_tier: ReadOnlyField(subscription.site.site_tier, default_type=SiteTier) # type: ignore[valid-type] site_ts_address: str | None = subscription.site.site_ts_address - @validator("site_ts_address", allow_reuse=True) + @field_validator("site_ts_address") def validate_ts_address(cls, site_ts_address: str) -> str: if site_ts_address and site_ts_address != subscription.site.site_ts_address: validate_site_fields_is_unique("site_ts_address", site_ts_address) validate_ipv4_or_ipv6(site_ts_address) + return site_ts_address - @validator("site_internal_id", "site_bgp_community_id", allow_reuse=True) - def validate_unique_fields(cls, value: str, field: ModelField) -> str | int: - if value == getattr(subscription.site, field.name): - return value - return validate_site_fields_is_unique(field.name, value) + @field_validator("site_internal_id") + def validate_site_internal_id(cls, site_internal_id: int) -> int: + if site_internal_id == subscription.site.site_internal_id: + return site_internal_id + + validate_site_fields_is_unique("site_internal_id", site_internal_id) + return site_internal_id + + @field_validator("site_bgp_community_id") + def validate_site_bgp_community_id(cls, site_bgp_community_id: int) -> int: + if site_bgp_community_id == subscription.site.site_bgp_community_id: + return site_bgp_community_id + + validate_site_fields_is_unique("site_bgp_community_id", site_bgp_community_id) + return site_bgp_community_id user_input = yield ModifySiteForm diff --git a/gso/workflows/site/terminate_site.py b/gso/workflows/site/terminate_site.py index 96e807b47ed9c72b101ad3c6303e9b1fbd1405bc..02be6f7c76f4f01ada85ddaf91e552575eb3c18a 100644 --- a/gso/workflows/site/terminate_site.py +++ b/gso/workflows/site/terminate_site.py @@ -23,12 +23,12 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class TerminateForm(FormPage): if site.status == SubscriptionLifecycle.INITIAL: info_label_2: Label = ( - "This will immediately mark the subscription as terminated, preventing any other workflows from " # type:ignore[assignment] + "This will immediately mark the subscription as terminated, preventing any other workflows from " "interacting with this product subscription." ) - info_label_3: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING." # type:ignore[assignment] + info_label_3: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING." - termination_label: Label = "Are you sure you want to delete this site?" # type: ignore[assignment] + termination_label: Label = "Are you sure you want to delete this site?" user_input = yield TerminateForm return user_input.dict() diff --git a/gso/workflows/super_pop_switch/create_imported_super_pop_switch.py b/gso/workflows/super_pop_switch/create_imported_super_pop_switch.py index 14b58256882eae40962ce717921404bcd69d6b2d..f44f38a502575814cd15abc30707f0ba64af92a2 100644 --- a/gso/workflows/super_pop_switch/create_imported_super_pop_switch.py +++ b/gso/workflows/super_pop_switch/create_imported_super_pop_switch.py @@ -1,13 +1,12 @@ """A creation workflow that adds existing Super PoP switches to the coreDB.""" -import ipaddress - from orchestrator import workflow from orchestrator.forms import FormPage from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, SubscriptionLifecycle from orchestrator.workflow import StepList, done, init, step from orchestrator.workflows.steps import resync, set_status, store_process_subscription +from pydantic import ConfigDict from gso.products import ProductName from gso.products.product_types.super_pop_switch import ImportedSuperPopSwitchInactive @@ -15,7 +14,7 @@ from gso.services import subscriptions from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_site_by_name from gso.utils.helpers import generate_fqdn -from gso.utils.shared_enums import PortNumber, Vendor +from gso.utils.shared_enums import IPv4AddressType, PortNumber, Vendor @step("Create subscription") @@ -35,14 +34,13 @@ def initial_input_form_generator() -> FormGenerator: """Generate a form that is filled in using information passed through the :term:`API` endpoint.""" class ImportSuperPopSwitch(FormPage): - class Config: - title = "Import a Super PoP switch" + model_config = ConfigDict(title="Import a Super PoP switch") partner: str super_pop_switch_site: str hostname: str super_pop_switch_ts_port: PortNumber - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address + super_pop_switch_mgmt_ipv4_address: IPv4AddressType user_input = yield ImportSuperPopSwitch @@ -55,7 +53,7 @@ def initialize_subscription( hostname: str, super_pop_switch_ts_port: PortNumber, super_pop_switch_site: str, - super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address | None = None, + super_pop_switch_mgmt_ipv4_address: IPv4AddressType | None = None, ) -> State: """Initialise the Super PoP switch subscription using input data.""" subscription.super_pop_switch.super_pop_switch_ts_port = super_pop_switch_ts_port diff --git a/log.txt b/log.txt new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/requirements.txt b/requirements.txt index 73bb46ff0e83e703d78bca7c58a728f95e7fa696..e9164c69a1c47d6ccd52d42bcd97b8a22b8bf994 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,22 +1,22 @@ -orchestrator-core==1.3.4 +orchestrator-core==2.1.2 requests==2.31.0 infoblox-client~=0.6.0 -pycountry==22.3.5 -pynetbox==7.2.0 -celery-redbeat==2.1.1 -celery==5.3.4 +pycountry==23.12.11 +pynetbox==7.3.3 +celery-redbeat==2.2.0 +celery==5.3.6 # Test and linting dependencies celery-stubs==0.1.3 -types-requests==2.31.0.1 -types-PyYAML==6.0.12.12 -pytest==7.4.3 -faker==19.13.0 -responses==0.24.0 -mypy==1.6.1 -ruff==0.1.5 +types-requests==2.31.0.20240406 +types-PyYAML==6.0.12.20240311 +pytest==8.1.1 +faker==24.8.0 +responses==0.25.0 +mypy==1.9.0 +ruff==0.3.5 sphinx==7.2.6 -sphinx-rtd-theme==1.3.0 +sphinx-rtd-theme==2.0.0 urllib3_mock==0.3.3 -pytest-asyncio==0.23.3 -pre-commit~=3.6.0 +pytest-asyncio==0.23.6 +pre-commit~=3.7.0 diff --git a/test/api/conftest.py b/test/api/conftest.py deleted file mode 100644 index e002fa13c19973dfbe733aa47fba34981558f116..0000000000000000000000000000000000000000 --- a/test/api/conftest.py +++ /dev/null @@ -1,8 +0,0 @@ -from test.fixtures import ( # noqa: F401 - iptrunk_side_subscription_factory, - iptrunk_subscription_factory, - nokia_router_subscription_factory, - office_router_subscription_factory, - site_subscription_factory, - super_pop_switch_subscription_factory, -) diff --git a/test/api/test_imports.py b/test/api/test_imports.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/test/api/test_processes.py b/test/api/test_processes.py index 671218400c022a96eaa1e119be60db4fa5ec0d7b..f56fe52640d587928531f5171712f98ca57f8f1e 100644 --- a/test/api/test_processes.py +++ b/test/api/test_processes.py @@ -11,9 +11,11 @@ from orchestrator.workflow import ProcessStatus @pytest.fixture() -def create_process(faker, nokia_router_subscription_factory): +def create_process(test_workflow, nokia_router_subscription_factory): process_id = uuid4() - process = ProcessTable(process_id=process_id, workflow_name=faker.sentence(), last_status=ProcessStatus.SUSPENDED) + process = ProcessTable( + process_id=process_id, workflow_id=test_workflow.workflow_id, last_status=ProcessStatus.SUSPENDED + ) subscription = nokia_router_subscription_factory() process_subscription = ProcessSubscriptionTable(process_id=process_id, subscription_id=subscription) diff --git a/test/auth/test_oidc_policy_helper.py b/test/auth/test_oidc_policy_helper.py index 14af9f6b4ee55c5025aaef64414017f85a8f7513..767e34423a5c4969d3a08fc4d5ee01f005fc0b40 100644 --- a/test/auth/test_oidc_policy_helper.py +++ b/test/auth/test_oidc_policy_helper.py @@ -57,7 +57,7 @@ def oidc_user(mock_openid_config): resource_server_id="resource_server", resource_server_secret="secret", # noqa: S106 ) - user.openid_config = OIDCConfig.parse_obj(mock_openid_config) + user.openid_config = OIDCConfig.model_validate(mock_openid_config) return user @@ -266,7 +266,7 @@ async def test_oidc_user_call_no_token(oidc_user, mock_request): patch("httpx.AsyncClient.get", new_callable=MagicMock) as mock_get, ): mock_post.return_value = MagicMock(status_code=200, json=lambda: {"active": False}) - mock_get.return_value = MagicMock(status_code=200, json=lambda: {}) + mock_get.return_value = MagicMock(status_code=200, json=dict) result = await oidc_user.__call__(mock_request) diff --git a/test/conftest.py b/test/conftest.py index d0bfebfed7b8bf9e04e2d086fc5bd568550dc321..a5d4279839d452801453e7d2368e8b2513bf6c81 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,4 +1,5 @@ import contextlib +import datetime import ipaddress import logging import os @@ -11,10 +12,20 @@ from alembic.config import Config from faker import Faker from faker.providers import BaseProvider from orchestrator import app_settings -from orchestrator.db import Database, db +from orchestrator.db import ( + Database, + ProductBlockTable, + ProductTable, + ResourceTypeTable, + SubscriptionMetadataTable, + WorkflowTable, + db, +) from orchestrator.db.database import ENGINE_ARGUMENTS, SESSION_ARGUMENTS, BaseModel -from orchestrator.types import strEnum -from sqlalchemy import create_engine, text +from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY, SubscriptionModel +from orchestrator.domain.base import ProductBlockModel +from orchestrator.types import SubscriptionLifecycle, strEnum +from sqlalchemy import create_engine, select, text from sqlalchemy.engine import make_url from sqlalchemy.orm import scoped_session, sessionmaker from starlette.testclient import TestClient @@ -25,6 +36,16 @@ from gso.main import init_gso_app from gso.schema.partner import PartnerCreate from gso.services.partners import create_partner from gso.utils.helpers import LAGMember +from test.fixtures import ( # noqa: F401 + iptrunk_side_subscription_factory, + iptrunk_subscription_factory, + juniper_router_subscription_factory, + nokia_router_subscription_factory, + office_router_subscription_factory, + site_subscription_factory, + super_pop_switch_subscription_factory, + test_workflow, +) logging.getLogger("faker.factory").setLevel(logging.WARNING) @@ -247,3 +268,250 @@ def test_client(fastapi_app): @pytest.fixture(scope="session") def geant_partner(): return create_partner(PartnerCreate(name="GEANT-TEST", partner_type=PartnerType.GEANT, email="goat-test@geant.org")) + + +@pytest.fixture() +def generic_resource_type_1(): + rt = ResourceTypeTable(description="Resource Type one", resource_type="rt_1") + db.session.add(rt) + db.session.commit() + + return rt + + +@pytest.fixture() +def generic_resource_type_2(): + rt = ResourceTypeTable(description="Resource Type two", resource_type="rt_2") + db.session.add(rt) + db.session.commit() + return rt + + +@pytest.fixture() +def generic_resource_type_3(): + rt = ResourceTypeTable(description="Resource Type three", resource_type="rt_3") + db.session.add(rt) + db.session.commit() + + return rt + + +@pytest.fixture() +def generic_product_block_1(generic_resource_type_1): + pb = ProductBlockTable( + name="PB_1", + description="Generic Product Block 1", + tag="PB1", + status="active", + resource_types=[generic_resource_type_1], + created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"), + ) + db.session.add(pb) + db.session.commit() + return pb + + +@pytest.fixture() +def generic_product_block_2(generic_resource_type_2, generic_resource_type_3): + pb = ProductBlockTable( + name="PB_2", + description="Generic Product Block 2", + tag="PB2", + status="active", + resource_types=[generic_resource_type_2, generic_resource_type_3], + created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"), + ) + db.session.add(pb) + db.session.commit() + return pb + + +@pytest.fixture() +def generic_product_block_3(generic_resource_type_2): + pb = ProductBlockTable( + name="PB_3", + description="Generic Product Block 3", + tag="PB3", + status="active", + resource_types=[generic_resource_type_2], + created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"), + ) + db.session.add(pb) + db.session.commit() + return pb + + +@pytest.fixture() +def generic_product_1(generic_product_block_1, generic_product_block_2): + workflow = db.session.scalar(select(WorkflowTable).where(WorkflowTable.name == "modify_note")) + p = ProductTable( + name="Product 1", + description="Generic Product One", + product_type="Generic", + status="active", + tag="GEN1", + product_blocks=[generic_product_block_1, generic_product_block_2], + workflows=[workflow], + ) + db.session.add(p) + db.session.commit() + return p + + +@pytest.fixture() +def generic_product_2(generic_product_block_3): + workflow = db.session.scalar(select(WorkflowTable).where(WorkflowTable.name == "modify_note")) + + p = ProductTable( + name="Product 2", + description="Generic Product Two", + product_type="Generic", + status="active", + tag="GEN2", + product_blocks=[generic_product_block_3], + workflows=[workflow], + ) + db.session.add(p) + db.session.commit() + return p + + +@pytest.fixture() +def generic_product_3(generic_product_block_2): + p = ProductTable( + name="Product 3", + description="Generic Product Three", + product_type="Generic", + status="active", + tag="GEN3", + product_blocks=[generic_product_block_2], + ) + db.session.add(p) + db.session.commit() + return p + + +@pytest.fixture() +def generic_product_block_type_1(generic_product_block_1): + class GenericProductBlockOneInactive(ProductBlockModel, product_block_name="PB_1"): + rt_1: str | None = None + + class GenericProductBlockOne(GenericProductBlockOneInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + rt_1: str + + return GenericProductBlockOneInactive, GenericProductBlockOne + + +@pytest.fixture() +def generic_product_block_type_2(generic_product_block_2): + class GenericProductBlockTwoInactive(ProductBlockModel, product_block_name="PB_2"): + rt_2: int | None = None + rt_3: str | None = None + + class GenericProductBlockTwo(GenericProductBlockTwoInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + rt_2: int + rt_3: str + + return GenericProductBlockTwoInactive, GenericProductBlockTwo + + +@pytest.fixture() +def generic_product_block_type_3(generic_product_block_3): + class GenericProductBlockThreeInactive(ProductBlockModel, product_block_name="PB_3"): + rt_2: int | None = None + + class GenericProductBlockThree(GenericProductBlockThreeInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + rt_2: int + + return GenericProductBlockThreeInactive, GenericProductBlockThree + + +@pytest.fixture() +def generic_product_type_1(generic_product_1, generic_product_block_type_1, generic_product_block_type_2): + generic_product_block_one_inactive, generic_product_block_one = generic_product_block_type_1 + generic_product_block_two_inactive, generic_product_block_two = generic_product_block_type_2 + + # Test Product domain models + + class GenericProductOneInactive(SubscriptionModel, is_base=True): + pb_1: generic_product_block_one_inactive + pb_2: generic_product_block_two_inactive + + class GenericProductOne(GenericProductOneInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + pb_1: generic_product_block_one + pb_2: generic_product_block_two + + SUBSCRIPTION_MODEL_REGISTRY["Product 1"] = GenericProductOne + + yield GenericProductOneInactive, GenericProductOne + + del SUBSCRIPTION_MODEL_REGISTRY["Product 1"] + + +@pytest.fixture() +def generic_product_type_2(generic_product_2, generic_product_block_type_3): + generic_product_block_three_inactive, generic_product_block_three = generic_product_block_type_3 + + class GenericProductTwoInactive(SubscriptionModel, is_base=True): + pb_3: generic_product_block_three_inactive + + class GenericProductTwo(GenericProductTwoInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): + pb_3: generic_product_block_three + + SUBSCRIPTION_MODEL_REGISTRY["Product 2"] = GenericProductTwo + + yield GenericProductTwoInactive, GenericProductTwo + + del SUBSCRIPTION_MODEL_REGISTRY["Product 2"] + + +@pytest.fixture() +def product_type_1_subscription_factory(generic_product_1, generic_product_type_1, geant_partner): + def subscription_create( + description="Generic Subscription One", + start_date="2023-05-24T00:00:00+00:00", + rt_1="Value1", + rt_2=42, + rt_3="Value2", + ): + generic_product_one_inactive, _ = generic_product_type_1 + gen_subscription = generic_product_one_inactive.from_product_id( + generic_product_1.product_id, customer_id=geant_partner["partner_id"], insync=True + ) + gen_subscription.pb_1.rt_1 = rt_1 + gen_subscription.pb_2.rt_2 = rt_2 + gen_subscription.pb_2.rt_3 = rt_3 + gen_subscription = SubscriptionModel.from_other_lifecycle(gen_subscription, SubscriptionLifecycle.ACTIVE) + gen_subscription.description = description + gen_subscription.start_date = start_date + gen_subscription.save() + + gen_subscription_metadata = SubscriptionMetadataTable() + gen_subscription_metadata.subscription_id = gen_subscription.subscription_id + gen_subscription_metadata.metadata_ = {"description": "Some metadata description"} + db.session.add(gen_subscription_metadata) + db.session.commit() + return str(gen_subscription.subscription_id) + + return subscription_create + + +@pytest.fixture() +def product_type_1_subscriptions_factory(product_type_1_subscription_factory): + def subscriptions_create(amount=1): + return [ + product_type_1_subscription_factory( + description=f"Subscription {i}", + start_date=( + datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00") + datetime.timedelta(days=i) + ).replace(tzinfo=datetime.UTC), + ) + for i in range(amount) + ] + + return subscriptions_create + + +@pytest.fixture() +def generic_subscription_1(product_type_1_subscription_factory): + return product_type_1_subscription_factory() diff --git a/test/fixtures.py b/test/fixtures.py index 2a7eab3dea34e4625beba4816741154db2d4f2a3..96107fd646f0da31232c0460cb72d868c4e1a86a 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -1,9 +1,18 @@ import ipaddress +from collections.abc import Generator +from typing import Any +from uuid import uuid4 import pytest +from orchestrator import step, workflow +from orchestrator.config.assignee import Assignee from orchestrator.db import db from orchestrator.domain import SubscriptionModel from orchestrator.types import SubscriptionLifecycle, UUIDstr +from orchestrator.workflow import done, init, inputstep +from pydantic_forms.core import FormPage +from pydantic_forms.types import FormGenerator +from pydantic_forms.validators import Choice from gso.products import ProductName from gso.products.product_blocks.iptrunk import ( @@ -21,6 +30,7 @@ from gso.products.product_types.site import Site, SiteInactive from gso.products.product_types.super_pop_switch import SuperPopSwitchInactive from gso.services import subscriptions from gso.utils.shared_enums import Vendor +from test.workflows import WorkflowInstanceForTests @pytest.fixture() @@ -395,3 +405,40 @@ def super_pop_switch_subscription_factory(site_subscription_factory, faker, gean return str(super_pop_switch_subscription.subscription_id) return subscription_create + + +@pytest.fixture() +def test_workflow(generic_subscription_1: UUIDstr, generic_product_type_1) -> Generator: + _, generic_product_one = generic_product_type_1 + + @step("Insert UUID in state") + def insert_object(): + return {"subscription_id": str(uuid4()), "model": generic_product_one.from_subscription(generic_subscription_1)} + + @step("Test that it is a string now") + def check_object(subscription_id: Any, model: dict) -> None: + # This is actually a test. It would be nicer to have this in a proper test but that takes to much setup that + # already happens here. So we hijack this fixture and run this test for all tests that use this fixture + # (which should not be an issue) + assert isinstance(subscription_id, str) + assert isinstance(model, dict) + + @inputstep("Modify", assignee=Assignee.CHANGES) + def modify(subscription_id: UUIDstr) -> FormGenerator: + class TestChoice(Choice): + A = "A" + B = "B" + C = "C" + + class TestForm(FormPage): + generic_select: TestChoice + + user_input = yield TestForm + return user_input.model_dump() + + @workflow("Workflow") + def workflow_for_testing_processes_py(): + return init >> insert_object >> check_object >> modify >> done + + with WorkflowInstanceForTests(workflow_for_testing_processes_py, "workflow_for_testing_processes_py") as wf: + yield wf diff --git a/test/schedules/test_scheduling.py b/test/schedules/test_scheduling.py index 5ed2ad01e14a00e9e0785e9ee9a31518325f4bea..82168eb4375f2bdb50ed2c4de34fe9e0f65cd8cb 100644 --- a/test/schedules/test_scheduling.py +++ b/test/schedules/test_scheduling.py @@ -8,7 +8,7 @@ from gso.schedules.scheduling import scheduler @pytest.fixture(scope="module") def validate_subscriptions(): - from gso.schedules.validate_subscriptions import validate_subscriptions as vs # noqa: PLC0415 + from gso.schedules.validate_subscriptions import validate_subscriptions as vs return vs diff --git a/test/schemas/test_types.py b/test/schemas/test_types.py index 2e90123f3d96f3c0e5c86294780ba4539a9660c1..6f43bb10bb87d8e8b634a4249eba768d5f5af246 100644 --- a/test/schemas/test_types.py +++ b/test/schemas/test_types.py @@ -1,53 +1,62 @@ import pytest +from pydantic import BaseModel, ValidationError from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate +class LatitudeModel(BaseModel): + latitude: LatitudeCoordinate + + +class LongitudeModel(BaseModel): + longitude: LongitudeCoordinate + + @pytest.mark.parametrize( ("input_value", "is_valid"), [ - ("40.7128", True), - ("-74.0060", True), - ("90", True), - ("-90", True), - ("0", True), - ("45.6", True), - ("91", False), - ("-91", False), - ("180", False), - ("-180", False), + (40.7128, True), + (-74.0060, True), + (90, True), + (-90, True), + (0, True), + (45.6, True), + (91, False), + (-91, False), + (180, False), + (-180, False), ("abc", False), - ("90.1", False), + (90.1, False), ], ) def test_latitude(input_value, is_valid): if is_valid: - assert LatitudeCoordinate.validate(input_value) == input_value + assert LatitudeModel(latitude=input_value).latitude == input_value else: - with pytest.raises(ValueError, match="Invalid latitude coordinate"): - LatitudeCoordinate.validate(input_value) + with pytest.raises(ValidationError): + LatitudeModel(latitude=input_value) @pytest.mark.parametrize( ("input_value", "is_valid"), [ - ("40.7128", True), - ("-74.0060", True), - ("180", True), - ("-180", True), - ("0", True), - ("90.1", True), - ("181", False), - ("-181", False), - ("200", False), - ("-200", False), + (40.7128, True), + (-74.0060, True), + (180, True), + (-180, True), + (0, True), + (90.1, True), + (181, False), + (-181, False), + (200, False), + (-200, False), ("abc", False), ("90a", False), ], ) def test_longitude(input_value, is_valid): if is_valid: - assert LongitudeCoordinate.validate(input_value) == input_value + assert LongitudeModel(longitude=input_value).longitude == input_value else: - with pytest.raises(ValueError, match="Invalid longitude coordinate"): - LongitudeCoordinate.validate(input_value) + with pytest.raises(ValidationError): + LongitudeModel(longitude=input_value) diff --git a/test/services/test_librenms_client.py b/test/services/test_librenms_client.py index 98afda75adda5b4b497cfe9189339d598e3a9abd..d07da93a1be847be29c267c4c2188b5533a0af65 100644 --- a/test/services/test_librenms_client.py +++ b/test/services/test_librenms_client.py @@ -168,9 +168,12 @@ def mock_get_device_misconfigured(faker): @pytest.fixture() def mock_get_device_unauthenticated(): - with patch("gso.services.librenms_client.requests.get") as mock_get_unauthorized, patch( - "gso.services.librenms_client.LibreNMSClient.get_device", - ) as mock_get_device: + with ( + patch("gso.services.librenms_client.requests.get") as mock_get_unauthorized, + patch( + "gso.services.librenms_client.LibreNMSClient.get_device", + ) as mock_get_device, + ): mock_get_unauthorized().status_code = HTTPStatus.UNAUTHORIZED mock_get_unauthorized().json.return_value = {"message": "Unauthenticated."} mock_get_device.side_effect = HTTPError( diff --git a/test/workflows/__init__.py b/test/workflows/__init__.py index a8144aacac102f807324458c14ad0cd04c69c892..246d2b767b253699ce6b56cea36aeaa9ca156fea 100644 --- a/test/workflows/__init__.py +++ b/test/workflows/__init__.py @@ -2,16 +2,16 @@ import difflib import pprint from collections.abc import Callable from copy import deepcopy -from itertools import chain, repeat from typing import cast from uuid import uuid4 import structlog -from orchestrator.db import ProcessTable +from orchestrator.db import ProcessTable, WorkflowTable, db from orchestrator.services.processes import StateMerger, _db_create_process -from orchestrator.types import FormGenerator, InputForm, State +from orchestrator.types import State from orchestrator.utils.json import json_dumps, json_loads from orchestrator.workflow import Process, ProcessStat, Step, Success, Workflow, runwf +from orchestrator.workflow import Process as WFProcess from orchestrator.workflows import ALL_WORKFLOWS, LazyWorkflowInstance, get_workflow from pydantic_forms.core import post_form @@ -113,10 +113,22 @@ def extract_error(result): return extract_state(result).get("error") +def store_workflow(wf: Workflow, name: str | None = None) -> WorkflowTable: + wf_table = WorkflowTable(name=name or wf.name, target=wf.target, description=wf.description) + db.session.add(wf_table) + db.session.commit() + return wf_table + + +def delete_workflow(wf: WorkflowTable) -> None: + db.session.delete(wf) + db.session.commit() + + class WorkflowInstanceForTests(LazyWorkflowInstance): """Register Test workflows. - Similar to ``LazyWorkflowInstance`` but does not require an import during instantiate + Similar to `LazyWorkflowInstance` but does not require an import during instantiate Used for creating test workflows """ @@ -125,14 +137,19 @@ class WorkflowInstanceForTests(LazyWorkflowInstance): is_callable: bool def __init__(self, workflow: Workflow, name: str) -> None: + super().__init__("orchestrator.test", name) self.workflow = workflow self.name = name def __enter__(self): ALL_WORKFLOWS[self.name] = self + self.workflow_instance = store_workflow(self.workflow, name=self.name) + return self.workflow_instance def __exit__(self, _exc_type, _exc_value, _traceback): del ALL_WORKFLOWS[self.name] + delete_workflow(self.workflow_instance) + del self.workflow_instance def instantiate(self) -> Workflow: """Import and instantiate a workflow and return it. @@ -140,7 +157,9 @@ class WorkflowInstanceForTests(LazyWorkflowInstance): This can be as simple as merely importing a workflow function. However, if it concerns a workflow generating function, that function will be called with or without arguments as specified. - :return Workflow: A workflow function. + Returns: + A workflow function. + """ self.workflow.name = self.name return self.workflow @@ -172,13 +191,23 @@ def _store_step(step_log: list[tuple[Step, Process]]) -> Callable[[ProcessStat, return __store_step -def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[Process, ProcessStat, list]: - # ATTENTION!! This code needs to be as similar as possible to ``server.services.processes.start_process`` +def _sanitize_input(input_data: State | list[State]) -> list[State]: + # To be backwards compatible convert single dict to list + if not isinstance(input_data, list): + input_data = [input_data] + + # We need a copy here and we want to mimic the actual code that returns a serialized version of the state + return cast(list[State], json_loads(json_dumps(input_data))) + + +def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[WFProcess, ProcessStat, list]: + # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.start_process` # The main differences are: we use a different step log function, and we don't run in # a separate thread + user_data = _sanitize_input(input_data) user = "john.doe" - step_log: list[tuple[Step, Process]] = [] + step_log: list[tuple[Step, WFProcess]] = [] process_id = uuid4() workflow = get_workflow(workflow_key) @@ -190,7 +219,7 @@ def run_workflow(workflow_key: str, input_data: State | list[State]) -> tuple[Pr "workflow_target": workflow.target, } - user_input = post_form(workflow.initial_input_form, initial_state, input_data) + user_input = post_form(workflow.initial_input_form, initial_state, user_data) pstat = ProcessStat( process_id, @@ -244,72 +273,6 @@ def resume_workflow( return result, step_log -def run_form_generator( - form_generator: FormGenerator, - extra_inputs: list[State] | None = None, -) -> tuple[list[dict], State]: - """Run a form generator to get the resulting forms and result. - - Warning! This does not run the actual pydantic validation on purpose. However, you should - make sure that anything in extra_inputs matched the values and types as if the pydantic validation has - been run. - - :param FormGenerator form_generator: The form generator that will be run. - :param list[State] | None extra_inputs: list of user input dicts for each page in the generator. - If no input is given for a page, an empty dict is used. - The default value from the form is used as the default value for a field. - - :return tuple[list[dict], State]: A list of generated forms and the result state for the whole generator. - - Example: - ------- - Given the following form generator: - - >>> from pydantic_forms.core import FormPage - >>> def form_generator(state): - ... class TestForm(FormPage): - ... field: str = "foo" - ... user_input = yield TestForm - ... return {**user_input.dict(), "bar": 42} - - You can run this without extra_inputs - >>> forms, result = run_form_generator(form_generator({"state_field": 1})) - >>> forms - [{'title': 'unknown', 'type': 'object', 'properties': { - 'field': {'title': 'Field', 'default': 'foo', 'type': 'string'}}, 'additionalProperties': False}] - >>> result - {'field': 'foo', 'bar': 42} - - - Or with extra_inputs: - >>> forms, result = run_form_generator(form_generator({'state_field': 1}), [{'field':'baz'}]) - >>> forms - [{'title': 'unknown', 'type': 'object', 'properties': { - 'field': {'title': 'Field', 'default': 'foo', 'type': 'string'}}, 'additionalProperties': False}] - >>> result - {'field': 'baz', 'bar': 42} - - """ - forms: list[dict] = [] - result: State = {"s": 3} - if extra_inputs is None: - extra_inputs = [] - - try: - form = cast(InputForm, next(form_generator)) - forms.append(form.schema()) - for extra_input in chain(extra_inputs, repeat(cast(State, {}))): - user_input_data = {field_name: field.default for field_name, field in form.__fields__.items()} - user_input_data.update(extra_input) - user_input = form.construct(**user_input_data) - form = form_generator.send(user_input) - forms.append(form.schema()) - except StopIteration as stop: - result = stop.value - - return forms, result - - def user_accept_and_assert_suspended(process_stat, step_log, extra_data=None): extra_data = extra_data or {} result, step_log = resume_workflow(process_stat, step_log, extra_data) diff --git a/test/workflows/conftest.py b/test/workflows/conftest.py index 0665829aee73ae9cd3b9d1129a2781a98c2e210d..9d298a779f3e4f190e009973caa321658eb2433b 100644 --- a/test/workflows/conftest.py +++ b/test/workflows/conftest.py @@ -1,14 +1,6 @@ import pytest from urllib3_mock import Responses -from test.fixtures import ( # noqa: F401 - iptrunk_side_subscription_factory, - iptrunk_subscription_factory, - juniper_router_subscription_factory, - nokia_router_subscription_factory, - site_subscription_factory, -) - @pytest.fixture(autouse=True) def responses(): diff --git a/test/workflows/iptrunk/test_create_iptrunk.py b/test/workflows/iptrunk/test_create_iptrunk.py index 4163f6835166d368cbda5a007d24f7043bd8e7f4..84039f326f35de5ef08591fb07d686b03bc8d35a 100644 --- a/test/workflows/iptrunk/test_create_iptrunk.py +++ b/test/workflows/iptrunk/test_create_iptrunk.py @@ -6,7 +6,6 @@ import pytest from gso.products import Iptrunk, ProductName from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity from gso.services.subscriptions import get_product_id_by_name -from gso.utils.helpers import LAGMember from gso.utils.shared_enums import Vendor from test import USER_CONFIRM_EMPTY_FORM from test.services.conftest import MockedNetboxClient @@ -56,7 +55,7 @@ def input_form_wizard_data(request, juniper_router_subscription_factory, nokia_r else: router_side_b = nokia_router_subscription_factory() side_b_members = [ - LAGMember(interface_name=f"Interface{interface}", interface_description=faker.sentence()) + {"interface_name": f"Interface{interface}", "interface_description": faker.sentence()} for interface in range(2) ] @@ -74,10 +73,10 @@ def input_form_wizard_data(request, juniper_router_subscription_factory, nokia_r "side_a_ae_iface": "lag-1", "side_a_ae_geant_a_sid": None, "side_a_ae_members": [ - LAGMember( - interface_name=f"Interface{interface}", - interface_description=faker.sentence(), - ) + { + "interface_name": f"Interface{interface}", + "interface_description": faker.sentence(), + } for interface in range(2) ], } @@ -134,12 +133,10 @@ def test_successful_iptrunk_creation_with_standard_lso_result( subscription_id = state["subscription_id"] subscription = Iptrunk.from_subscription(subscription_id) - sorted_sides = sorted( - [ - subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_site.site_name, - subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_site.site_name, - ] - ) + sorted_sides = sorted([ + subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_site.site_name, + subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_site.site_name, + ]) assert subscription.status == "provisioning" assert subscription.description == ( f"IP trunk {sorted_sides[0]} {sorted_sides[1]}, geant_s_sid:{input_form_wizard_data[0]['geant_s_sid']}" diff --git a/test/workflows/iptrunk/test_migrate_iptrunk.py b/test/workflows/iptrunk/test_migrate_iptrunk.py index 5640cd646b75083f44d5bfbe37e21d1bfa9115a9..cd46d72100d3432cc868cdd8bb6cb710d0e74c44 100644 --- a/test/workflows/iptrunk/test_migrate_iptrunk.py +++ b/test/workflows/iptrunk/test_migrate_iptrunk.py @@ -29,6 +29,8 @@ def migrate_form_input( iptrunk_side_subscription_factory, ): use_juniper = getattr(request, "param", UseJuniperSide.NONE) + new_side_ae_members_nokia = faker.link_members_nokia()[0:2] + new_side_ae_members_juniper = faker.link_members_juniper()[0:2] if use_juniper == UseJuniperSide.SIDE_A: # Nokia -> Juniper @@ -36,7 +38,7 @@ def migrate_form_input( old_subscription = Iptrunk.from_subscription(product_id) new_router = juniper_router_subscription_factory() replace_side = str(old_subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id) - new_side_ae_members = faker.link_members_juniper()[0:2] + new_side_ae_members = new_side_ae_members_juniper lag_name = "ae1" elif use_juniper == UseJuniperSide.SIDE_B: # Juniper -> Nokia @@ -48,7 +50,7 @@ def migrate_form_input( old_subscription = Iptrunk.from_subscription(product_id) new_router = nokia_router_subscription_factory() replace_side = str(old_subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id) - new_side_ae_members = faker.link_members_nokia()[0:2] + new_side_ae_members = new_side_ae_members_nokia lag_name = "lag-1" elif use_juniper == UseJuniperSide.SIDE_BOTH: # Juniper -> Juniper @@ -60,7 +62,7 @@ def migrate_form_input( old_subscription = Iptrunk.from_subscription(product_id) new_router = juniper_router_subscription_factory() replace_side = str(old_subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id) - new_side_ae_members = faker.link_members_juniper()[0:2] + new_side_ae_members = new_side_ae_members_juniper lag_name = "ae1" else: # Nokia -> Nokia @@ -68,7 +70,7 @@ def migrate_form_input( old_subscription = Iptrunk.from_subscription(product_id) new_router = nokia_router_subscription_factory() replace_side = str(old_subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.subscription.subscription_id) - new_side_ae_members = faker.link_members_nokia()[0:2] + new_side_ae_members = new_side_ae_members_nokia lag_name = "lag-1" return [ diff --git a/test/workflows/site/test_create_site.py b/test/workflows/site/test_create_site.py index e31576152634045e9efe57b864a51785495a41d1..f6c196da217320506521e9d993b7e7609f8e05db 100644 --- a/test/workflows/site/test_create_site.py +++ b/test/workflows/site/test_create_site.py @@ -4,7 +4,6 @@ from pydantic_forms.exceptions import FormValidationError from gso.products import ProductName from gso.products.product_blocks.site import SiteTier from gso.products.product_types.site import Site -from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_product_id_by_name from test.workflows import assert_complete, extract_state, run_workflow @@ -65,7 +64,7 @@ def test_site_name_is_incorrect(responses, faker): "site_internal_id": faker.pyint(), "site_tier": SiteTier.TIER1, "site_ts_address": faker.ipv4(), - "partner": get_partner_by_name("GEANT")["partner_id"], + "partner": "GEANT", }, ]