diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 61d37c99aa91a89f8c195261fd647924c051007e..6cc76019e6555e40b0c8a4aca3db42defbad859c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -12,7 +12,7 @@ run-tox-pipeline: stage: tox tags: - docker-executor - image: python:3.11 + image: python:3.12 services: - postgres:15.4 diff --git a/Dockerfile b/Dockerfile index 5be8cb5440ee78244ae72f0d8df2073660e576f6..b2802dd07a91be30f25831fc71f97361e2ac46d1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.11-alpine +FROM python:3.12-alpine WORKDIR /app ARG ARTIFACT_VERSION diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index a7cc2d9058a220cfd6db4e43bb3a1cb2a4ad9181..d82a04c1816b3ebb75e3f05213d5163df3ad6fd3 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -4,15 +4,15 @@ Quickstart Development environment and dependencies ---------------------------------------- -- Install python 3.11 if you do not have it already: +- Install python 3.12 if you do not have it already: - ``add-apt-repository ppa:deadsnakes/ppa`` - - ``apt install python3.11 python3.11-distutils`` + - ``apt install python3.12 python3.12-distutils`` - Follow Steps 1 and 2 from here to install dependencies and setup DB: `<https://workfloworchestrator.org/orchestrator-core/workshops/beginner/debian/>`_ - To install the orchestrator GUI, you can follow the steps 5 and 6 from the previous link. - Create a virtual environment: - ``source /usr/share/virtualenvwrapper/virtualenvwrapper.sh`` - - ``mkvirtualenv --python python3.11 gso`` + - ``mkvirtualenv --python python3.12 gso`` - To use the virtual environment: - ``source /usr/share/virtualenvwrapper/virtualenvwrapper.sh`` - ``workon gso`` @@ -25,7 +25,7 @@ Do all this inside the virtual environment. - Clone this repository - ``pip install -r requirements.txt`` - If you get an error because you pip version is too old, run this: - ``curl -sS https://bootstrap.pypa.io/get-pip.py | python3.11`` + ``curl -sS https://bootstrap.pypa.io/get-pip.py | python3.12`` - ``pip install -e .`` - Create an ``oss-params.json`` based on the ``oss-params-example.json`` file inside ``/gso``. - Export the oss-params file: ``export OSS_PARAMS_FILENAME="/path/to/oss-params.json"`` diff --git a/gso/api/v1/imports.py b/gso/api/v1/imports.py index 6a2faf96b889a6bd6522227640121d1a44013992..de08184fb0d3a6cfbc29ff6228fc5b921593f6b8 100644 --- a/gso/api/v1/imports.py +++ b/gso/api/v1/imports.py @@ -15,7 +15,7 @@ from gso.products.product_blocks.router import RouterRole from gso.services import subscriptions from gso.services.partners import PartnerNotFoundError, get_partner_by_name from gso.utils.helpers import BaseSiteValidatorModel, LAGMember -from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor +from gso.utils.shared_enums import IPV4AddressType, IPV6AddressType, PortNumber, Vendor router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)]) @@ -42,8 +42,8 @@ class RouterImportModel(BaseModel): ts_port: int router_vendor: Vendor router_role: RouterRole - router_lo_ipv4_address: FancyIPV4Address - router_lo_ipv6_address: FancyIPV6Address + router_lo_ipv4_address: IPV4AddressType + router_lo_ipv6_address: IPV6AddressType router_lo_iso_address: str @@ -129,7 +129,7 @@ class SuperPopSwitchImportModel(BaseModel): super_pop_switch_site: str hostname: str super_pop_switch_ts_port: PortNumber - super_pop_switch_mgmt_ipv4_address: FancyIPV4Address + super_pop_switch_mgmt_ipv4_address: IPV4AddressType class OfficeRouterImportModel(BaseModel): @@ -139,8 +139,8 @@ class OfficeRouterImportModel(BaseModel): office_router_site: str office_router_fqdn: str office_router_ts_port: PortNumber - office_router_lo_ipv4_address: FancyIPV4Address - office_router_lo_ipv6_address: FancyIPV6Address + office_router_lo_ipv4_address: IPV4AddressType + office_router_lo_ipv6_address: IPV6AddressType def _start_process(process_name: str, data: dict) -> UUID: diff --git a/gso/api/v1/subscriptions.py b/gso/api/v1/subscriptions.py index e2221779b66786429bb4db61f2c94bfee53e8371..bf4e96bfb6c0b2932d183d1c2707f2a18bf5a608 100644 --- a/gso/api/v1/subscriptions.py +++ b/gso/api/v1/subscriptions.py @@ -1,4 +1,5 @@ """:term:`API` endpoint for fetching different types of subscriptions.""" + from typing import Any from fastapi import Depends, Response, status @@ -19,17 +20,6 @@ router = APIRouter( ) -# class MySubscriptionDomainModelSchema(SubscriptionDomainModelSchema): -# model_config = ConfigDict( -# extra="allow", -# json_encoders={ -# # datetime: lambda dt: dt.timestamp(), -# ipaddress.IPv4Address: lambda v: 1/0, -# ipaddress.IPv6Address: lambda v: str(v), -# } -# ) - - @router.get( "/routers", status_code=status.HTTP_200_OK, diff --git a/gso/auth/oidc_policy_helper.py b/gso/auth/oidc_policy_helper.py index 1ba2eb3b8cdb3db63babe30beabf2c6186e3ae3c..96edfbf3e08b21951a320dc40dfb6e1c8012a10e 100644 --- a/gso/auth/oidc_policy_helper.py +++ b/gso/auth/oidc_policy_helper.py @@ -252,7 +252,7 @@ class OIDCUser(HTTPBearer): return response = await async_request.get(self.openid_url + "/.well-known/openid-configuration") - self.openid_config = OIDCConfig.parse_obj(response.json()) + self.openid_config = OIDCConfig.model_validate(response.json()) async def userinfo(self, async_request: AsyncClient, token: str) -> OIDCUserModel: """Get the userinfo from the openid server. diff --git a/gso/auth/opa.py b/gso/auth/opa.py deleted file mode 100644 index 28c0cad8feff8d74207cda86078fdea0318b46ff..0000000000000000000000000000000000000000 --- a/gso/auth/opa.py +++ /dev/null @@ -1,44 +0,0 @@ -from http import HTTPStatus - -from fastapi.exceptions import HTTPException -from fastapi.params import Depends -from httpx import AsyncClient, NetworkError -from oauth2_lib.fastapi import OIDCUserModel, OPAAuthorization, OPAResult -from oauth2_lib.settings import oauth2lib_settings -from starlette.requests import Request -from structlog import get_logger - -from gso.auth.oidc import oidc_instance - -logger = get_logger(__name__) - - -class OPAAuthorization(OPAAuthorization): - _instance = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super(OPAAuthorization, cls).__new__(cls) - return cls._instance - - async def authorize( - self, request: Request, user_info: OIDCUserModel = Depends(oidc_instance.authenticate) - ) -> bool | None: - return await super().authorize(request, user_info) - - async def get_decision(self, async_request: AsyncClient, opa_input: dict) -> OPAResult: - logger.debug("Posting input json to Policy agent", opa_url=self.opa_url, input=opa_input) - try: - result = await async_request.post(self.opa_url, json=opa_input) - except (NetworkError, TypeError) as exc: - logger.debug("Could not get decision from policy agent", error=str(exc)) - raise HTTPException(status_code=HTTPStatus.SERVICE_UNAVAILABLE, detail="Policy agent is unavailable") - - json_result = result.json() - logger.debug("Received decision from policy agent", decision=json_result) - return OPAResult(decision_id=json_result["decision_id"], result=json_result["result"]["allow"]) - - -opa_instance = OPAAuthorization( - opa_url=oauth2lib_settings.OPA_URL, -) diff --git a/gso/middlewares.py b/gso/middlewares.py index 58106502b70a794cde29cfb714ce61101d056dbb..5ffca88ef1caf559eafd10a9a3d6700767929661 100644 --- a/gso/middlewares.py +++ b/gso/middlewares.py @@ -93,9 +93,9 @@ class ModifyProcessEndpointResponse(BaseHTTPMiddleware): if callback_result and isinstance(callback_result, str): callback_result = json.loads(callback_result) if callback_result.get("output") and len(callback_result["output"]) > max_output_length: - callback_result[ - "output" - ] = f'{request.base_url}api/v1/processes/steps/{step["step_id"]}/callback-results{token}' + callback_result["output"] = ( + f'{request.base_url}api/v1/processes/steps/{step["step_id"]}/callback-results{token}' + ) step["state"]["callback_result"] = callback_result except (AttributeError, KeyError, TypeError): pass diff --git a/gso/migrations/versions/2024-04-02_1ec810b289c0_add_orchestrator_2_1_2_migrations.py b/gso/migrations/versions/2024-04-02_1ec810b289c0_add_orchestrator_2_1_2_migrations.py index aa9593a8ba0279329a6361900a1965b2eddc365c..c9842e731cab00e90653b8ca5bb32185a3d0b025 100644 --- a/gso/migrations/versions/2024-04-02_1ec810b289c0_add_orchestrator_2_1_2_migrations.py +++ b/gso/migrations/versions/2024-04-02_1ec810b289c0_add_orchestrator_2_1_2_migrations.py @@ -5,6 +5,8 @@ Revises: Create Date: 2024-04-02 10:21:08.539591 """ +from alembic import op +from orchestrator.migrations.helpers import create_workflow, delete_workflow # revision identifiers, used by Alembic. revision = '1ec810b289c0' @@ -13,11 +15,47 @@ branch_labels = None # TODO: check it carefuly depends_on = '048219045729' # in this revision, SURF has added a new columns to the workflow table like delted_at, so we need to add a dependency on the revision that added the columns to the workflow table. +new_workflows = [ + { + "name": "import_site", + "target": "SYSTEM", + "description": "Import a site without provisioning it.", + "product_type": "Site" + }, + { + "name": "import_router", + "target": "SYSTEM", + "description": "Import a router without provisioning it.", + "product_type": "Router" + }, + { + "name": "import_iptrunk", + "target": "SYSTEM", + "description": "Import an IP trunk without provisioning it.", + "product_type": "Iptrunk" + }, + { + "name": "import_super_pop_switch", + "target": "SYSTEM", + "description": "Import a Super PoP switch without provisioning it.", + "product_type": "SuperPopSwitch" + }, + { + "name": "import_office_router", + "target": "SYSTEM", + "description": "Import an office router without provisioning it.", + "product_type": "OfficeRouter" + }, +] + def upgrade() -> None: - pass + conn = op.get_bind() + for workflow in new_workflows: + create_workflow(conn, workflow) def downgrade() -> None: - pass - + conn = op.get_bind() + for workflow in new_workflows: + delete_workflow(conn, workflow["name"]) diff --git a/gso/monkeypatches.py b/gso/monkeypatches.py index 7c929d9f0b7e3b58a968373b6d51daab36feb36e..b5c73b558b8eed81297c36f02909e5f316ceff5f 100644 --- a/gso/monkeypatches.py +++ b/gso/monkeypatches.py @@ -3,6 +3,7 @@ This adjustment is typically done to extend or modify the functionality of the original oauth2_lib package to meet specific requirements of the gso application. """ + from datetime import datetime from ipaddress import IPv4Address, IPv6Address @@ -18,9 +19,3 @@ oauth2_lib.fastapi.OIDCUserModel = OIDCUserModel # type: ignore[assignment, mis oauth2_lib.fastapi.opa_decision = opa_decision # type: ignore[assignment] oauth2_lib.fastapi.HTTPX_SSL_CONTEXT = HTTPX_SSL_CONTEXT oauth2_lib.settings.oauth2lib_settings = oauth2lib_settings # type: ignore[assignment] - -BaseModel.model_config["json_encoders"] = { - datetime: lambda dt: dt.timestamp(), - IPv4Address: lambda v: str(v), - IPv6Address: lambda v: str(v), -} diff --git a/gso/products/product_blocks/iptrunk.py b/gso/products/product_blocks/iptrunk.py index 1c4cab17fd128f6f63ecddaca18160e4c55ebfc3..de68b83ba5feb8dd33c06dfa791ab07a6ff3ccb8 100644 --- a/gso/products/product_blocks/iptrunk.py +++ b/gso/products/product_blocks/iptrunk.py @@ -1,7 +1,7 @@ """IP trunk product block that has all parameters of a subscription throughout its lifecycle.""" import ipaddress -from typing import Annotated +from typing import Annotated, Sequence, TypeVar, Generic from annotated_types import Len from orchestrator.domain.base import ProductBlockModel, T @@ -35,7 +35,7 @@ class IptrunkType(strEnum): LEASED = "Leased" -LAGMemberList = Annotated[list[T], AfterValidator(validate_unique_list), Len(min_length=0, max_length=None)] +LAGMemberList = Annotated[list[T], AfterValidator(validate_unique_list), Len(min_length=0)] IptrunkSides = Annotated[list[T], AfterValidator(validate_unique_list), Len(min_length=2, max_length=2)] @@ -83,7 +83,7 @@ class IptrunkSideBlockProvisioning(IptrunkSideBlockInactive, lifecycle=[Subscrip iptrunk_side_node: RouterBlockProvisioning iptrunk_side_ae_iface: str | None = None iptrunk_side_ae_geant_a_sid: str | None = None - iptrunk_side_ae_members: LAGMemberList[IptrunkInterfaceBlockProvisioning] + iptrunk_side_ae_members: LAGMemberList[IptrunkInterfaceBlockProvisioning] # type: ignore[assignment] class IptrunkSideBlock(IptrunkSideBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): @@ -92,7 +92,7 @@ class IptrunkSideBlock(IptrunkSideBlockProvisioning, lifecycle=[SubscriptionLife iptrunk_side_node: RouterBlock iptrunk_side_ae_iface: str | None = None iptrunk_side_ae_geant_a_sid: str | None = None - iptrunk_side_ae_members: LAGMemberList[IptrunkInterfaceBlock] + iptrunk_side_ae_members: LAGMemberList[IptrunkInterfaceBlock] # type: ignore[assignment] class IptrunkBlockInactive( @@ -124,7 +124,7 @@ class IptrunkBlockProvisioning(IptrunkBlockInactive, lifecycle=[SubscriptionLife iptrunk_isis_metric: int | None = None iptrunk_ipv4_network: ipaddress.IPv4Network | None = None iptrunk_ipv6_network: ipaddress.IPv6Network | None = None - iptrunk_sides: IptrunkSides[IptrunkSideBlockProvisioning] + iptrunk_sides: IptrunkSides[IptrunkSideBlockProvisioning] # type: ignore[assignment] class IptrunkBlock(IptrunkBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): @@ -147,4 +147,4 @@ class IptrunkBlock(IptrunkBlockProvisioning, lifecycle=[SubscriptionLifecycle.AC #: The IPv6 network used for this trunk. iptrunk_ipv6_network: ipaddress.IPv6Network #: The two sides that the trunk is connected to. - iptrunk_sides: IptrunkSides[IptrunkSideBlock] + iptrunk_sides: IptrunkSides[IptrunkSideBlock] # type: ignore[assignment] diff --git a/gso/products/product_blocks/office_router.py b/gso/products/product_blocks/office_router.py index c94bc9de79e67c2801122b4d76d7b12befbd6dbe..d5bcf1a3b4644a869fb1383a6ad1fd46dad8a5b3 100644 --- a/gso/products/product_blocks/office_router.py +++ b/gso/products/product_blocks/office_router.py @@ -1,6 +1,5 @@ """Product block for :class:`office router` products.""" - from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle @@ -9,7 +8,7 @@ from gso.products.product_blocks.site import ( SiteBlockInactive, SiteBlockProvisioning, ) -from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor +from gso.utils.shared_enums import IPV4AddressType, IPV6AddressType, PortNumber, Vendor class OfficeRouterBlockInactive( @@ -21,8 +20,8 @@ class OfficeRouterBlockInactive( office_router_fqdn: str | None = None office_router_ts_port: PortNumber | None = None - office_router_lo_ipv4_address: FancyIPV4Address | None = None - office_router_lo_ipv6_address: FancyIPV6Address | None = None + office_router_lo_ipv4_address: IPV4AddressType | None = None + office_router_lo_ipv6_address: IPV6AddressType | None = None office_router_site: SiteBlockInactive | None vendor: Vendor | None = None @@ -32,8 +31,8 @@ class OfficeRouterBlockProvisioning(OfficeRouterBlockInactive, lifecycle=[Subscr office_router_fqdn: str | None = None office_router_ts_port: PortNumber | None = None - office_router_lo_ipv4_address: FancyIPV4Address | None = None - office_router_lo_ipv6_address: FancyIPV6Address | None = None + office_router_lo_ipv4_address: IPV4AddressType | None = None + office_router_lo_ipv6_address: IPV6AddressType | None = None office_router_site: SiteBlockProvisioning | None vendor: Vendor | None = None @@ -46,9 +45,9 @@ class OfficeRouterBlock(OfficeRouterBlockProvisioning, lifecycle=[SubscriptionLi #: The port of the terminal server that this office router is connected to. Used to offer out of band access. office_router_ts_port: PortNumber #: The IPv4 loopback address of the office router. - office_router_lo_ipv4_address: FancyIPV4Address + office_router_lo_ipv4_address: IPV4AddressType #: The IPv6 loopback address of the office router. - office_router_lo_ipv6_address: FancyIPV6Address + office_router_lo_ipv6_address: IPV6AddressType #: The :class:`Site` that this office router resides in. Both physically and computationally. office_router_site: SiteBlock #: The vendor of an office router. Defaults to Juniper. diff --git a/gso/products/product_blocks/router.py b/gso/products/product_blocks/router.py index 64da1e6f94fa830484c1ee70e20565b6da85b3fa..85e940a015ceae354efb269569dc3fb8b1539abe 100644 --- a/gso/products/product_blocks/router.py +++ b/gso/products/product_blocks/router.py @@ -8,7 +8,7 @@ from gso.products.product_blocks.site import ( SiteBlockInactive, SiteBlockProvisioning, ) -from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor +from gso.utils.shared_enums import IPV4AddressType, IPV6AddressType, PortNumber, Vendor class RouterRole(strEnum): @@ -29,8 +29,8 @@ class RouterBlockInactive( router_fqdn: str | None = None router_ts_port: PortNumber | None = None router_access_via_ts: bool | None = None - router_lo_ipv4_address: FancyIPV4Address | None = None - router_lo_ipv6_address: FancyIPV6Address | None = None + router_lo_ipv4_address: IPV4AddressType | None = None + router_lo_ipv6_address: IPV6AddressType | None = None router_lo_iso_address: str | None = None router_role: RouterRole | None = None router_site: SiteBlockInactive | None @@ -43,8 +43,8 @@ class RouterBlockProvisioning(RouterBlockInactive, lifecycle=[SubscriptionLifecy router_fqdn: str router_ts_port: PortNumber router_access_via_ts: bool - router_lo_ipv4_address: FancyIPV4Address - router_lo_ipv6_address: FancyIPV6Address + router_lo_ipv4_address: IPV4AddressType + router_lo_ipv6_address: IPV6AddressType router_lo_iso_address: str router_role: RouterRole router_site: SiteBlockProvisioning @@ -61,9 +61,9 @@ class RouterBlock(RouterBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTI #: Whether this router should be accessed through the terminal server, or through its loopback address. router_access_via_ts: bool #: The IPv4 loopback address of the router. - router_lo_ipv4_address: FancyIPV4Address + router_lo_ipv4_address: IPV4AddressType #: The IPv6 loopback address of the router. - router_lo_ipv6_address: FancyIPV6Address + router_lo_ipv6_address: IPV6AddressType #: The :term:`ISO` :term:`NET` of the router, used for :term:`ISIS` support. router_lo_iso_address: str #: The role of the router, which can be any of the values defined in :class:`RouterRole`. diff --git a/gso/products/product_blocks/site.py b/gso/products/product_blocks/site.py index de39478cc8e66d4643edd8509bf4fff44ce068a6..1f108a410fe3a39ae8c81eb653ce8ce39db842d3 100644 --- a/gso/products/product_blocks/site.py +++ b/gso/products/product_blocks/site.py @@ -1,7 +1,7 @@ """The product block that describes a site subscription.""" import re -from typing import Annotated +from typing import Annotated, TypeAlias from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle, strEnum @@ -22,39 +22,51 @@ class SiteTier(strEnum): def validate_latitude(v: float) -> float: + """Validate a latitude coordinate.""" regex = re.compile(r"^-?([1-8]?\d(\.\d+)?|90(\.0+)?)$") if not regex.match(str(v)): - raise ValueError("Invalid latitude coordinate. Valid examples: '40.7128', '-74.0060', '90', '-90', '0'.") + msg = "Invalid latitude coordinate. Valid examples: '40.7128', '-74.0060', '90', '-90', '0'." + raise ValueError(msg) return v def validate_longitude(v: float) -> float: + """Validate a longitude coordinate.""" regex = re.compile(r"^-?(180(\.0+)?|((1[0-7]\d)|([1-9]?\d))(\.\d+)?)$") if not regex.match(str(v)): - raise ValueError("Invalid longitude coordinate. Valid examples: '40.7128', '-74.0060', '180', '-180', '0'.") + msg = "Invalid longitude coordinate. Valid examples: '40.7128', '-74.0060', '180', '-180', '0'." + raise ValueError(msg) return v -LatitudeCoordinate: type[float] = Annotated[ +LatitudeCoordinate = Annotated[ float, Field( ge=-90, le=90, - example="40.7128", - description="A latitude coordinate, modeled as a string. The coordinate must match the format conforming to the latitude range of -90 to +90 degrees. It can be a floating-point number or an integer. Valid examples: 40.7128, -74.0060, 90, -90, 0.", + description=( + "A latitude coordinate, modeled as a string. " + "The coordinate must match the format conforming to the latitude range of -90 to +90 degrees. " + "It can be a floating-point number or an integer. Valid examples: 40.7128, -74.0060, 90, -90, 0." + ), ), - AfterValidator(validate_latitude) + AfterValidator(validate_latitude), ] + LongitudeCoordinate = Annotated[ float, Field( ge=-180, le=180, - example="74.0060", - description="A longitude coordinate, modeled as a string. The coordinate must match the format conforming to the longitude range of -180 to +180 degrees. It can be a floating-point number or an integer. Valid examples: 40.7128, -74.0060, 180, -180, 0.", + description=( + "A longitude coordinate, modeled as a string. " + "The coordinate must match the format conforming to the longitude " + "range of -180 to +180 degrees. It can be a floating-point number or an integer. " + "Valid examples: 40.7128, -74.0060, 180, -180, 0." + ), ), - AfterValidator(validate_longitude) + AfterValidator(validate_longitude), ] diff --git a/gso/products/product_blocks/super_pop_switch.py b/gso/products/product_blocks/super_pop_switch.py index 8dc124ce59c4756931051faad4a2aa9f3318a2a0..1b4adafa3854b68277a6036969889d939548f030 100644 --- a/gso/products/product_blocks/super_pop_switch.py +++ b/gso/products/product_blocks/super_pop_switch.py @@ -1,6 +1,5 @@ """Product block for :class:`Super PoP Switch` products.""" - from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle @@ -9,7 +8,7 @@ from gso.products.product_blocks.site import ( SiteBlockInactive, SiteBlockProvisioning, ) -from gso.utils.shared_enums import FancyIPV4Address, PortNumber, Vendor +from gso.utils.shared_enums import IPV4AddressType, PortNumber, Vendor class SuperPopSwitchBlockInactive( @@ -21,7 +20,7 @@ class SuperPopSwitchBlockInactive( super_pop_switch_fqdn: str | None = None super_pop_switch_ts_port: PortNumber | None = None - super_pop_switch_mgmt_ipv4_address: FancyIPV4Address | None = None + super_pop_switch_mgmt_ipv4_address: IPV4AddressType | None = None super_pop_switch_site: SiteBlockInactive | None vendor: Vendor | None = None @@ -31,7 +30,7 @@ class SuperPopSwitchBlockProvisioning(SuperPopSwitchBlockInactive, lifecycle=[Su super_pop_switch_fqdn: str | None = None super_pop_switch_ts_port: PortNumber | None = None - super_pop_switch_mgmt_ipv4_address: FancyIPV4Address | None = None + super_pop_switch_mgmt_ipv4_address: IPV4AddressType | None = None super_pop_switch_site: SiteBlockProvisioning | None vendor: Vendor | None = None @@ -44,7 +43,7 @@ class SuperPopSwitchBlock(SuperPopSwitchBlockProvisioning, lifecycle=[Subscripti #: The port of the terminal server that this Super PoP switch is connected to. Used to offer out of band access. super_pop_switch_ts_port: PortNumber #: The IPv4 management address of the Super PoP switch. - super_pop_switch_mgmt_ipv4_address: FancyIPV4Address + super_pop_switch_mgmt_ipv4_address: IPV4AddressType #: The :class:`Site` that this Super PoP switch resides in. Both physically and computationally. super_pop_switch_site: SiteBlock #: The vendor of a Super PoP switch. Defaults to Juniper. diff --git a/gso/schema/partner.py b/gso/schema/partner.py index d8aa1b741a38d552ec625bfaa9c81905c2e32ab0..b1c58c2cf91bf544501f6b2e316117b8b83a70c9 100644 --- a/gso/schema/partner.py +++ b/gso/schema/partner.py @@ -14,7 +14,7 @@ class PartnerCreate(BaseModel): partner_id: str = Field(default_factory=lambda: str(uuid4())) name: str email: EmailStr | None = None - as_number: str | None = Field(None, unique=True) + as_number: str | None = None as_set: str | None = None route_set: str | None = None black_listed_as_sets: list[str] | None = None diff --git a/gso/services/infoblox.py b/gso/services/infoblox.py index 234682680484071ba79fa2d411ae980596853cff..1645773fe7063c682a5428a13fa58ed8950ef755 100644 --- a/gso/services/infoblox.py +++ b/gso/services/infoblox.py @@ -10,7 +10,7 @@ from infoblox_client.exceptions import ( ) from gso.settings import IPAMParams, load_oss_params -from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address +from gso.utils.shared_enums import IPV4AddressType, IPV6AddressType logger = getLogger(__name__) NULL_MAC = "00:00:00:00:00:00" @@ -234,8 +234,8 @@ def allocate_host( def create_host_by_ip( hostname: str, - ipv4_address: FancyIPV4Address, - ipv6_address: FancyIPV6Address, + ipv4_address: IPV4AddressType, + ipv6_address: IPV6AddressType, service_type: str, comment: str, ) -> None: @@ -269,7 +269,7 @@ def create_host_by_ip( new_host.update() -def find_host_by_ip(ip_addr: FancyIPV4Address | ipaddress.IPv6Address) -> objects.HostRecord | None: +def find_host_by_ip(ip_addr: IPV4AddressType | ipaddress.IPv6Address) -> objects.HostRecord | None: """Find a host record in Infoblox by its associated IP address. :param ip_addr: The IP address of a host that is searched for. @@ -315,7 +315,7 @@ def find_v6_host_by_fqdn(fqdn: str) -> objects.HostRecordV6: ) -def delete_host_by_ip(ip_addr: FancyIPV4Address | ipaddress.IPv6Address) -> None: +def delete_host_by_ip(ip_addr: IPV4AddressType | ipaddress.IPv6Address) -> None: """Delete a host from Infoblox. Delete a host record in Infoblox, by providing the IP address that is associated with the record. Raises a diff --git a/gso/services/lso_client.py b/gso/services/lso_client.py index fc8eabd1f0d7f4c36af7a267a2a88cc0bb2bfd93..f707cba2e4d251cf1ee7d7cc017c6cc534505edb 100644 --- a/gso/services/lso_client.py +++ b/gso/services/lso_client.py @@ -130,8 +130,8 @@ def _show_results(state: State) -> FormGenerator: if "lso_result_extra_label" in state: extra_label: Label = state["lso_result_extra_label"] - run_status: ReadOnlyField(state["callback_result"]["status"], default_type=str) - run_results: ReadOnlyField(json.dumps(state["callback_result"], indent=4), default_type=LongText) + run_status: ReadOnlyField(state["callback_result"]["status"], default_type=str) # type: ignore[valid-type] + run_results: ReadOnlyField(json.dumps(state["callback_result"], indent=4), default_type=LongText) # type: ignore[valid-type] yield ConfirmRunPage [state.pop(key, None) for key in ["run_results", "lso_result_title", "lso_result_extra_label"]] diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index c34b672236edcc101df94e6e2c429a40cae98dff..a0c71bb413dd430d30cf5171827847bd1f60fd1a 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -16,7 +16,7 @@ from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordi from gso.products.product_types.router import Router from gso.services.netbox_client import NetboxClient from gso.services.subscriptions import get_active_subscriptions_by_field_and_value -from gso.utils.shared_enums import FancyIPV4Address, Vendor +from gso.utils.shared_enums import IPV4AddressType, Vendor class LAGMember(BaseModel): @@ -105,7 +105,7 @@ def get_router_vendor(router_id: UUID) -> Vendor: return Router.from_subscription(router_id).router.vendor -def iso_from_ipv4(ipv4_address: FancyIPV4Address) -> str: +def iso_from_ipv4(ipv4_address: IPV4AddressType) -> str: """Calculate an :term:`ISO` address, based on an IPv4 address. :param IPv4Address ipv4_address: The address that's to be converted @@ -155,12 +155,11 @@ def validate_iptrunk_unique_interface(interfaces: list[LAGMember]) -> list[LAGMe return interfaces -def validate_site_fields_is_unique(field_name: str, value: str | int) -> str | int: +def validate_site_fields_is_unique(field_name: str, value: str | int) -> None: """Validate that a site field is unique.""" if len(get_active_subscriptions_by_field_and_value(field_name, str(value))) > 0: msg = f"{field_name} must be unique" raise ValueError(msg) - return value def validate_ipv4_or_ipv6(value: str) -> str: @@ -186,7 +185,7 @@ def validate_country_code(country_code: str) -> str: return country_code -def validate_site_name(site_name: str) -> str: +def validate_site_name(site_name: str) -> None: """Validate the site name. The site name must consist of three uppercase letters, optionally followed by a single digit. @@ -198,7 +197,6 @@ def validate_site_name(site_name: str) -> str: f"digit (0-9). Received: {site_name}" ) raise ValueError(msg) - return site_name class BaseSiteValidatorModel(BaseModel): @@ -215,7 +213,6 @@ class BaseSiteValidatorModel(BaseModel): site_latitude: LatitudeCoordinate site_longitude: LongitudeCoordinate - @field_validator("site_ts_address") def validate_ts_address(cls, site_ts_address: str) -> str: """Validate that a terminal server address is valid.""" @@ -231,22 +228,26 @@ class BaseSiteValidatorModel(BaseModel): @field_validator("site_ts_address") def site_ts_address_must_be_unique(cls, site_ts_address: str) -> str: """Validate that the internal and :term:`BGP` community IDs are unique.""" - return validate_site_fields_is_unique("site_ts_address", site_ts_address) + validate_site_fields_is_unique("site_ts_address", site_ts_address) + return site_ts_address @field_validator("site_internal_id") def site_internal_id_must_be_unique(cls, site_internal_id: int) -> int: """Validate that the internal and :term:`BGP` community IDs are unique.""" - return validate_site_fields_is_unique("site_internal_id", site_internal_id) + validate_site_fields_is_unique("site_internal_id", site_internal_id) + return site_internal_id @field_validator("site_bgp_community_id") def site_bgp_community_id_must_be_unique(cls, site_bgp_community_id: int) -> int: """Validate that the internal and :term:`BGP` community IDs are unique.""" - return validate_site_fields_is_unique("site_bgp_community_id", site_bgp_community_id) + validate_site_fields_is_unique("site_bgp_community_id", site_bgp_community_id) + return site_bgp_community_id @field_validator("site_name") def site_name_must_be_unique(cls, site_name: str) -> str: """Validate that the internal and :term:`BGP` community IDs are unique.""" - return validate_site_fields_is_unique("site_name", site_name) + validate_site_fields_is_unique("site_name", site_name) + return site_name @field_validator("site_name") def site_name_must_be_valid(cls, site_name: str) -> str: diff --git a/gso/utils/shared_enums.py b/gso/utils/shared_enums.py index 7fe5c1522a83a128b54081eb893086475f111487..86865ba70b9612be6595304d93e23a1006dee941 100644 --- a/gso/utils/shared_enums.py +++ b/gso/utils/shared_enums.py @@ -1,4 +1,5 @@ """Shared choices for the different models.""" + import ipaddress from typing import Annotated @@ -26,12 +27,12 @@ PortNumber = Annotated[ ] -FancyIPV4Address = Annotated[ +IPV4AddressType = Annotated[ ipaddress.IPv4Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always") ] -FancyIPV6Address = Annotated[ - ipaddress.IPv6Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always") +IPV6AddressType = Annotated[ + ipaddress.IPv6Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always") ] @@ -40,4 +41,3 @@ class ConnectionStrategy(strEnum): IN_BAND = "IN BAND" OUT_OF_BAND = "OUT OF BAND" - diff --git a/gso/workflows/iptrunk/activate_iptrunk.py b/gso/workflows/iptrunk/activate_iptrunk.py index f686a8cb7e3c825dceffeb876c644a37342ce3d8..a98a0446c50a4de14b8f55502b633babb7d027af 100644 --- a/gso/workflows/iptrunk/activate_iptrunk.py +++ b/gso/workflows/iptrunk/activate_iptrunk.py @@ -16,7 +16,7 @@ def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator: trunk = Iptrunk.from_subscription(subscription_id) class ActivateTrunkForm(FormPage): - info_label: Label = "Start approval process for IP trunk activation." # type:ignore[assignment] + info_label: Label = "Start approval process for IP trunk activation." user_input = yield ActivateTrunkForm @@ -28,7 +28,7 @@ def verify_complete_checklist() -> FormGenerator: """Show a form for the operator to input a link to the completed checklist.""" class VerifyCompleteForm(FormPage): - info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." # type: ignore[assignment] + info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." checklist_url: str = "" user_input = yield VerifyCompleteForm diff --git a/gso/workflows/iptrunk/create_iptrunk.py b/gso/workflows/iptrunk/create_iptrunk.py index 80aa2c06b760351fdcc91e7645ba332cde5edde3..71f5453bb196c7d79ad67ea1ca0fe5fc97b656f4 100644 --- a/gso/workflows/iptrunk/create_iptrunk.py +++ b/gso/workflows/iptrunk/create_iptrunk.py @@ -1,7 +1,7 @@ """A creation workflow that deploys a new IP trunk service.""" import json -from typing import Annotated +from typing import Annotated, TypeAlias from uuid import uuid4 from annotated_types import Len @@ -57,7 +57,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: model_config = ConfigDict(title=product_name) tt_number: str - partner: ReadOnlyField("GEANT", default_type=str) + partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type] geant_s_sid: str | None iptrunk_description: str iptrunk_type: IptrunkType @@ -104,7 +104,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: ae_members_side_a = Annotated[list[NokiaLAGMemberA], AfterValidator(validate_unique_list), Len(min_length=initial_user_input.iptrunk_number_of_members, max_length=initial_user_input.iptrunk_number_of_members)] else: - ae_members_side_a = JuniperAeMembers # type: ignore[assignment] + ae_members_side_a_type = JuniperAeMembers # type: ignore[assignment] class CreateIptrunkSideAForm(FormPage): model_config = ConfigDict(title=f"Provide subscription details for side A of the trunk.({router_a_fqdn})") @@ -146,9 +146,15 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: initial_user_input.iptrunk_speed, ) - ae_members_side_b = Annotated[list[NokiaLAGMemberB], AfterValidator(validate_unique_list), Len(min_length=len(user_input_side_a.side_a_ae_members), max_length= len(user_input_side_a.side_a_ae_members))] + ae_members_side_b = Annotated[ + list[NokiaLAGMemberB], + AfterValidator(validate_unique_list), + Len( + min_length=len(user_input_side_a.side_a_ae_members), max_length=len(user_input_side_a.side_a_ae_members) + ), + ] else: - ae_members_side_b = JuniperAeMembers # type: ignore[assignment] + ae_members_side_b = JuniperAeMembers class CreateIptrunkSideBForm(FormPage): model_config = ConfigDict(title=f"Provide subscription details for side B of the trunk.({router_b_fqdn})") @@ -440,7 +446,7 @@ def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State: ) nbclient.reserve_interface( device_name=trunk_side.iptrunk_side_node.router_fqdn, - iface_name=interface.interface_name, + iface_name=interface.interface_name, # type: ignore[arg-type] ) return { "subscription": subscription, @@ -479,11 +485,11 @@ def prompt_start_new_checklist(subscription: IptrunkProvisioning) -> FormGenerat model_config = ConfigDict(title="Start new checklist") info_label_1: Label = ( - f"Visit {oss_params.SHAREPOINT.checklist_site_url} and start a new Sharepoint checklist for an IPtrunk " # type: ignore[assignment] + f"Visit {oss_params.SHAREPOINT.checklist_site_url} and start a new Sharepoint checklist for an IPtrunk " f"from {subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to " f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}." ) - info_label_2: Label = "Once this is done, click proceed to finish the workflow." # type: ignore[assignment] + info_label_2: Label = "Once this is done, click proceed to finish the workflow." yield SharepointPrompt diff --git a/gso/workflows/iptrunk/deploy_twamp.py b/gso/workflows/iptrunk/deploy_twamp.py index db5f5e110fb26324f91b4687c2f77cb7316f928b..c8342f9a44e996ede5a30de69b06faabf846bc14 100644 --- a/gso/workflows/iptrunk/deploy_twamp.py +++ b/gso/workflows/iptrunk/deploy_twamp.py @@ -24,7 +24,7 @@ def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: info_label: Label = ( "Please confirm deployment of TWAMP on IP trunk from " f"{trunk.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to " - f"{trunk.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" # type: ignore[assignment] + f"{trunk.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) tt_number: str diff --git a/gso/workflows/iptrunk/migrate_iptrunk.py b/gso/workflows/iptrunk/migrate_iptrunk.py index a8dd7cb42738c0eee3126b5ad53bc168b05def11..0a30e01079383504d659324e1e11ccfb793b1594 100644 --- a/gso/workflows/iptrunk/migrate_iptrunk.py +++ b/gso/workflows/iptrunk/migrate_iptrunk.py @@ -67,7 +67,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: tt_number: str replace_side: replaced_side_enum # type: ignore[valid-type] - warning_label: Label = "Are we moving to a different Site?" # type: ignore[assignment] + warning_label: Label = "Are we moving to a different Site?" migrate_to_different_site: bool = False restore_isis_metric: bool = True @@ -116,10 +116,23 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: subscription.iptrunk.iptrunk_speed, ) - ae_members = Annotated[list[NokiaLAGMember], AfterValidator(validate_unique_list), Len(min_length=len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members), max_length=len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members))] + ae_members = Annotated[ + list[NokiaLAGMember], + AfterValidator(validate_unique_list), + Len( + min_length=len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members), + max_length=len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members), + ), + ] else: - - ae_members = Annotated[list[LAGMember], AfterValidator(validate_unique_list), Len(min_length=len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members), max_length=len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members))] + ae_members = Annotated[ + list[LAGMember], + AfterValidator(validate_unique_list), + Len( + min_length=len(subscription.iptrunk.iptrunk_sides[0].iptrunk_side_ae_members), + max_length=len(subscription.iptrunk.iptrunk_sides[1].iptrunk_side_ae_members), + ), + ] replace_index = ( 0 @@ -131,7 +144,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: LAGMember( interface_name=iface.interface_name, interface_description=iface.interface_description, - ) + ) for iface in subscription.iptrunk.iptrunk_sides[replace_index].iptrunk_side_ae_members ] @@ -139,8 +152,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: model_config = ConfigDict(title=form_title) new_lag_interface: side_a_ae_iface # type: ignore[valid-type] - existing_lag_interface: ReadOnlyField(existing_lag_ae_members, default_type=list[LAGMember]) - new_lag_member_interfaces: ae_members # type: ignore[valid-type] + existing_lag_interface: ReadOnlyField(existing_lag_ae_members, default_type=list[LAGMember]) # type: ignore[valid-type] + new_lag_member_interfaces: ae_members @field_validator("new_lag_interface") def lag_interface_proper_name(cls, new_lag_interface: str) -> str: @@ -385,7 +398,7 @@ def confirm_continue_move_fiber() -> FormGenerator: class ProvisioningResultPage(FormPage): model_config = ConfigDict(title="Please confirm before continuing") - info_label: Label = "New trunk interface has been deployed, wait for the physical connection to be moved." # type: ignore[assignment] + info_label: Label = "New trunk interface has been deployed, wait for the physical connection to be moved." yield ProvisioningResultPage @@ -474,7 +487,7 @@ def confirm_continue_restore_isis() -> FormGenerator: class ProvisioningResultPage(FormPage): model_config = ConfigDict(title="Please confirm before continuing") - info_label: Label = "ISIS config has been deployed, confirm if you want to restore the old metric." # type: ignore[assignment] + info_label: Label = "ISIS config has been deployed, confirm if you want to restore the old metric." yield ProvisioningResultPage diff --git a/gso/workflows/iptrunk/modify_trunk_interface.py b/gso/workflows/iptrunk/modify_trunk_interface.py index 63c7566654c84bbdaa34303b6da05a6501b283d1..dcbb40cf874dafb4507aa6c8f8c7c7c00d5be182 100644 --- a/gso/workflows/iptrunk/modify_trunk_interface.py +++ b/gso/workflows/iptrunk/modify_trunk_interface.py @@ -1,7 +1,7 @@ """A modification workflow that updates the :term:`LAG` interfaces that are part of an existing IP trunk.""" import json -from typing import Annotated +from typing import Annotated, TypeVar, Type, Any from uuid import UUID, uuid4 from annotated_types import Len @@ -33,10 +33,12 @@ from gso.utils.helpers import ( validate_iptrunk_unique_interface, validate_tt_number, ) -from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, Vendor +from gso.utils.shared_enums import IPV4AddressType, IPV6AddressType, Vendor +T = TypeVar('T', bound=LAGMember) -def initialize_ae_members(subscription: Iptrunk, initial_user_input: dict, side_index: int) -> type[LAGMember]: + +def initialize_ae_members(subscription: Iptrunk, initial_user_input: dict, side_index: int) -> type[list[Any]]: """Initialize the list of AE members.""" router = subscription.iptrunk.iptrunk_sides[side_index].iptrunk_side_node router_vendor = get_router_vendor(router.owner_subscription_id) @@ -78,13 +80,13 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: iptrunk_type: IptrunkType = subscription.iptrunk.iptrunk_type warning_label: Label = ( "Changing the PhyPortCapacity will result in the deletion of all AE members. " - "You will need to add the new AE members in the next steps." # type: ignore[assignment] + "You will need to add the new AE members in the next steps." ) iptrunk_speed: PhysicalPortCapacity = subscription.iptrunk.iptrunk_speed iptrunk_number_of_members: int = subscription.iptrunk.iptrunk_minimum_links + 1 iptrunk_isis_metric: ReadOnlyField(subscription.iptrunk.iptrunk_isis_metric, default_type=int) - iptrunk_ipv4_network: ReadOnlyField(str(subscription.iptrunk.iptrunk_ipv4_network), default_type=FancyIPV4Address) - iptrunk_ipv6_network: ReadOnlyField(str(subscription.iptrunk.iptrunk_ipv6_network), default_type=FancyIPV6Address) + iptrunk_ipv4_network: ReadOnlyField(str(subscription.iptrunk.iptrunk_ipv4_network), default_type=IPV4AddressType) + iptrunk_ipv6_network: ReadOnlyField(str(subscription.iptrunk.iptrunk_ipv6_network), default_type=IPV6AddressType) @field_validator("tt_number") def validate_tt_number(cls, tt_number: str) -> str: @@ -216,11 +218,11 @@ def modify_iptrunk_subscription( @step("[DRY RUN] Provision IP trunk interface") def provision_ip_trunk_iface_dry( - subscription: Iptrunk, - process_id: UUIDstr, - callback_route: str, - tt_number: str, - removed_ae_members: list[str], + subscription: Iptrunk, + process_id: UUIDstr, + callback_route: str, + tt_number: str, + removed_ae_members: list[str], ) -> State: """Perform a dry run of deploying the updated IP trunk.""" extra_vars = { @@ -230,14 +232,14 @@ def provision_ip_trunk_iface_dry( "config_object": "trunk_interface", "removed_ae_members": removed_ae_members, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for " - f"{subscription.iptrunk.geant_s_sid}", + f"{subscription.iptrunk.geant_s_sid}", } execute_playbook( playbook_name="iptrunks.yaml", callback_route=callback_route, inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n" - f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n", + f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n", extra_vars=extra_vars, ) @@ -246,11 +248,11 @@ def provision_ip_trunk_iface_dry( @step("[FOR REAL] Provision IP trunk interface") def provision_ip_trunk_iface_real( - subscription: Iptrunk, - process_id: UUIDstr, - callback_route: str, - tt_number: str, - removed_ae_members: list[str], + subscription: Iptrunk, + process_id: UUIDstr, + callback_route: str, + tt_number: str, + removed_ae_members: list[str], ) -> State: """Provision the new IP trunk with updated interfaces.""" extra_vars = { @@ -260,14 +262,14 @@ def provision_ip_trunk_iface_real( "config_object": "trunk_interface", "removed_ae_members": removed_ae_members, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for " - f"{subscription.iptrunk.geant_s_sid}", + f"{subscription.iptrunk.geant_s_sid}", } execute_playbook( playbook_name="iptrunks.yaml", callback_route=callback_route, inventory=f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}\n" - f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n", + f"{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}\n", extra_vars=extra_vars, ) @@ -275,10 +277,10 @@ def provision_ip_trunk_iface_real( def _netbox_update_interfaces( - subscription_id: UUID, - side_block: IptrunkSideBlock, - removed_ae_members: list[dict], - previous_ae_members: list[dict], + subscription_id: UUID, + side_block: IptrunkSideBlock, + removed_ae_members: list[dict], + previous_ae_members: list[dict], ) -> None: nbclient = NetboxClient() @@ -308,7 +310,7 @@ def _netbox_update_interfaces( @step("Netbox: Reserve side A interfaces") def netbox_update_interfaces_side_a( - subscription: Iptrunk, removed_ae_members: list[list[dict]], previous_ae_members: list[list[dict]] + subscription: Iptrunk, removed_ae_members: list[list[dict]], previous_ae_members: list[list[dict]] ) -> None: """Update Netbox such that it contains the new interfaces on side A.""" _netbox_update_interfaces( @@ -321,7 +323,7 @@ def netbox_update_interfaces_side_a( @step("Netbox: Reserve side B interfaces") def netbox_update_interfaces_side_b( - subscription: Iptrunk, removed_ae_members: list[list[dict]], previous_ae_members: list[list[dict]] + subscription: Iptrunk, removed_ae_members: list[list[dict]], previous_ae_members: list[list[dict]] ) -> None: """Update Netbox such that it contains the new interfaces on side B.""" _netbox_update_interfaces( @@ -386,25 +388,25 @@ def modify_trunk_interface() -> StepList: lambda state: get_router_vendor( state["subscription"]["iptrunk"]["iptrunk_sides"][0]["iptrunk_side_node"]["owner_subscription_id"] ) - == Vendor.NOKIA + == Vendor.NOKIA ) side_b_is_nokia = conditional( lambda state: get_router_vendor( state["subscription"]["iptrunk"]["iptrunk_sides"][1]["iptrunk_side_node"]["owner_subscription_id"] ) - == Vendor.NOKIA + == Vendor.NOKIA ) return ( - init - >> store_process_subscription(Target.MODIFY) - >> unsync - >> modify_iptrunk_subscription - >> side_a_is_nokia(netbox_update_interfaces_side_a) - >> side_b_is_nokia(netbox_update_interfaces_side_b) - >> lso_interaction(provision_ip_trunk_iface_dry) - >> lso_interaction(provision_ip_trunk_iface_real) - >> side_a_is_nokia(allocate_interfaces_in_netbox_side_a) - >> side_b_is_nokia(allocate_interfaces_in_netbox_side_b) - >> resync - >> done + init + >> store_process_subscription(Target.MODIFY) + >> unsync + >> modify_iptrunk_subscription + >> side_a_is_nokia(netbox_update_interfaces_side_a) + >> side_b_is_nokia(netbox_update_interfaces_side_b) + >> lso_interaction(provision_ip_trunk_iface_dry) + >> lso_interaction(provision_ip_trunk_iface_real) + >> side_a_is_nokia(allocate_interfaces_in_netbox_side_a) + >> side_b_is_nokia(allocate_interfaces_in_netbox_side_b) + >> resync + >> done ) diff --git a/gso/workflows/router/activate_router.py b/gso/workflows/router/activate_router.py index 4de880f4e2b8a9cc13b7c1c80315fef634e577c9..b742c58cd5476685fc6ea8199fbcfa96cf865527 100644 --- a/gso/workflows/router/activate_router.py +++ b/gso/workflows/router/activate_router.py @@ -16,7 +16,7 @@ def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator: router = Router.from_subscription(subscription_id) class ActivateRouterForm(FormPage): - info_label: Label = "Start approval process for router activation." # type:ignore[assignment] + info_label: Label = "Start approval process for router activation." user_input = yield ActivateRouterForm @@ -28,7 +28,7 @@ def verify_complete_checklist() -> FormGenerator: """Show a form for the operator to input a link to the completed checklist.""" class VerifyCompleteForm(FormPage): - info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." # type: ignore[assignment] + info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." checklist_url: str = "" user_input = yield VerifyCompleteForm diff --git a/gso/workflows/router/create_router.py b/gso/workflows/router/create_router.py index fe5260cdec75f74b2bb6afbd9786ebc9492968a7..56df7e9328960114fbe5844a549434f6e365a015 100644 --- a/gso/workflows/router/create_router.py +++ b/gso/workflows/router/create_router.py @@ -42,7 +42,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: model_config = ConfigDict(title=product_name) tt_number: str - partner: ReadOnlyField("GEANT", default_type=str) + partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type] vendor: Vendor router_site: _site_selector() # type: ignore[valid-type] hostname: str @@ -52,7 +52,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: @model_validator(mode="after") def hostname_must_be_available(self) -> Self: router_site = self.router_site - if not router_site: # TODO Test on UI + if not router_site: # TODO Test on UI msg = "Please select a site before setting the hostname." raise ValueError(msg) @@ -155,13 +155,13 @@ def prompt_reboot_router(subscription: RouterInactive) -> FormGenerator: if subscription.router.router_site and subscription.router.router_site.site_ts_address: info_label_1: Label = ( - f"Base config has been deployed. Please log in via the console using https://" # type: ignore[assignment] + f"Base config has been deployed. Please log in via the console using https://" f"{subscription.router.router_site.site_ts_address}." ) else: - info_label_1 = "Base config has been deployed. Please log in via the console." # type: ignore[assignment] + info_label_1 = "Base config has been deployed. Please log in via the console." - info_label_2: Label = "Reboot the router, and once it is up again, press submit to continue the workflow." # type: ignore[assignment] + info_label_2: Label = "Reboot the router, and once it is up again, press submit to continue the workflow." yield RebootPrompt @@ -176,9 +176,9 @@ def prompt_console_login() -> FormGenerator: model_config = ConfigDict(title="Verify local authentication") info_label_1: Label = ( - "Verify that you are able to log in to the router via the console using the admin account." # type: ignore[assignment] + "Verify that you are able to log in to the router via the console using the admin account." ) - info_label_2: Label = "Once this is done, press submit to continue the workflow." # type: ignore[assignment] + info_label_2: Label = "Once this is done, press submit to continue the workflow." yield ConsolePrompt @@ -192,8 +192,8 @@ def prompt_insert_in_ims() -> FormGenerator: class IMSPrompt(FormPage): model_config = ConfigDict(title="Update IMS mediation server") - info_label_1: Label = "Insert the router into IMS." # type: ignore[assignment] - info_label_2: Label = "Once this is done, press submit to continue the workflow." # type: ignore[assignment] + info_label_1: Label = "Insert the router into IMS." + info_label_2: Label = "Once this is done, press submit to continue the workflow." yield IMSPrompt @@ -208,10 +208,10 @@ def prompt_insert_in_radius(subscription: RouterInactive) -> FormGenerator: model_config = ConfigDict(title="Update RADIUS clients") info_label_1: Label = ( - f"Please go to https://kratos.geant.org/add_radius_client and add the {subscription.router.router_fqdn}" # type: ignore[assignment] + f"Please go to https://kratos.geant.org/add_radius_client and add the {subscription.router.router_fqdn}" f" - {subscription.router.router_lo_ipv4_address} to radius authentication" ) - info_label_2: Label = "This will be functionally checked later during verification work." # type: ignore[assignment] + info_label_2: Label = "This will be functionally checked later during verification work." yield RadiusPrompt @@ -228,9 +228,9 @@ def prompt_start_new_checklist(subscription: RouterProvisioning) -> FormGenerato info_label_1: Label = ( f"Visit {oss_params.SHAREPOINT.checklist_site_url} and start a new Sharepoint checklist for " - f"{subscription.router.router_fqdn}." # type: ignore[assignment] + f"{subscription.router.router_fqdn}." ) - info_label_2: Label = "Once this is done, click proceed to finish the workflow." # type: ignore[assignment] + info_label_2: Label = "Once this is done, click proceed to finish the workflow." yield SharepointPrompt diff --git a/gso/workflows/router/redeploy_base_config.py b/gso/workflows/router/redeploy_base_config.py index ffacc0ce7f1e383ac258971a78601bf39ac6af89..0393a377abe9c5351e55f0181ee2e6a8ab30f229 100644 --- a/gso/workflows/router/redeploy_base_config.py +++ b/gso/workflows/router/redeploy_base_config.py @@ -17,7 +17,7 @@ def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator: router = Router.from_subscription(subscription_id) class RedeployBaseConfigForm(FormPage): - info_label: Label = f"Redeploy base config on {router.router.router_fqdn}?" # type: ignore[assignment] + info_label: Label = f"Redeploy base config on {router.router.router_fqdn}?" tt_number: str user_input = yield RedeployBaseConfigForm diff --git a/gso/workflows/router/update_ibgp_mesh.py b/gso/workflows/router/update_ibgp_mesh.py index 09bde9c89f1ac82176fbeb6d2fc7bed6bf90f4e3..6c799e2056634a0ee11e73fa155716ed0b6e3ce8 100644 --- a/gso/workflows/router/update_ibgp_mesh.py +++ b/gso/workflows/router/update_ibgp_mesh.py @@ -34,7 +34,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: tt_number: str @model_validator(mode="before") - def router_has_a_trunk(self ) -> Self: + def router_has_a_trunk(self) -> Self: terminating_trunks = get_trunks_that_terminate_on_router( subscription_id, SubscriptionLifecycle.PROVISIONING ) + get_trunks_that_terminate_on_router(subscription_id, SubscriptionLifecycle.ACTIVE) @@ -204,7 +204,7 @@ def prompt_insert_in_radius() -> FormGenerator: class RADIUSPrompt(FormPage): model_config = ConfigDict(title="Please update RADIUS before continuing") - info_label: Label = "Insert the router into RADIUS, and continue the workflow once this has been completed." # type: ignore[assignment] + info_label: Label = "Insert the router into RADIUS, and continue the workflow once this has been completed." yield RADIUSPrompt @@ -218,7 +218,7 @@ def prompt_radius_login() -> FormGenerator: class RADIUSPrompt(FormPage): model_config = ConfigDict(title="Please check RADIUS before continuing") - info_label: Label = "Log in to the router using RADIUS, and continue the workflow when this was successful." # type: ignore[assignment] + info_label: Label = "Log in to the router using RADIUS, and continue the workflow when this was successful." yield RADIUSPrompt diff --git a/gso/workflows/shared/cancel_subscription.py b/gso/workflows/shared/cancel_subscription.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/gso/workflows/site/create_site.py b/gso/workflows/site/create_site.py index 42e4fc0cf5639b02a5455f0e8f4574d70ab1e2af..c4290d7072e517abd2276e7369663b3685af0e61 100644 --- a/gso/workflows/site/create_site.py +++ b/gso/workflows/site/create_site.py @@ -21,8 +21,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: class CreateSiteForm(FormPage, BaseSiteValidatorModel): model_config = ConfigDict(title=product_name) - partner: ReadOnlyField("GEANT", default_type=str) - + partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type] user_input = yield CreateSiteForm diff --git a/gso/workflows/site/modify_site.py b/gso/workflows/site/modify_site.py index 079d4647344d5c85a4038836c78d22b5199b2b88..40286044bb73d8c4f3f6bd69b2fef0f0eaad92d7 100644 --- a/gso/workflows/site/modify_site.py +++ b/gso/workflows/site/modify_site.py @@ -26,15 +26,15 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class ModifySiteForm(FormPage): model_config = ConfigDict(title="Modify Site") - site_name: ReadOnlyField(subscription.site.site_name, default_type=str) + site_name: ReadOnlyField(subscription.site.site_name, default_type=str) # type: ignore[valid-type] site_city: str = subscription.site.site_city - site_country: ReadOnlyField(subscription.site.site_country, default_type=str) - site_country_code: ReadOnlyField(subscription.site.site_country_code, default_type=str) + site_country: ReadOnlyField(subscription.site.site_country, default_type=str) # type: ignore[valid-type] + site_country_code: ReadOnlyField(subscription.site.site_country_code, default_type=str) # type: ignore[valid-type] site_latitude: LatitudeCoordinate = subscription.site.site_latitude site_longitude: LongitudeCoordinate = subscription.site.site_longitude site_bgp_community_id: int = subscription.site.site_bgp_community_id site_internal_id: int = subscription.site.site_internal_id - site_tier: ReadOnlyField(subscription.site.site_tier, default_type=SiteTier) + site_tier: ReadOnlyField(subscription.site.site_tier, default_type=SiteTier) # type: ignore[valid-type] site_ts_address: str | None = subscription.site.site_ts_address @field_validator("site_ts_address") @@ -42,19 +42,24 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: if site_ts_address and site_ts_address != subscription.site.site_ts_address: validate_site_fields_is_unique("site_ts_address", site_ts_address) validate_ipv4_or_ipv6(site_ts_address) + return site_ts_address @field_validator("site_internal_id") def validate_site_internal_id(cls, site_internal_id: int) -> int: if site_internal_id == subscription.site.site_internal_id: return site_internal_id - return validate_site_fields_is_unique("site_internal_id", site_internal_id) + + validate_site_fields_is_unique("site_internal_id", site_internal_id) + return site_internal_id @field_validator("site_bgp_community_id") def validate_site_bgp_community_id(cls, site_bgp_community_id: int) -> int: if site_bgp_community_id == subscription.site.site_bgp_community_id: return site_bgp_community_id - return validate_site_fields_is_unique("site_bgp_community_id", site_bgp_community_id) + + validate_site_fields_is_unique("site_bgp_community_id", site_bgp_community_id) + return site_bgp_community_id user_input = yield ModifySiteForm @@ -63,13 +68,13 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: @step("Modify subscription") def modify_site_subscription( - subscription: Site, - site_city: str, - site_latitude: LatitudeCoordinate, - site_longitude: LongitudeCoordinate, - site_bgp_community_id: int, - site_internal_id: int, - site_ts_address: str, + subscription: Site, + site_city: str, + site_latitude: LatitudeCoordinate, + site_longitude: LongitudeCoordinate, + site_bgp_community_id: int, + site_internal_id: int, + site_ts_address: str, ) -> State: """Update the subscription model in the service database.""" subscription.site.site_city = site_city @@ -95,11 +100,11 @@ def modify_site() -> StepList: * Update the subscription model in the service database """ return ( - init - >> store_process_subscription(Target.MODIFY) - >> unsync - >> modify_site_subscription - >> set_status(SubscriptionLifecycle.ACTIVE) - >> resync - >> done + init + >> store_process_subscription(Target.MODIFY) + >> unsync + >> modify_site_subscription + >> set_status(SubscriptionLifecycle.ACTIVE) + >> resync + >> done ) diff --git a/gso/workflows/tasks/import_iptrunk.py b/gso/workflows/tasks/import_iptrunk.py index 5e0e71a218b7e18ebb36cc7dff729ad96a6bee44..8763ca8e094c4da1f611e9d6b4ad02c1b8fb610a 100644 --- a/gso/workflows/tasks/import_iptrunk.py +++ b/gso/workflows/tasks/import_iptrunk.py @@ -143,7 +143,7 @@ def update_ipam_stub_for_subscription( @workflow( "Import iptrunk", initial_input_form=initial_input_form_generator, - target=Target.CREATE, + target=Target.SYSTEM, ) def import_iptrunk() -> StepList: """Import an IP trunk without provisioning it.""" diff --git a/gso/workflows/tasks/import_office_router.py b/gso/workflows/tasks/import_office_router.py index 2abb7ed57c4f6e972cb66c487d37844c79945160..6784d1c23cf81f17f727a114cbc98bdcc1b387f3 100644 --- a/gso/workflows/tasks/import_office_router.py +++ b/gso/workflows/tasks/import_office_router.py @@ -1,6 +1,5 @@ """A creation workflow that adds existing office routers to the coreDB.""" - from orchestrator import workflow from orchestrator.forms import FormPage from orchestrator.targets import Target @@ -15,7 +14,7 @@ from gso.products.product_types.office_router import OfficeRouterInactive from gso.services import subscriptions from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_site_by_name -from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor +from gso.utils.shared_enums import IPV4AddressType, IPV6AddressType, PortNumber, Vendor @step("Create subscription") @@ -41,8 +40,8 @@ def initial_input_form_generator() -> FormGenerator: office_router_site: str office_router_fqdn: str office_router_ts_port: PortNumber - office_router_lo_ipv4_address: FancyIPV4Address - office_router_lo_ipv6_address: FancyIPV6Address + office_router_lo_ipv4_address: IPV4AddressType + office_router_lo_ipv6_address: IPV6AddressType user_input = yield ImportOfficeRouter @@ -55,8 +54,8 @@ def initialize_subscription( office_router_fqdn: str, office_router_ts_port: PortNumber, office_router_site: str, - office_router_lo_ipv4_address: FancyIPV4Address | None = None, - office_router_lo_ipv6_address: FancyIPV6Address | None = None, + office_router_lo_ipv4_address: IPV4AddressType | None = None, + office_router_lo_ipv6_address: IPV6AddressType | None = None, ) -> State: """Initialise the office router subscription using input data.""" subscription.office_router.office_router_ts_port = office_router_ts_port diff --git a/gso/workflows/tasks/import_router.py b/gso/workflows/tasks/import_router.py index 1a8e67ee06fc19c58d45391b207f404cee9ccfa8..656536925151bfaee8d199d15ac5ffa4759ebd52 100644 --- a/gso/workflows/tasks/import_router.py +++ b/gso/workflows/tasks/import_router.py @@ -1,6 +1,5 @@ """A creation workflow that adds an existing router to the service database.""" - from orchestrator import workflow from orchestrator.forms import FormPage from orchestrator.targets import Target @@ -18,7 +17,7 @@ from gso.services import subscriptions from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_site_by_name from gso.utils.helpers import generate_fqdn -from gso.utils.shared_enums import FancyIPV4Address, FancyIPV6Address, PortNumber, Vendor +from gso.utils.shared_enums import IPV4AddressType, IPV6AddressType, PortNumber, Vendor @step("Create subscription") @@ -46,8 +45,8 @@ def initial_input_form_generator() -> FormGenerator: ts_port: int router_vendor: Vendor router_role: RouterRole - router_lo_ipv4_address: FancyIPV4Address - router_lo_ipv6_address: FancyIPV6Address + router_lo_ipv4_address: IPV4AddressType + router_lo_ipv6_address: IPV6AddressType router_lo_iso_address: str user_input = yield ImportRouter @@ -63,8 +62,8 @@ def initialize_subscription( router_site: str, router_role: router_pb.RouterRole, router_vendor: Vendor, - router_lo_ipv4_address: FancyIPV4Address | None = None, - router_lo_ipv6_address: FancyIPV6Address | None = None, + router_lo_ipv4_address: IPV4AddressType | None = None, + router_lo_ipv6_address: IPV6AddressType | None = None, router_lo_iso_address: str | None = None, ) -> State: """Initialise the router subscription using input data.""" @@ -89,7 +88,7 @@ def initialize_subscription( @workflow( "Import router", initial_input_form=initial_input_form_generator, - target=Target.CREATE, + target=Target.SYSTEM, ) def import_router() -> StepList: """Import a router without provisioning it.""" diff --git a/gso/workflows/tasks/import_site.py b/gso/workflows/tasks/import_site.py index b2fa07a9a80efef5bdc7e8c2775cab4350760d26..f4d256279bb4e3d99332fe372e3c71d900aab23a 100644 --- a/gso/workflows/tasks/import_site.py +++ b/gso/workflows/tasks/import_site.py @@ -57,11 +57,11 @@ def generate_initial_input_form() -> FormGenerator: @workflow( "Import Site", - target=Target.CREATE, + target=Target.SYSTEM, initial_input_form=generate_initial_input_form, ) def import_site() -> StepList: - """Workflow to import a site without provisioning it.""" + """Import a site without provisioning it.""" return ( init >> create_subscription diff --git a/gso/workflows/tasks/import_super_pop_switch.py b/gso/workflows/tasks/import_super_pop_switch.py index 58e9ac1af903755aef7bd42b8edbc3a5e9843fb6..cd7fdc24379b110561dbdd97c9c8760af93e4010 100644 --- a/gso/workflows/tasks/import_super_pop_switch.py +++ b/gso/workflows/tasks/import_super_pop_switch.py @@ -1,6 +1,5 @@ """A creation workflow that adds existing Super PoP switches to the coreDB.""" - from orchestrator import workflow from orchestrator.forms import FormPage from orchestrator.targets import Target @@ -16,7 +15,7 @@ from gso.services import subscriptions from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_site_by_name from gso.utils.helpers import generate_fqdn -from gso.utils.shared_enums import FancyIPV4Address, PortNumber, Vendor +from gso.utils.shared_enums import IPV4AddressType, PortNumber, Vendor @step("Create subscription") @@ -42,7 +41,7 @@ def initial_input_form_generator() -> FormGenerator: super_pop_switch_site: str hostname: str super_pop_switch_ts_port: PortNumber - super_pop_switch_mgmt_ipv4_address: FancyIPV4Address + super_pop_switch_mgmt_ipv4_address: IPV4AddressType user_input = yield ImportSuperPopSwitch @@ -55,7 +54,7 @@ def initialize_subscription( hostname: str, super_pop_switch_ts_port: PortNumber, super_pop_switch_site: str, - super_pop_switch_mgmt_ipv4_address: FancyIPV4Address | None = None, + super_pop_switch_mgmt_ipv4_address: IPV4AddressType | None = None, ) -> State: """Initialise the Super PoP switch subscription using input data.""" subscription.super_pop_switch.super_pop_switch_ts_port = super_pop_switch_ts_port diff --git a/test/api/test_imports.py b/test/api/test_imports.py index 45cd62a869d7ff726d164e4a9a2368090fcd7e55..cf413f2921c7caf304f032ffd94bd5e00d3d6ed0 100644 --- a/test/api/test_imports.py +++ b/test/api/test_imports.py @@ -1,422 +1,404 @@ -# from unittest.mock import patch -# from uuid import uuid4 -# -# import pytest -# from orchestrator.db import SubscriptionTable -# from orchestrator.services import subscriptions -# -# from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity -# from gso.products.product_blocks.router import RouterRole -# from gso.products.product_blocks.site import SiteTier -# from gso.utils.helpers import iso_from_ipv4 -# from gso.utils.shared_enums import Vendor -# -# SITE_IMPORT_ENDPOINT = "/api/v1/imports/sites" -# ROUTER_IMPORT_ENDPOINT = "/api/v1/imports/routers" -# IPTRUNK_IMPORT_API_URL = "/api/v1/imports/iptrunks" -# SUPER_POP_SWITCH_IMPORT_API_URL = "/api/v1/imports/super-pop-switches" -# OFFICE_ROUTER_IMPORT_API_URL = "/api/v1/imports/office-routers" -# -# -# @pytest.fixture() -# def iptrunk_data(nokia_router_subscription_factory, faker): -# router_side_a = nokia_router_subscription_factory() -# router_side_b = nokia_router_subscription_factory() -# return { -# "partner": "GEANT", -# "geant_s_sid": faker.geant_sid(), -# "iptrunk_type": IptrunkType.DARK_FIBER, -# "iptrunk_description": faker.sentence(), -# "iptrunk_speed": PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND, -# "iptrunk_minimum_links": 5, -# "iptrunk_isis_metric": 500, -# "side_a_node_id": router_side_a, -# "side_a_ae_iface": faker.network_interface(), -# "side_a_ae_geant_a_sid": faker.geant_sid(), -# "side_a_ae_members": [ -# { -# "interface_name": faker.network_interface(), -# "interface_description": faker.sentence(), -# } -# for _ in range(5) -# ], -# "side_b_node_id": router_side_b, -# "side_b_ae_iface": faker.network_interface(), -# "side_b_ae_geant_a_sid": faker.geant_sid(), -# "side_b_ae_members": [ -# { -# "interface_name": faker.network_interface(), -# "interface_description": faker.sentence(), -# } -# for _ in range(5) -# ], -# "iptrunk_ipv4_network": str(faker.ipv4(network=True)), -# "iptrunk_ipv6_network": str(faker.ipv6(network=True)), -# } -# -# -# @pytest.fixture() -# def mock_routers(iptrunk_data): -# with patch("gso.services.subscriptions.get_active_router_subscriptions") as mock_get_active_router_subscriptions: -# -# def _active_router_subscriptions(*args, **kwargs): -# if kwargs["includes"] == ["subscription_id", "description"]: -# return [ -# { -# "subscription_id": iptrunk_data["side_a_node_id"], -# "description": "iptrunk_sideA_node_id description", -# }, -# { -# "subscription_id": iptrunk_data["side_b_node_id"], -# "description": "iptrunk_sideB_node_id description", -# }, -# { -# "subscription_id": str(uuid4()), -# "description": "random description", -# }, -# ] -# return [ -# {"subscription_id": iptrunk_data["side_a_node_id"]}, -# {"subscription_id": iptrunk_data["side_b_node_id"]}, -# {"subscription_id": str(uuid4())}, -# ] -# -# mock_get_active_router_subscriptions.side_effect = _active_router_subscriptions -# yield mock_get_active_router_subscriptions -# -# -# @patch("gso.api.v1.imports._start_process") -# def test_import_iptrunk_successful_with_mocked_process(mock_start_process, test_client, mock_routers, iptrunk_data): -# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" -# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) -# -# assert response.status_code == 201 -# assert response.json()["pid"] == "123e4567-e89b-12d3-a456-426655440000" -# -# -# @pytest.fixture() -# def site_data(faker): -# return { -# "site_name": faker.site_name(), -# "site_city": faker.city(), -# "site_country": faker.country(), -# "site_country_code": faker.country_code(), -# "site_latitude": float(faker.latitude()), -# "site_longitude": float(faker.longitude()), -# "site_bgp_community_id": faker.pyint(), -# "site_internal_id": faker.pyint(), -# "site_tier": SiteTier.TIER1, -# "site_ts_address": faker.ipv4(), -# "partner": "GEANT", -# } -# -# -# @pytest.fixture() -# def router_data(faker, site_data): -# mock_ipv4 = faker.ipv4() -# return { -# "hostname": "127.0.0.1", -# "router_role": RouterRole.PE, -# "router_vendor": Vendor.JUNIPER, -# "router_site": site_data["site_name"], -# "ts_port": 1234, -# "partner": "GEANT", -# "router_lo_ipv4_address": mock_ipv4, -# "router_lo_ipv6_address": faker.ipv6(), -# "router_lo_iso_address": iso_from_ipv4(mock_ipv4), -# } -# -# -# @pytest.fixture() -# def super_pop_switch_data(faker, site_data): -# mock_ipv4 = faker.ipv4() -# return { -# "hostname": "127.0.0.1", -# "super_pop_switch_site": site_data["site_name"], -# "super_pop_switch_ts_port": 1234, -# "partner": "GEANT", -# "super_pop_switch_mgmt_ipv4_address": mock_ipv4, -# } -# -# -# @pytest.fixture() -# def office_router_data(faker, site_data): -# return { -# "office_router_fqdn": "127.0.0.1", -# "office_router_site": site_data["site_name"], -# "office_router_ts_port": 1234, -# "partner": "GEANT", -# "office_router_lo_ipv4_address": faker.ipv4(), -# "office_router_lo_ipv6_address": faker.ipv6(), -# } -# -# -# def test_import_site_endpoint(test_client, site_data): -# assert SubscriptionTable.query.all() == [] -# # Post data to the endpoint -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert response.status_code == 201 -# assert "detail" in response.json() -# assert "pid" in response.json() -# subscription = subscriptions.retrieve_subscription_by_subscription_instance_value( -# resource_type="site_name", -# value=site_data["site_name"], -# ) -# assert subscription is not None -# -# -# def test_import_site_endpoint_with_existing_site(test_client, site_data): -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert SubscriptionTable.query.count() == 1 -# assert response.status_code == 201 -# -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert response.status_code == 422 -# assert SubscriptionTable.query.count() == 1 -# -# -# def test_import_site_endpoint_with_invalid_data(test_client, site_data): -# # invalid data, missing site_latitude and invalid site_longitude -# site_data.pop("site_latitude") -# site_data["site_longitude"] = "invalid" -# assert SubscriptionTable.query.count() == 0 -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert response.status_code == 422 -# assert SubscriptionTable.query.count() == 0 -# response = response.json() -# assert response["detail"][0]["loc"] == ["body", "site_latitude"] -# assert response["detail"][0]["msg"] == "field required" -# assert response["detail"][1]["loc"] == ["body", "site_longitude"] -# assert response["detail"][1]["msg"] == "value is not a valid float" -# -# -# def test_import_router_endpoint(test_client, site_data, router_data): -# # Create a site first -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert response.status_code == 201 -# assert SubscriptionTable.query.count() == 1 -# -# response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data) -# assert response.status_code == 201 -# assert SubscriptionTable.query.count() == 2 -# -# -# def test_import_router_endpoint_with_invalid_data(test_client, site_data, router_data): -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert response.status_code == 201 -# assert SubscriptionTable.query.count() == 1 -# -# # invalid data, missing hostname and invalid router_lo_ipv6_address -# router_data.pop("hostname") -# router_data["router_lo_ipv6_address"] = "invalid" -# response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data) -# assert response.status_code == 422 -# assert SubscriptionTable.query.count() == 1 -# response = response.json() -# assert response["detail"][0]["loc"] == ["body", "hostname"] -# assert response["detail"][0]["msg"] == "field required" -# assert response["detail"][1]["loc"] == ["body", "router_lo_ipv6_address"] -# assert response["detail"][1]["msg"] == "value is not a valid IPv6 address" -# -# -# def test_import_iptrunk_successful_with_real_process(test_client, mock_routers, iptrunk_data): -# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) -# assert response.status_code == 201 -# -# response = response.json() -# assert "detail" in response -# assert "pid" in response -# -# subscription = subscriptions.retrieve_subscription_by_subscription_instance_value( -# resource_type="geant_s_sid", -# value=iptrunk_data["geant_s_sid"], -# ) -# assert subscription is not None -# -# -# @patch("gso.api.v1.imports._start_process") -# def test_import_iptrunk_invalid_partner(mock_start_process, test_client, mock_routers, iptrunk_data): -# iptrunk_data["partner"] = "not_existing_partner" -# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" -# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) -# -# assert response.status_code == 422 -# assert response.json() == { -# "detail": [ -# { -# "loc": ["body", "partner"], -# "msg": "partner not_existing_partner not found", -# "type": "value_error", -# }, -# ], -# } -# -# -# @patch("gso.api.v1.imports._start_process") -# def test_import_iptrunk_invalid_router_id_side_a_and_b(mock_start_process, test_client, iptrunk_data): -# iptrunk_data["side_a_node_id"] = "NOT FOUND" -# iptrunk_data["side_b_node_id"] = "NOT FOUND" -# -# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" -# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) -# -# assert response.status_code == 422 -# assert response.json() == { -# "detail": [ -# { -# "loc": ["body", "side_a_node_id"], -# "msg": f"Router {iptrunk_data['side_a_node_id']} not found", -# "type": "value_error", -# }, -# { -# "loc": ["body", "side_b_node_id"], -# "msg": f"Router {iptrunk_data['side_b_node_id']} not found", -# "type": "value_error", -# }, -# ], -# } -# -# -# @patch("gso.api.v1.imports._start_process") -# def test_import_iptrunk_non_unique_members_side_a(mock_start_process, test_client, mock_routers, iptrunk_data, faker): -# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" -# -# repeat_interface_a = { -# "interface_name": faker.network_interface(), -# "interface_description": faker.sentence(), -# } -# repeat_interface_b = { -# "interface_name": faker.network_interface(), -# "interface_description": faker.sentence(), -# } -# iptrunk_data["side_a_ae_members"] = [repeat_interface_a for _ in range(5)] -# iptrunk_data["side_b_ae_members"] = [repeat_interface_b for _ in range(5)] -# -# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) -# -# assert response.status_code == 422 -# assert response.json() == { -# "detail": [ -# { -# "loc": ["body", "side_a_ae_members"], -# "msg": "Items must be unique", -# "type": "value_error", -# }, -# { -# "loc": ["body", "side_b_ae_members"], -# "msg": "Items must be unique", -# "type": "value_error", -# }, -# { -# "loc": ["body", "__root__"], -# "msg": "Side A members should be at least 5 (iptrunk_minimum_links)", -# "type": "value_error", -# }, -# ], -# } -# -# -# @patch("gso.api.v1.imports._start_process") -# def test_import_iptrunk_fails_on_side_a_member_count_mismatch( -# mock_start_process, -# test_client, -# mock_routers, -# iptrunk_data, -# ): -# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" -# -# iptrunk_data["side_a_ae_members"].remove(iptrunk_data["side_a_ae_members"][0]) -# -# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) -# -# assert response.status_code == 422 -# assert response.json() == { -# "detail": [ -# { -# "loc": ["body", "__root__"], -# "msg": "Side A members should be at least 5 (iptrunk_minimum_links)", -# "type": "value_error", -# }, -# ], -# } -# -# -# @patch("gso.api.v1.imports._start_process") -# def test_import_iptrunk_fails_on_side_a_and_b_members_mismatch( -# mock_start_process, -# test_client, -# iptrunk_data, -# mock_routers, -# ): -# mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" -# -# iptrunk_data["side_b_ae_members"].remove(iptrunk_data["side_b_ae_members"][0]) -# -# response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) -# -# assert response.status_code == 422 -# assert response.json() == { -# "detail": [ -# { -# "loc": ["body", "__root__"], -# "msg": "Mismatch between Side A and B members", -# "type": "value_error", -# }, -# ], -# } -# -# -# def test_import_super_pop_switch_endpoint(test_client, site_data, super_pop_switch_data): -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert response.status_code == 201 -# assert SubscriptionTable.query.count() == 1 -# -# response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data) -# assert response.status_code == 201 -# assert SubscriptionTable.query.count() == 2 -# -# -# def test_import_super_pop_switch_endpoint_with_invalid_data(test_client, site_data, super_pop_switch_data): -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert response.status_code == 201 -# assert SubscriptionTable.query.count() == 1 -# -# # invalid data, missing hostname and invalid mgmt_ipv4_address -# super_pop_switch_data.pop("hostname") -# super_pop_switch_data["super_pop_switch_mgmt_ipv4_address"] = "invalid" -# response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data) -# assert response.status_code == 422 -# assert SubscriptionTable.query.count() == 1 -# response = response.json() -# assert response["detail"][0]["loc"] == ["body", "hostname"] -# assert response["detail"][0]["msg"] == "field required" -# assert response["detail"][1]["loc"] == ["body", "super_pop_switch_mgmt_ipv4_address"] -# assert response["detail"][1]["msg"] == "value is not a valid IPv4 address" -# -# -# def test_import_office_router_endpoint(test_client, site_data, office_router_data): -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert response.status_code == 201 -# assert SubscriptionTable.query.count() == 1 -# -# response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data) -# assert response.status_code == 201 -# assert SubscriptionTable.query.count() == 2 -# -# -# def test_import_office_router_endpoint_with_invalid_data(test_client, site_data, office_router_data): -# response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) -# assert response.status_code == 201 -# assert SubscriptionTable.query.count() == 1 -# -# # invalid data, missing FQDN and invalid lo_ipv6_address -# office_router_data.pop("office_router_fqdn") -# office_router_data["office_router_lo_ipv6_address"] = "invalid" -# response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data) -# assert response.status_code == 422 -# assert SubscriptionTable.query.count() == 1 -# response = response.json() -# assert response["detail"][0]["loc"] == ["body", "office_router_fqdn"] -# assert response["detail"][0]["msg"] == "field required" -# assert response["detail"][1]["loc"] == ["body", "office_router_lo_ipv6_address"] -# assert response["detail"][1]["msg"] == "value is not a valid IPv6 address" +from unittest.mock import patch +from uuid import uuid4 + +import pytest +from orchestrator.db import SubscriptionTable +from orchestrator.services import subscriptions + +from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity +from gso.products.product_blocks.router import RouterRole +from gso.products.product_blocks.site import SiteTier +from gso.utils.helpers import iso_from_ipv4 +from gso.utils.shared_enums import Vendor + +SITE_IMPORT_ENDPOINT = "/api/v1/imports/sites" +ROUTER_IMPORT_ENDPOINT = "/api/v1/imports/routers" +IPTRUNK_IMPORT_API_URL = "/api/v1/imports/iptrunks" +SUPER_POP_SWITCH_IMPORT_API_URL = "/api/v1/imports/super-pop-switches" +OFFICE_ROUTER_IMPORT_API_URL = "/api/v1/imports/office-routers" + + +@pytest.fixture() +def iptrunk_data(nokia_router_subscription_factory, faker): + router_side_a = nokia_router_subscription_factory() + router_side_b = nokia_router_subscription_factory() + return { + "partner": "GEANT", + "geant_s_sid": faker.geant_sid(), + "iptrunk_type": IptrunkType.DARK_FIBER, + "iptrunk_description": faker.sentence(), + "iptrunk_speed": PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND, + "iptrunk_minimum_links": 5, + "iptrunk_isis_metric": 500, + "side_a_node_id": router_side_a, + "side_a_ae_iface": faker.network_interface(), + "side_a_ae_geant_a_sid": faker.geant_sid(), + "side_a_ae_members": [ + { + "interface_name": faker.network_interface(), + "interface_description": faker.sentence(), + } + for _ in range(5) + ], + "side_b_node_id": router_side_b, + "side_b_ae_iface": faker.network_interface(), + "side_b_ae_geant_a_sid": faker.geant_sid(), + "side_b_ae_members": [ + { + "interface_name": faker.network_interface(), + "interface_description": faker.sentence(), + } + for _ in range(5) + ], + "iptrunk_ipv4_network": str(faker.ipv4(network=True)), + "iptrunk_ipv6_network": str(faker.ipv6(network=True)), + } + + +@pytest.fixture() +def mock_routers(iptrunk_data): + with patch("gso.services.subscriptions.get_active_router_subscriptions") as mock_get_active_router_subscriptions: + + def _active_router_subscriptions(*args, **kwargs): + if kwargs["includes"] == ["subscription_id", "description"]: + return [ + { + "subscription_id": iptrunk_data["side_a_node_id"], + "description": "iptrunk_sideA_node_id description", + }, + { + "subscription_id": iptrunk_data["side_b_node_id"], + "description": "iptrunk_sideB_node_id description", + }, + { + "subscription_id": str(uuid4()), + "description": "random description", + }, + ] + return [ + {"subscription_id": iptrunk_data["side_a_node_id"]}, + {"subscription_id": iptrunk_data["side_b_node_id"]}, + {"subscription_id": str(uuid4())}, + ] + + mock_get_active_router_subscriptions.side_effect = _active_router_subscriptions + yield mock_get_active_router_subscriptions + + +@patch("gso.api.v1.imports._start_process") +def test_import_iptrunk_successful_with_mocked_process(mock_start_process, test_client, mock_routers, iptrunk_data): + mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" + response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) + + assert response.status_code == 201 + assert response.json()["pid"] == "123e4567-e89b-12d3-a456-426655440000" + + +@pytest.fixture() +def site_data(faker): + return { + "site_name": faker.site_name(), + "site_city": faker.city(), + "site_country": faker.country(), + "site_country_code": faker.country_code(), + "site_latitude": float(faker.latitude()), + "site_longitude": float(faker.longitude()), + "site_bgp_community_id": faker.pyint(), + "site_internal_id": faker.pyint(), + "site_tier": SiteTier.TIER1, + "site_ts_address": faker.ipv4(), + "partner": "GEANT", + } + + +@pytest.fixture() +def router_data(faker, site_data): + mock_ipv4 = faker.ipv4() + return { + "hostname": "127.0.0.1", + "router_role": RouterRole.PE, + "router_vendor": Vendor.JUNIPER, + "router_site": site_data["site_name"], + "ts_port": 1234, + "partner": "GEANT", + "router_lo_ipv4_address": mock_ipv4, + "router_lo_ipv6_address": faker.ipv6(), + "router_lo_iso_address": iso_from_ipv4(mock_ipv4), + } + + +@pytest.fixture() +def super_pop_switch_data(faker, site_data): + mock_ipv4 = faker.ipv4() + return { + "hostname": "127.0.0.1", + "super_pop_switch_site": site_data["site_name"], + "super_pop_switch_ts_port": 1234, + "partner": "GEANT", + "super_pop_switch_mgmt_ipv4_address": mock_ipv4, + } + + +@pytest.fixture() +def office_router_data(faker, site_data): + return { + "office_router_fqdn": "127.0.0.1", + "office_router_site": site_data["site_name"], + "office_router_ts_port": 1234, + "partner": "GEANT", + "office_router_lo_ipv4_address": faker.ipv4(), + "office_router_lo_ipv6_address": faker.ipv6(), + } + + +def test_import_site_endpoint(test_client, site_data): + assert SubscriptionTable.query.all() == [] + # Post data to the endpoint + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert response.status_code == 201 + assert "detail" in response.json() + assert "pid" in response.json() + subscription = subscriptions.retrieve_subscription_by_subscription_instance_value( + resource_type="site_name", + value=site_data["site_name"], + ) + assert subscription is not None + + +def test_import_site_endpoint_with_existing_site(test_client, site_data): + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert SubscriptionTable.query.count() == 1 + assert response.status_code == 201 + + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert response.status_code == 422 + assert SubscriptionTable.query.count() == 1 + + +def test_import_site_endpoint_with_invalid_data(test_client, site_data): + # invalid data, missing site_latitude and invalid site_longitude + site_data.pop("site_latitude") + site_data["site_longitude"] = "invalid" + assert SubscriptionTable.query.count() == 0 + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert response.status_code == 422 + assert SubscriptionTable.query.count() == 0 + response = response.json() + assert response["detail"][0]["loc"] == ["body", "site_latitude"] + assert response["detail"][0]["msg"] == "Field required" + assert response["detail"][1]["loc"] == ["body", "site_longitude"] + assert response["detail"][1]["msg"] == "Input should be a valid number, unable to parse string as a number" + + +def test_import_router_endpoint(test_client, site_data, router_data): + # Create a site first + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert response.status_code == 201 + assert SubscriptionTable.query.count() == 1 + + response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data) + assert response.status_code == 201 + assert SubscriptionTable.query.count() == 2 + + +def test_import_router_endpoint_with_invalid_data(test_client, site_data, router_data): + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert response.status_code == 201 + assert SubscriptionTable.query.count() == 1 + + # invalid data, missing hostname and invalid router_lo_ipv6_address + router_data.pop("hostname") + router_data["router_lo_ipv6_address"] = "invalid" + response = test_client.post(ROUTER_IMPORT_ENDPOINT, json=router_data) + assert response.status_code == 422 + assert SubscriptionTable.query.count() == 1 + response = response.json() + assert response["detail"][0]["loc"] == ["body", "hostname"] + assert response["detail"][0]["msg"] == "Field required" + assert response["detail"][1]["loc"] == ["body", "router_lo_ipv6_address"] + assert response["detail"][1]["msg"] == "Input is not a valid IPv6 address" + + +def test_import_iptrunk_successful_with_real_process(test_client, mock_routers, iptrunk_data): + response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) + assert response.status_code == 201 + + response = response.json() + assert "detail" in response + assert "pid" in response + + subscription = subscriptions.retrieve_subscription_by_subscription_instance_value( + resource_type="geant_s_sid", + value=iptrunk_data["geant_s_sid"], + ) + assert subscription is not None + + +@patch("gso.api.v1.imports._start_process") +def test_import_iptrunk_invalid_partner(mock_start_process, test_client, mock_routers, iptrunk_data): + iptrunk_data["partner"] = "not_existing_partner" + mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" + response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) + + assert response.status_code == 422 + assert response.json() == { + "detail": [ + { + "ctx": {"error": {}}, + "input": "not_existing_partner", + "loc": ["body", "partner"], + "msg": "Value error, partner not_existing_partner not found", + "type": "value_error", + "url": "https://errors.pydantic.dev/2.5/v/value_error", + } + ] + } + + +@patch("gso.api.v1.imports._start_process") +def test_import_iptrunk_invalid_router_id_side_a_and_b(mock_start_process, test_client, iptrunk_data): + iptrunk_data["side_a_node_id"] = "NOT FOUND" + iptrunk_data["side_b_node_id"] = "NOT FOUND" + + mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" + response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) + + assert response.status_code == 422 + assert response.json() == { + "detail": [ + { + "ctx": {"error": {}}, + "input": "NOT FOUND", + "loc": ["body", "side_a_node_id"], + "msg": "Value error, Router NOT FOUND not found", + "type": "value_error", + "url": "https://errors.pydantic.dev/2.5/v/value_error", + }, + { + "ctx": {"error": {}}, + "input": "NOT FOUND", + "loc": ["body", "side_b_node_id"], + "msg": "Value error, Router NOT FOUND not found", + "type": "value_error", + "url": "https://errors.pydantic.dev/2.5/v/value_error", + }, + ] + } + + +@patch("gso.api.v1.imports._start_process") +def test_import_iptrunk_non_unique_members_side_a(mock_start_process, test_client, mock_routers, iptrunk_data, faker): + mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" + + repeat_interface_a = { + "interface_name": faker.network_interface(), + "interface_description": faker.sentence(), + } + repeat_interface_b = { + "interface_name": faker.network_interface(), + "interface_description": faker.sentence(), + } + iptrunk_data["side_a_ae_members"] = [repeat_interface_a for _ in range(5)] + iptrunk_data["side_b_ae_members"] = [repeat_interface_b for _ in range(5)] + + response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) + + assert response.status_code == 422 + + for detail in response.json()["detail"]: + assert detail["msg"] == "Value error, Items must be unique" + + +@patch("gso.api.v1.imports._start_process") +def test_import_iptrunk_fails_on_side_a_member_count_mismatch( + mock_start_process, + test_client, + mock_routers, + iptrunk_data, +): + mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" + + iptrunk_data["side_a_ae_members"].remove(iptrunk_data["side_a_ae_members"][0]) + + response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) + + assert response.status_code == 422 + for detail in response.json()["detail"]: + assert detail["msg"] == "Value error, Side A members should be at least 5 (iptrunk_minimum_links)" + + input_data = detail["input"] + assert input_data["iptrunk_minimum_links"] == 5, "The reported minimum links should be 5" + + +@patch("gso.api.v1.imports._start_process") +def test_import_iptrunk_fails_on_side_a_and_b_members_mismatch( + mock_start_process, + test_client, + iptrunk_data, + mock_routers, +): + mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000" + + iptrunk_data["side_b_ae_members"].remove(iptrunk_data["side_b_ae_members"][0]) + + response = test_client.post(IPTRUNK_IMPORT_API_URL, json=iptrunk_data) + + assert response.status_code == 422 + for detail in response.json()["detail"]: + assert detail["msg"] == "Value error, Mismatch between Side A and B members" + + +def test_import_super_pop_switch_endpoint(test_client, site_data, super_pop_switch_data): + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert response.status_code == 201 + assert SubscriptionTable.query.count() == 1 + + response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data) + assert response.status_code == 201 + assert SubscriptionTable.query.count() == 2 + + +def test_import_super_pop_switch_endpoint_with_invalid_data(test_client, site_data, super_pop_switch_data): + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert response.status_code == 201 + assert SubscriptionTable.query.count() == 1 + + # invalid data, missing hostname and invalid mgmt_ipv4_address + super_pop_switch_data.pop("hostname") + super_pop_switch_data["super_pop_switch_mgmt_ipv4_address"] = "invalid" + response = test_client.post(SUPER_POP_SWITCH_IMPORT_API_URL, json=super_pop_switch_data) + assert response.status_code == 422 + assert SubscriptionTable.query.count() == 1 + response = response.json() + assert response["detail"][0]["loc"] == ["body", "hostname"] + assert response["detail"][0]["msg"] == "Field required" + assert response["detail"][1]["loc"] == ["body", "super_pop_switch_mgmt_ipv4_address"] + assert response["detail"][1]["msg"] == "Input is not a valid IPv4 address" + + +def test_import_office_router_endpoint(test_client, site_data, office_router_data): + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert response.status_code == 201 + assert SubscriptionTable.query.count() == 1 + + response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data) + assert response.status_code == 201 + assert SubscriptionTable.query.count() == 2 + + +def test_import_office_router_endpoint_with_invalid_data(test_client, site_data, office_router_data): + response = test_client.post(SITE_IMPORT_ENDPOINT, json=site_data) + assert response.status_code == 201 + assert SubscriptionTable.query.count() == 1 + + # invalid data, missing FQDN and invalid lo_ipv6_address + office_router_data.pop("office_router_fqdn") + office_router_data["office_router_lo_ipv6_address"] = "invalid" + response = test_client.post(OFFICE_ROUTER_IMPORT_API_URL, json=office_router_data) + assert response.status_code == 422 + assert SubscriptionTable.query.count() == 1 + response = response.json() + assert response["detail"][0]["loc"] == ["body", "office_router_fqdn"] + assert response["detail"][0]["msg"] == "Field required" + assert response["detail"][1]["loc"] == ["body", "office_router_lo_ipv6_address"] + assert response["detail"][1]["msg"] == "Input is not a valid IPv6 address" diff --git a/test/api/test_processes.py b/test/api/test_processes.py index 4800d7f0876001d139f34b7e798870fda3c4537a..f56fe52640d587928531f5171712f98ca57f8f1e 100644 --- a/test/api/test_processes.py +++ b/test/api/test_processes.py @@ -13,7 +13,9 @@ from orchestrator.workflow import ProcessStatus @pytest.fixture() def create_process(test_workflow, nokia_router_subscription_factory): process_id = uuid4() - process = ProcessTable(process_id=process_id, workflow_id=test_workflow.workflow_id , last_status=ProcessStatus.SUSPENDED) + process = ProcessTable( + process_id=process_id, workflow_id=test_workflow.workflow_id, last_status=ProcessStatus.SUSPENDED + ) subscription = nokia_router_subscription_factory() process_subscription = ProcessSubscriptionTable(process_id=process_id, subscription_id=subscription) diff --git a/test/auth/test_oidc_policy_helper.py b/test/auth/test_oidc_policy_helper.py index b1259d5dacf9aae4f8f53d58647bb1d842319451..767e34423a5c4969d3a08fc4d5ee01f005fc0b40 100644 --- a/test/auth/test_oidc_policy_helper.py +++ b/test/auth/test_oidc_policy_helper.py @@ -57,7 +57,7 @@ def oidc_user(mock_openid_config): resource_server_id="resource_server", resource_server_secret="secret", # noqa: S106 ) - user.openid_config = OIDCConfig.parse_obj(mock_openid_config) + user.openid_config = OIDCConfig.model_validate(mock_openid_config) return user diff --git a/test/conftest.py b/test/conftest.py index bbca59a72bc725564a117301068be64750de27df..a5d4279839d452801453e7d2368e8b2513bf6c81 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -49,6 +49,7 @@ from test.fixtures import ( # noqa: F401 logging.getLogger("faker.factory").setLevel(logging.WARNING) + def pytest_collection_modifyitems(config, items): if bool(os.environ.get("SKIP_ALL_TESTS")): for item in items: @@ -268,6 +269,7 @@ def test_client(fastapi_app): def geant_partner(): return create_partner(PartnerCreate(name="GEANT-TEST", partner_type=PartnerType.GEANT, email="goat-test@geant.org")) + @pytest.fixture() def generic_resource_type_1(): rt = ResourceTypeTable(description="Resource Type one", resource_type="rt_1") @@ -426,18 +428,18 @@ def generic_product_block_type_3(generic_product_block_3): @pytest.fixture() def generic_product_type_1(generic_product_1, generic_product_block_type_1, generic_product_block_type_2): - GenericProductBlockOneInactive, GenericProductBlockOne = generic_product_block_type_1 - GenericProductBlockTwoInactive, GenericProductBlockTwo = generic_product_block_type_2 + generic_product_block_one_inactive, generic_product_block_one = generic_product_block_type_1 + generic_product_block_two_inactive, generic_product_block_two = generic_product_block_type_2 # Test Product domain models class GenericProductOneInactive(SubscriptionModel, is_base=True): - pb_1: GenericProductBlockOneInactive - pb_2: GenericProductBlockTwoInactive + pb_1: generic_product_block_one_inactive + pb_2: generic_product_block_two_inactive class GenericProductOne(GenericProductOneInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): - pb_1: GenericProductBlockOne - pb_2: GenericProductBlockTwo + pb_1: generic_product_block_one + pb_2: generic_product_block_two SUBSCRIPTION_MODEL_REGISTRY["Product 1"] = GenericProductOne @@ -448,13 +450,13 @@ def generic_product_type_1(generic_product_1, generic_product_block_type_1, gene @pytest.fixture() def generic_product_type_2(generic_product_2, generic_product_block_type_3): - GenericProductBlockThreeInactive, GenericProductBlockThree = generic_product_block_type_3 + generic_product_block_three_inactive, generic_product_block_three = generic_product_block_type_3 class GenericProductTwoInactive(SubscriptionModel, is_base=True): - pb_3: GenericProductBlockThreeInactive + pb_3: generic_product_block_three_inactive class GenericProductTwo(GenericProductTwoInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): - pb_3: GenericProductBlockThree + pb_3: generic_product_block_three SUBSCRIPTION_MODEL_REGISTRY["Product 2"] = GenericProductTwo @@ -466,14 +468,14 @@ def generic_product_type_2(generic_product_2, generic_product_block_type_3): @pytest.fixture() def product_type_1_subscription_factory(generic_product_1, generic_product_type_1, geant_partner): def subscription_create( - description="Generic Subscription One", - start_date="2023-05-24T00:00:00+00:00", - rt_1="Value1", - rt_2=42, - rt_3="Value2", + description="Generic Subscription One", + start_date="2023-05-24T00:00:00+00:00", + rt_1="Value1", + rt_2=42, + rt_3="Value2", ): - GenericProductOneInactive, _ = generic_product_type_1 - gen_subscription = GenericProductOneInactive.from_product_id( + generic_product_one_inactive, _ = generic_product_type_1 + gen_subscription = generic_product_one_inactive.from_product_id( generic_product_1.product_id, customer_id=geant_partner["partner_id"], insync=True ) gen_subscription.pb_1.rt_1 = rt_1 @@ -501,7 +503,7 @@ def product_type_1_subscriptions_factory(product_type_1_subscription_factory): product_type_1_subscription_factory( description=f"Subscription {i}", start_date=( - datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00") + datetime.timedelta(days=i) + datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00") + datetime.timedelta(days=i) ).replace(tzinfo=datetime.UTC), ) for i in range(amount) diff --git a/test/fixtures.py b/test/fixtures.py index 698ddd4eca6826c418b4de7ea3e46e5320eecf71..96107fd646f0da31232c0460cb72d868c4e1a86a 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -36,20 +36,20 @@ from test.workflows import WorkflowInstanceForTests @pytest.fixture() def site_subscription_factory(faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - site_name=None, - site_city=None, - site_country=None, - site_country_code=None, - site_latitude=None, - site_longitude=None, - site_bgp_community_id=None, - site_internal_id=None, - site_tier=SiteTier.TIER1, - site_ts_address=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + site_name=None, + site_city=None, + site_country=None, + site_country_code=None, + site_latitude=None, + site_longitude=None, + site_bgp_community_id=None, + site_internal_id=None, + site_tier=SiteTier.TIER1, + site_ts_address=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -95,18 +95,18 @@ def site_subscription_factory(faker, geant_partner): @pytest.fixture() def nokia_router_subscription_factory(site_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - router_fqdn=None, - router_ts_port=None, - router_access_via_ts=None, - router_lo_ipv4_address=None, - router_lo_ipv6_address=None, - router_lo_iso_address=None, - router_role=RouterRole.PE, - router_site=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + router_fqdn=None, + router_ts_port=None, + router_access_via_ts=None, + router_lo_ipv4_address=None, + router_lo_ipv6_address=None, + router_lo_iso_address=None, + router_role=RouterRole.PE, + router_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -150,18 +150,18 @@ def nokia_router_subscription_factory(site_subscription_factory, faker, geant_pa @pytest.fixture() def juniper_router_subscription_factory(site_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - router_fqdn=None, - router_ts_port=None, - router_access_via_ts=None, - router_lo_ipv4_address=None, - router_lo_ipv6_address=None, - router_lo_iso_address=None, - router_role=RouterRole.PE, - router_site=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + router_fqdn=None, + router_ts_port=None, + router_access_via_ts=None, + router_lo_ipv4_address=None, + router_lo_ipv6_address=None, + router_lo_iso_address=None, + router_role=RouterRole.PE, + router_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -206,11 +206,11 @@ def juniper_router_subscription_factory(site_subscription_factory, faker, geant_ @pytest.fixture() def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker): def subscription_create( - iptrunk_side_node=None, - iptrunk_side_ae_iface=None, - iptrunk_side_ae_geant_a_sid=None, - iptrunk_side_ae_members=None, - iptrunk_side_ae_members_description=None, + iptrunk_side_node=None, + iptrunk_side_ae_iface=None, + iptrunk_side_ae_geant_a_sid=None, + iptrunk_side_ae_members=None, + iptrunk_side_ae_members_description=None, ) -> IptrunkSideBlock: iptrunk_side_node_id = iptrunk_side_node or nokia_router_subscription_factory() iptrunk_side_node = Router.from_subscription(iptrunk_side_node_id).router @@ -244,18 +244,18 @@ def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker): @pytest.fixture() def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - geant_s_sid=None, - iptrunk_description=None, - iptrunk_type=IptrunkType.DARK_FIBER, - iptrunk_speed=PhysicalPortCapacity.ONE_GIGABIT_PER_SECOND, - iptrunk_isis_metric=None, - iptrunk_ipv4_network=None, - iptrunk_ipv6_network=None, - iptrunk_sides=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + geant_s_sid=None, + iptrunk_description=None, + iptrunk_type=IptrunkType.DARK_FIBER, + iptrunk_speed=PhysicalPortCapacity.ONE_GIGABIT_PER_SECOND, + iptrunk_isis_metric=None, + iptrunk_ipv4_network=None, + iptrunk_ipv6_network=None, + iptrunk_sides=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -307,15 +307,15 @@ def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant @pytest.fixture() def office_router_subscription_factory(site_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - office_router_fqdn=None, - office_router_ts_port=None, - office_router_lo_ipv4_address=None, - office_router_lo_ipv6_address=None, - office_router_site=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + office_router_fqdn=None, + office_router_ts_port=None, + office_router_lo_ipv4_address=None, + office_router_lo_ipv6_address=None, + office_router_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -358,14 +358,14 @@ def office_router_subscription_factory(site_subscription_factory, faker, geant_p @pytest.fixture() def super_pop_switch_subscription_factory(site_subscription_factory, faker, geant_partner): def subscription_create( - description=None, - start_date="2023-05-24T00:00:00+00:00", - super_pop_switch_fqdn=None, - super_pop_switch_ts_port=None, - super_pop_switch_mgmt_ipv4_address=None, - super_pop_switch_site=None, - status: SubscriptionLifecycle | None = None, - partner: dict | None = None, + description=None, + start_date="2023-05-24T00:00:00+00:00", + super_pop_switch_fqdn=None, + super_pop_switch_ts_port=None, + super_pop_switch_mgmt_ipv4_address=None, + super_pop_switch_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -406,13 +406,14 @@ def super_pop_switch_subscription_factory(site_subscription_factory, faker, gean return subscription_create + @pytest.fixture() def test_workflow(generic_subscription_1: UUIDstr, generic_product_type_1) -> Generator: - _, GenericProductOne = generic_product_type_1 + _, generic_product_one = generic_product_type_1 @step("Insert UUID in state") def insert_object(): - return {"subscription_id": str(uuid4()), "model": GenericProductOne.from_subscription(generic_subscription_1)} + return {"subscription_id": str(uuid4()), "model": generic_product_one.from_subscription(generic_subscription_1)} @step("Test that it is a string now") def check_object(subscription_id: Any, model: dict) -> None: diff --git a/test/services/test_librenms_client.py b/test/services/test_librenms_client.py index 55df5ce176329a66587b53c366988b325ecf49e0..0477f8e7722bc286356acbea080813e85c169947 100644 --- a/test/services/test_librenms_client.py +++ b/test/services/test_librenms_client.py @@ -168,9 +168,12 @@ def mock_get_device_misconfigured(faker): @pytest.fixture() def mock_get_device_unauthenticated(): - with patch("gso.services.librenms_client.requests.get") as mock_get_unauthorized, patch( - "gso.services.librenms_client.LibreNMSClient.get_device", - ) as mock_get_device: + with ( + patch("gso.services.librenms_client.requests.get") as mock_get_unauthorized, + patch( + "gso.services.librenms_client.LibreNMSClient.get_device", + ) as mock_get_device, + ): mock_get_unauthorized().status_code = HTTPStatus.UNAUTHORIZED mock_get_unauthorized().json.return_value = {"message": "Unauthenticated."} mock_get_device.side_effect = HTTPError( diff --git a/test/workflows/iptrunk/test_create_iptrunk.py b/test/workflows/iptrunk/test_create_iptrunk.py index 9c16c76711e971aa1885a0e98bd6ba3697657191..c16e0527fb5db87ffd5ff1d41ee27a54919325f7 100644 --- a/test/workflows/iptrunk/test_create_iptrunk.py +++ b/test/workflows/iptrunk/test_create_iptrunk.py @@ -133,12 +133,10 @@ def test_successful_iptrunk_creation_with_standard_lso_result( subscription_id = state["subscription_id"] subscription = Iptrunk.from_subscription(subscription_id) - sorted_sides = sorted( - [ - subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_site.site_name, - subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_site.site_name, - ] - ) + sorted_sides = sorted([ + subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_site.site_name, + subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_site.site_name, + ]) assert subscription.status == "provisioning" assert subscription.description == ( f"IP trunk {sorted_sides[0]} {sorted_sides[1]}, geant_s_sid:{input_form_wizard_data[0]['geant_s_sid']}"