diff --git a/Changelog.md b/Changelog.md index 5c127a11f1415a9ad7ff0ea604f3186fc50422d5..0a5cb25ea52402849e72bd032a0bec39f2ea7e2c 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,5 +1,10 @@ # Changelog +## [2.33] - 2025-01-23 +- fix a bug in resolve_customer when customer_id is None +- refactor get_partner_by_name to return object instead of dict +- refactor tests and helper methods + ## [2.32] - 2025-01-17 - Add environment variables for Python logging and UTF-8 support in Alpine image - add custom customer type and resolver to the graphq diff --git a/gso/cli/imports.py b/gso/cli/imports.py index d8f82908330a35f9602db3bf0fdbf3a58b741776..1fcb35105bcdc946d57019543dddfb025e96bad8 100644 --- a/gso/cli/imports.py +++ b/gso/cli/imports.py @@ -302,7 +302,7 @@ class L3CoreServiceImportModel(BaseModel): @field_validator("service_binding_ports") def validate_node(cls, value: list[ServiceBindingPort]) -> list[ServiceBindingPort]: """Check if the Service Binding Ports are valid.""" - edge_ports = [str(subscription["subscription_id"]) for subscription in get_active_edge_port_subscriptions()] + edge_ports = [str(subscription.subscription_id) for subscription in get_active_edge_port_subscriptions()] for sbp in value: if sbp.edge_port not in edge_ports: msg = f"Edge Port {sbp.edge_port} not found" diff --git a/gso/graphql_api/resolvers/customer.py b/gso/graphql_api/resolvers/customer.py index b8187a69ca00f4941ebaa104eab2a98880b74055..3cb36e3bdf268bf0d5a62163d0f58eaed9459591 100644 --- a/gso/graphql_api/resolvers/customer.py +++ b/gso/graphql_api/resolvers/customer.py @@ -6,12 +6,12 @@ from orchestrator.graphql.schemas.process import ProcessType from orchestrator.graphql.schemas.subscription import SubscriptionInterface from orchestrator.graphql.utils.override_class import override_class -from gso.services.partners import get_partner_by_id +from gso.services.partners import get_partner_by_id, get_partner_by_name async def resolve_customer(root: CustomerType) -> CustomerType: """Resolve the customer field for a subscription or process.""" - partner = get_partner_by_id(root.customer_id) + partner = get_partner_by_id(root.customer_id) if root.customer_id else get_partner_by_name("GEANT") return CustomerType( customer_id=partner.partner_id, diff --git a/gso/migrations/versions/2025-01-20_16eef776a258_add_l3_core_service_termination_workflow.py b/gso/migrations/versions/2025-01-20_16eef776a258_add_l3_core_service_termination_workflow.py new file mode 100644 index 0000000000000000000000000000000000000000..cc543fcea46ea407b3ddf14307a614d0f55b42e1 --- /dev/null +++ b/gso/migrations/versions/2025-01-20_16eef776a258_add_l3_core_service_termination_workflow.py @@ -0,0 +1,39 @@ +"""Add L3 Core Service termination workflow. + +Revision ID: 16eef776a258 +Revises: c56862da5ab1 +Create Date: 2025-01-20 14:28:31.781259 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '16eef776a258' +down_revision = 'c56862da5ab1' +branch_labels = None +depends_on = None + + +from orchestrator.migrations.helpers import create_workflow, delete_workflow + +new_workflows = [ + { + "name": "terminate_l3_core_service", + "target": "TERMINATE", + "description": "Terminate Layer 3 Core Service", + "product_type": "L3CoreService" + } +] + + +def upgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + create_workflow(conn, workflow) + + +def downgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + delete_workflow(conn, workflow["name"]) diff --git a/gso/products/__init__.py b/gso/products/__init__.py index 9b2f12447ee440a504557fb6b4ada80aa5d47df9..7af2244220aa1b321ba57db01e2bcb1504a155c0 100644 --- a/gso/products/__init__.py +++ b/gso/products/__init__.py @@ -80,6 +80,10 @@ class ProductName(strEnum): """VRFs.""" +L3_CORE_SERVICE_PRODUCT_TYPE = L3CoreService.__name__ +L2_CORE_SERVICE_PRODUCT_TYPE = Layer2Circuit.__name__ + + class ProductType(strEnum): """An enumerator of available product types in GSO.""" @@ -102,19 +106,19 @@ class ProductType(strEnum): IMPORTED_OPENGEAR = Opengear.__name__ EDGE_PORT = EdgePort.__name__ IMPORTED_EDGE_PORT = ImportedEdgePort.__name__ - GEANT_IP = L3CoreService.__name__ + GEANT_IP = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_GEANT_IP = ImportedL3CoreService.__name__ - IAS = L3CoreService.__name__ + IAS = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_IAS = ImportedL3CoreService.__name__ - GWS = L3CoreService.__name__ + GWS = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_GWS = ImportedL3CoreService.__name__ - LHCONE = L3CoreService.__name__ + LHCONE = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_LHCONE = ImportedL3CoreService.__name__ - COPERNICUS = L3CoreService.__name__ + COPERNICUS = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_COPERNICUS = ImportedL3CoreService.__name__ - GEANT_PLUS = Layer2Circuit.__name__ + GEANT_PLUS = L2_CORE_SERVICE_PRODUCT_TYPE IMPORTED_GEANT_PLUS = ImportedLayer2Circuit.__name__ - EXPRESSROUTE = Layer2Circuit.__name__ + EXPRESSROUTE = L2_CORE_SERVICE_PRODUCT_TYPE IMPORTED_EXPRESSROUTE = ImportedLayer2Circuit.__name__ VRF = VRF.__name__ diff --git a/gso/services/partners.py b/gso/services/partners.py index 160cde0969cfa6e6bad43794716655c2df7f58c7..0b66f778e57a4c03cb7129d47854f77cac7790d0 100644 --- a/gso/services/partners.py +++ b/gso/services/partners.py @@ -7,7 +7,6 @@ from uuid import uuid4 from orchestrator.db import db from pydantic import AfterValidator, BaseModel, ConfigDict, EmailStr, Field from sqlalchemy import func -from sqlalchemy.exc import NoResultFound from gso.db.models import PartnerTable @@ -59,20 +58,18 @@ class PartnerNotFoundError(Exception): """Exception raised when a partner is not found.""" -def get_all_partners() -> list[dict]: +def get_all_partners() -> list[PartnerTable]: """Fetch all partners from the database and serialize them to JSON.""" - partners = PartnerTable.query.all() - return [partner.__json__() for partner in partners] + return PartnerTable.query.all() -def get_partner_by_name(name: str) -> dict[str, Any]: +def get_partner_by_name(name: str) -> PartnerTable: """Try to get a partner by their name.""" - try: - partner = db.session.query(PartnerTable).filter(PartnerTable.name == name).one() - return partner.__json__() - except NoResultFound as e: - msg = f"partner {name} not found" - raise PartnerNotFoundError(msg) from e + partner = db.session.query(PartnerTable).filter_by(name=name).first() + if not partner: + raise PartnerNotFoundError + + return partner def get_partner_by_id(partner_id: str) -> PartnerTable: diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py index 55823e5a6ee3b7b1b9dcf32f0e2a9cce1d2aa52c..e83f7831ccaf1ad5928f94a866c3e086c65bf033 100644 --- a/gso/services/subscriptions.py +++ b/gso/services/subscriptions.py @@ -227,11 +227,6 @@ def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None: return SubscriptionModel.from_subscription(subscription_table.subscription_id) if subscription_table else None -def get_insync_subscriptions() -> list[SubscriptionTable]: - """Retrieve all subscriptions that are currently in sync.""" - return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all() - - def get_active_insync_subscriptions() -> list[SubscriptionTable]: """Retrieve all subscriptions that are currently active and in sync.""" return ( @@ -255,19 +250,24 @@ def get_active_site_subscriptions(includes: list[str] | None = None) -> list[Sub ) -def get_active_edge_port_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]: +def get_active_edge_port_subscriptions(partner_id: UUIDstr | None = None) -> list[SubscriptionModel]: """Retrieve active Edge Port subscriptions. Args: - includes: The fields to be included in the returned Subscription objects. + partner_id: The customer id of subscriptions. Returns: A list of Subscription objects for Edge Ports. """ - return get_subscriptions( - product_types=[ProductType.EDGE_PORT], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes + results = get_subscriptions( + product_types=[ProductType.EDGE_PORT], + lifecycles=[SubscriptionLifecycle.ACTIVE], + includes=["subscription_id"], + partner_id=partner_id, ) + return [SubscriptionModel.from_subscription(result["subscription_id"]) for result in results] + def get_site_by_name(site_name: str) -> Site: """Get a site by its name. diff --git a/gso/translations/en-GB.json b/gso/translations/en-GB.json index babde318eba5de8c3c72d9c7b42dcc73c3305804..1c6561b2a2efe8576aa5d6ae6ec2c3dd1d6a6230 100644 --- a/gso/translations/en-GB.json +++ b/gso/translations/en-GB.json @@ -70,6 +70,7 @@ "terminate_switch": "Terminate Switch", "terminate_edge_port": "Terminate Edge Port", "terminate_lan_switch_interconnect": "Terminate LAN Switch Interconnect", + "terminate_l3_core_service": "Terminate L3 Core Service", "redeploy_base_config": "Redeploy base config", "update_ibgp_mesh": "Update iBGP mesh", "create_imported_site": "NOT FOR HUMANS -- Import existing site", diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index 20d66cab639db87252278ad1cc70310740c65812..ca7abb8eb582405e94244f5158a46e7d84b7b1e9 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -13,10 +13,17 @@ from pydantic_forms.validators import Choice from gso import settings from gso.products.product_blocks.router import RouterRole from gso.products.product_types.router import Router -from gso.services import subscriptions from gso.services.netbox_client import NetboxClient from gso.services.partners import get_all_partners -from gso.services.subscriptions import is_virtual_circuit_id_available +from gso.services.subscriptions import ( + get_active_edge_port_subscriptions, + get_active_router_subscriptions, + get_active_site_subscriptions, + get_active_subscriptions_by_field_and_value, + get_active_switch_subscriptions, + get_router_subscriptions, + is_virtual_circuit_id_available, +) from gso.utils.shared_enums import Vendor from gso.utils.types.interfaces import PhysicalPortCapacity from gso.utils.types.ip_address import IPv4AddressType, IPv4NetworkType, IPv6NetworkType @@ -175,8 +182,7 @@ def generate_inventory_for_routers( else [SubscriptionLifecycle.ACTIVE] ) all_routers = [ - Router.from_subscription(r["subscription_id"]) - for r in subscriptions.get_router_subscriptions(lifecycles=lifecycles) + Router.from_subscription(r["subscription_id"]) for r in get_router_subscriptions(lifecycles=lifecycles) ] exclude_routers = exclude_routers or [] @@ -219,7 +225,7 @@ def active_site_selector() -> Choice: """Generate a dropdown selector for choosing an active site in an input form.""" site_subscriptions = { str(site["subscription_id"]): site["description"] - for site in subscriptions.get_active_site_subscriptions(includes=["subscription_id", "description"]) + for site in get_active_site_subscriptions(includes=["subscription_id", "description"]) } return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True)) # type: ignore[arg-type] @@ -229,17 +235,20 @@ def active_router_selector() -> Choice: """Generate a dropdown selector for choosing an active Router in an input form.""" router_subscriptions = { str(router["subscription_id"]): router["description"] - for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"]) + for router in get_active_router_subscriptions(includes=["subscription_id", "description"]) } return Choice("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)) # type: ignore[arg-type] -def active_pe_router_selector() -> Choice: +def active_pe_router_selector(excludes: list[UUIDstr] | None = None) -> Choice: """Generate a dropdown selector for choosing an active PE Router in an input form.""" + excludes = excludes or [] + routers = { str(router.subscription_id): router.description - for router in subscriptions.get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE) + for router in get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE) + if router.subscription_id not in excludes } return Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type] @@ -249,7 +258,7 @@ def active_switch_selector() -> Choice: """Generate a dropdown selector for choosing an active Switch in an input form.""" switch_subscriptions = { str(switch["subscription_id"]): switch["description"] - for switch in subscriptions.get_active_switch_subscriptions(includes=["subscription_id", "description"]) + for switch in get_active_switch_subscriptions(includes=["subscription_id", "description"]) } return Choice("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)) # type: ignore[arg-type] @@ -257,27 +266,19 @@ def active_switch_selector() -> Choice: def active_edge_port_selector(*, partner_id: UUIDstr | None = None) -> Choice: """Generate a dropdown selector for choosing an active Edge Port in an input form.""" - edge_port_subscriptions = subscriptions.get_active_edge_port_subscriptions( - includes=["subscription_id", "description", "customer_id"] - ) - - if partner_id: - # `partner_id` is set, so we will filter accordingly. - edge_port_subscriptions = list( - filter(lambda subscription: bool(subscription["customer_id"] == partner_id), edge_port_subscriptions) - ) + edge_ports = get_active_edge_port_subscriptions(partner_id=partner_id) - edge_ports = {str(port["subscription_id"]): port["description"] for port in edge_port_subscriptions} + options = {str(edge_port.subscription_id): edge_port.description for edge_port in edge_ports} return Choice( "Select an Edge Port", - zip(edge_ports.keys(), edge_ports.items(), strict=True), # type: ignore[arg-type] + zip(options.keys(), options.items(), strict=True), # type: ignore[arg-type] ) def partner_choice() -> Choice: """Return a Choice object containing a list of available partners.""" - partners = {partner["partner_id"]: partner["name"] for partner in get_all_partners()} + partners = {partner.partner_id: partner.name for partner in get_all_partners()} return Choice("Select a partner", zip(partners.values(), partners.items(), strict=True)) # type: ignore[arg-type] diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py index f07e2d69ee98d6f4e72e02054ab5704abf71e1f9..92710b591759234a5239bf893b9909b84566448f 100644 --- a/gso/workflows/__init__.py +++ b/gso/workflows/__init__.py @@ -124,6 +124,7 @@ LazyWorkflowInstance("gso.workflows.l3_core_service.create_imported_l3_core_serv LazyWorkflowInstance("gso.workflows.l3_core_service.import_l3_core_service", "import_l3_core_service") LazyWorkflowInstance("gso.workflows.l3_core_service.migrate_l3_core_service", "migrate_l3_core_service") LazyWorkflowInstance("gso.workflows.l3_core_service.validate_l3_core_service", "validate_l3_core_service") +LazyWorkflowInstance("gso.workflows.l3_core_service.terminate_l3_core_service", "terminate_l3_core_service") # Layer 2 Circuit workflows LazyWorkflowInstance("gso.workflows.l2_circuit.create_layer_2_circuit", "create_layer_2_circuit") diff --git a/gso/workflows/edge_port/create_imported_edge_port.py b/gso/workflows/edge_port/create_imported_edge_port.py index 97e66fd5dd77b94a224bea076f094b0bc60a4912..ef5d2d033203e082c67a67f88eb511043d4e2a51 100644 --- a/gso/workflows/edge_port/create_imported_edge_port.py +++ b/gso/workflows/edge_port/create_imported_edge_port.py @@ -27,7 +27,7 @@ from gso.utils.types.interfaces import LAGMember, PhysicalPortCapacity @step("Create subscription") def create_subscription(partner: str) -> State: """Create a new subscription object.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id = get_product_id_by_name(ProductName.IMPORTED_EDGE_PORT) subscription = ImportedEdgePortInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/iptrunk/create_imported_iptrunk.py b/gso/workflows/iptrunk/create_imported_iptrunk.py index 70ee7abe253b4158cd94f02a72ec50af594959a1..db1bdb6eb452a836eefa20216e5167e7c36baf0b 100644 --- a/gso/workflows/iptrunk/create_imported_iptrunk.py +++ b/gso/workflows/iptrunk/create_imported_iptrunk.py @@ -59,7 +59,7 @@ def initial_input_form_generator() -> FormGenerator: @step("Create a new subscription") def create_subscription(partner: str) -> State: """Create a new subscription in the service database.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_IP_TRUNK) subscription = ImportedIptrunkInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/iptrunk/create_iptrunk.py b/gso/workflows/iptrunk/create_iptrunk.py index 8267bd09011465a8d09aedcf60e2895e97583123..8498d77b80d40383f32a160a21d9b36b9205f0dc 100644 --- a/gso/workflows/iptrunk/create_iptrunk.py +++ b/gso/workflows/iptrunk/create_iptrunk.py @@ -215,7 +215,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: @step("Create subscription") def create_subscription(product: UUIDstr, partner: str) -> State: """Create a new subscription object in the database.""" - subscription = IptrunkInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"]) + subscription = IptrunkInactive.from_product_id(product, get_partner_by_name(partner).partner_id) return { "subscription": subscription, diff --git a/gso/workflows/l2_circuit/create_imported_layer_2_circuit.py b/gso/workflows/l2_circuit/create_imported_layer_2_circuit.py index baaa1e05868fd3e52cd16d170d8523a41eac41b2..9d7f0f7a7c04cf2a9cf92e689d9709f8efec2f24 100644 --- a/gso/workflows/l2_circuit/create_imported_layer_2_circuit.py +++ b/gso/workflows/l2_circuit/create_imported_layer_2_circuit.py @@ -83,7 +83,7 @@ def initial_input_form_generator() -> FormGenerator: @step("Create subscription") def create_subscription(partner: str, service_type: Layer2CircuitServiceType) -> State: """Create a new subscription object.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id = get_product_id_by_name(ProductName(service_type)) subscription = ImportedLayer2CircuitInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/l2_circuit/create_layer_2_circuit.py b/gso/workflows/l2_circuit/create_layer_2_circuit.py index 00d37b998b4426103befa2673f915646b610de9b..2548639e5471aad73093481b58da78ed73cfe2cf 100644 --- a/gso/workflows/l2_circuit/create_layer_2_circuit.py +++ b/gso/workflows/l2_circuit/create_layer_2_circuit.py @@ -29,7 +29,7 @@ from gso.utils.types.virtual_identifiers import VLAN_ID def initial_input_generator(product_name: str) -> FormGenerator: """Gather input from the operator about a new Layer 2 Circuit subscription.""" - geant_partner_id = get_partner_by_name("GEANT")["partner_id"] + geant_partner_id = get_partner_by_name("GEANT").partner_id class CreateLayer2CircuitServicePage(FormPage): model_config = ConfigDict(title=f"{product_name}") diff --git a/gso/workflows/l3_core_service/create_imported_l3_core_service.py b/gso/workflows/l3_core_service/create_imported_l3_core_service.py index 95a9b13e2f3e34159d7b5f349e5f4e752f4a4e88..f8104c016e217369106c7c53bd1a8c857ce1dbde 100644 --- a/gso/workflows/l3_core_service/create_imported_l3_core_service.py +++ b/gso/workflows/l3_core_service/create_imported_l3_core_service.py @@ -80,7 +80,7 @@ def initial_input_form_generator() -> FormGenerator: @step("Create subscription") def create_subscription(partner: str, service_type: L3CoreServiceType) -> dict: """Create a new subscription object in the database.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id match service_type: case L3CoreServiceType.GEANT_IP: product_id = get_product_id_by_name(ProductName.IMPORTED_GEANT_IP) diff --git a/gso/workflows/l3_core_service/migrate_l3_core_service.py b/gso/workflows/l3_core_service/migrate_l3_core_service.py index bf153e337d8421d4d0b77533586f2bb18571191a..2aa87f02e35bc2a6a94f10e7b8efcd56c81dfbfe 100644 --- a/gso/workflows/l3_core_service/migrate_l3_core_service.py +++ b/gso/workflows/l3_core_service/migrate_l3_core_service.py @@ -29,12 +29,12 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: existing_ep_name_list = [ap.sbp.edge_port.owner_subscription_id for ap in subscription.l3_core_service.ap_list] edge_port_subscriptions = list( filter( - lambda ep: bool(ep["customer_id"] == pid) and ep["subscription_id"] not in existing_ep_name_list, - get_active_edge_port_subscriptions(includes=["subscription_id", "description", "customer_id"]), + lambda ep: bool(ep.customer_id == pid) and ep.subscription_id not in existing_ep_name_list, + get_active_edge_port_subscriptions(), ) ) - edge_ports = {str(port["subscription_id"]): port["description"] for port in edge_port_subscriptions} + edge_ports = {str(port.subscription_id): port.description for port in edge_port_subscriptions} return Choice( "Select an Edge Port", diff --git a/gso/workflows/l3_core_service/terminate_l3_core_service.py b/gso/workflows/l3_core_service/terminate_l3_core_service.py new file mode 100644 index 0000000000000000000000000000000000000000..9cf401a5c89edbef1169e9f642ef603fff9b27bb --- /dev/null +++ b/gso/workflows/l3_core_service/terminate_l3_core_service.py @@ -0,0 +1,40 @@ +"""Workflow for terminating a Layer 3 Core Service.""" + +from orchestrator import begin, workflow +from orchestrator.forms import SubmitFormPage +from orchestrator.targets import Target +from orchestrator.types import SubscriptionLifecycle, UUIDstr +from orchestrator.workflow import StepList, done +from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form +from pydantic_forms.types import FormGenerator + +from gso.products.product_types.l3_core_service import L3CoreService +from gso.utils.types.tt_number import TTNumber + + +def _input_form_generator(subscription_id: UUIDstr) -> FormGenerator: + subscription = L3CoreService.from_subscription(subscription_id) + + class TerminateForm(SubmitFormPage): + tt_number: TTNumber + + yield TerminateForm + return {"subscription": subscription} + + +@workflow( + "Terminate Layer 3 Core Service", + initial_input_form=wrap_modify_initial_input_form(_input_form_generator), + target=Target.TERMINATE, +) +def terminate_l3_core_service() -> StepList: + """Terminate a Layer 3 Core Service subscription.""" + return ( + begin + >> store_process_subscription(Target.TERMINATE) + >> unsync + >> set_status(SubscriptionLifecycle.TERMINATED) + >> resync + >> done + ) diff --git a/gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py b/gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py index 8c7b78944ee1ba37f40c4d22a2e5a8ea29307a08..03c6e8aa8216809b82d10c51186a2bb56f25634e 100644 --- a/gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py +++ b/gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py @@ -44,7 +44,7 @@ def _initial_input_form_generator() -> FormGenerator: @step("Create subscription") def create_subscription(partner: str) -> State: """Create a new subscription object.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id = get_product_id_by_name(ProductName.IMPORTED_LAN_SWITCH_INTERCONNECT) subscription = ImportedLanSwitchInterconnectInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/lan_switch_interconnect/create_lan_switch_interconnect.py b/gso/workflows/lan_switch_interconnect/create_lan_switch_interconnect.py index 1e8c3bc58cedd1eeb4d16bf5f0cfbc7be414c192..0c5f74850a8c518740d1eb73510e8aa0af80a198 100644 --- a/gso/workflows/lan_switch_interconnect/create_lan_switch_interconnect.py +++ b/gso/workflows/lan_switch_interconnect/create_lan_switch_interconnect.py @@ -125,7 +125,7 @@ def _initial_input_form(product_name: str) -> FormGenerator: @step("Create subscription") def create_subscription(product: UUIDstr, partner: str) -> State: """Create a new subscription object in the database.""" - subscription = LanSwitchInterconnectInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"]) + subscription = LanSwitchInterconnectInactive.from_product_id(product, get_partner_by_name(partner).partner_id) return {"subscription": subscription, "subscription_id": subscription.subscription_id} diff --git a/gso/workflows/office_router/create_imported_office_router.py b/gso/workflows/office_router/create_imported_office_router.py index 5f92e36b62840018a91c47b910784e787f882062..c1280c13d0b1e0e729843078939b091573267f14 100644 --- a/gso/workflows/office_router/create_imported_office_router.py +++ b/gso/workflows/office_router/create_imported_office_router.py @@ -20,7 +20,7 @@ from gso.utils.types.ip_address import IPv4AddressType, IPv6AddressType, PortNum @step("Create subscription") def create_subscription(partner: str) -> State: """Create a new subscription object.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OFFICE_ROUTER) subscription = ImportedOfficeRouterInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/opengear/create_imported_opengear.py b/gso/workflows/opengear/create_imported_opengear.py index fb9502ebe854604a8f6037e17c1769ff9f0aa87f..a7433f2bfed6a6b5400893d0f52b669a1e70fc9a 100644 --- a/gso/workflows/opengear/create_imported_opengear.py +++ b/gso/workflows/opengear/create_imported_opengear.py @@ -18,7 +18,7 @@ from gso.utils.types.ip_address import IPv4AddressType @step("Create subscription") def create_subscription(partner: str) -> State: """Create a new subscription object.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id = get_product_id_by_name(ProductName.IMPORTED_OPENGEAR) subscription = ImportedOpengearInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/router/create_imported_router.py b/gso/workflows/router/create_imported_router.py index 3a79df0c91c5451a9e2232afe24b09e9db18ddb9..81acba1aedb5209a3e113ba5542a57492270667b 100644 --- a/gso/workflows/router/create_imported_router.py +++ b/gso/workflows/router/create_imported_router.py @@ -21,7 +21,7 @@ from gso.utils.types.ip_address import IPv4AddressType, IPv6AddressType, PortNum @step("Create subscription") def create_subscription(partner: str) -> State: """Create a new subscription object.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id = get_product_id_by_name(ProductName.IMPORTED_ROUTER) subscription = ImportedRouterInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/router/create_router.py b/gso/workflows/router/create_router.py index c30fc518981c29dc0538e231cd47e8e27fe556c3..6cd77a108e8c3bf3aaf62827f3b69156b20c1fbe 100644 --- a/gso/workflows/router/create_router.py +++ b/gso/workflows/router/create_router.py @@ -117,7 +117,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: @step("Create subscription") def create_subscription(product: UUIDstr, partner: str) -> State: """Create a new subscription object.""" - subscription = RouterInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"]) + subscription = RouterInactive.from_product_id(product, get_partner_by_name(partner).partner_id) return { "subscription": subscription, diff --git a/gso/workflows/site/create_imported_site.py b/gso/workflows/site/create_imported_site.py index d3c1720b0d76daa0e05c1f59f44aeca153c1cdae..16f2471f7fd97b0d6918be70da8e0645415cf3b4 100644 --- a/gso/workflows/site/create_imported_site.py +++ b/gso/workflows/site/create_imported_site.py @@ -22,7 +22,7 @@ from gso.utils.types.ip_address import IPAddress @step("Create subscription") def create_subscription(partner: str) -> State: """Create a new subscription object in the service database.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id: UUID = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SITE) subscription = ImportedSiteInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/site/create_site.py b/gso/workflows/site/create_site.py index 9fe35fc175ad03cfdce30853e3d175edede6ee12..149178e346c5526f2d42bcee95aae89e977a3c68 100644 --- a/gso/workflows/site/create_site.py +++ b/gso/workflows/site/create_site.py @@ -52,7 +52,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: @step("Create subscription") def create_subscription(product: UUIDstr, partner: str) -> State: """Create a new subscription object in the service database.""" - subscription = site.SiteInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"]) + subscription = site.SiteInactive.from_product_id(product, get_partner_by_name(partner).partner_id) return { "subscription": subscription, diff --git a/gso/workflows/super_pop_switch/create_imported_super_pop_switch.py b/gso/workflows/super_pop_switch/create_imported_super_pop_switch.py index e57bbd587aa6f4255376419ded8aeefefd9fbbf8..6b34253173ce2b5ec72963cd274b78141fba2504 100644 --- a/gso/workflows/super_pop_switch/create_imported_super_pop_switch.py +++ b/gso/workflows/super_pop_switch/create_imported_super_pop_switch.py @@ -21,7 +21,7 @@ from gso.utils.types.ip_address import IPv4AddressType, PortNumber @step("Create subscription") def create_subscription(partner: str) -> State: """Create a new subscription object.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SUPER_POP_SWITCH) subscription = ImportedSuperPopSwitchInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/switch/create_imported_switch.py b/gso/workflows/switch/create_imported_switch.py index 0d35083f4648eb1387d96f702f22b259510293e4..30d7ea46360d01d431c6c635bc8813af39146e07 100644 --- a/gso/workflows/switch/create_imported_switch.py +++ b/gso/workflows/switch/create_imported_switch.py @@ -32,7 +32,7 @@ def _initial_input_form_generator() -> FormGenerator: @step("Create subscription") def create_subscription(partner: str) -> State: """Create a new subscription object.""" - partner_id = get_partner_by_name(partner)["partner_id"] + partner_id = get_partner_by_name(partner).partner_id product_id = get_product_id_by_name(ProductName.IMPORTED_SWITCH) subscription = ImportedSwitchInactive.from_product_id(product_id, partner_id) diff --git a/gso/workflows/switch/create_switch.py b/gso/workflows/switch/create_switch.py index e566eed2c8a5fd310da7db34efe0cda8bd019642..09b0a03ea25af0880dbf6ef89ac02e6bd49f1a45 100644 --- a/gso/workflows/switch/create_switch.py +++ b/gso/workflows/switch/create_switch.py @@ -67,7 +67,7 @@ def _initial_input_form_generator(product_name: str) -> FormGenerator: @step("Create subscription") def create_subscription(product: UUIDstr, partner: str) -> State: """Create a new subscription object.""" - subscription = SwitchInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"]) + subscription = SwitchInactive.from_product_id(product, get_partner_by_name(partner).partner_id) return {"subscription": subscription, "subscription_id": subscription.subscription_id} diff --git a/gso/workflows/tasks/delete_partners.py b/gso/workflows/tasks/delete_partners.py index 1462744c6a111be81cb5d213aab521ff55861127..15cbe2ac2f932ad2de5fc0d584ce5e00eb3325e5 100644 --- a/gso/workflows/tasks/delete_partners.py +++ b/gso/workflows/tasks/delete_partners.py @@ -32,7 +32,7 @@ def initial_input_form_generator() -> FormGenerator: partner = get_partner_by_name(name=initial_user_input.partners.name) - return {"email": partner["email"], "name": partner["name"], "partner_id": partner["partner_id"]} + return {"email": partner.email, "name": partner.name, "partner_id": partner.partner_id} @step("Delete partner information from database") diff --git a/gso/workflows/tasks/modify_partners.py b/gso/workflows/tasks/modify_partners.py index 3e5dd822a42e7743a31e3edf5613edb164e6bc58..0445ceeeb65f82bb4e7ea8e1124bc2bc8613c34d 100644 --- a/gso/workflows/tasks/modify_partners.py +++ b/gso/workflows/tasks/modify_partners.py @@ -31,12 +31,12 @@ def initial_input_form_generator() -> FormGenerator: class ModifyPartnerForm(SubmitFormPage): model_config = ConfigDict(title="Modify a Partner") - name: str | None = partner["name"] - email: EmailStr | None = partner["email"] + name: str | None = partner.name + email: EmailStr | None = partner.email @field_validator("name") def validate_name(cls, name: str) -> str | None: - if partner["name"] == name: + if partner.name == name: return None if filter_partners_by_name(name=name, case_sensitive=False): msg = "Partner with this name already exists." @@ -45,7 +45,7 @@ def initial_input_form_generator() -> FormGenerator: @field_validator("email") def validate_email(cls, email: str) -> EmailStr | None: - if partner["email"] == email: + if partner.email == email: return None if filter_partners_by_email(email=email, case_sensitive=False): msg = "Partner with this email already exists." @@ -54,7 +54,7 @@ def initial_input_form_generator() -> FormGenerator: user_input = yield ModifyPartnerForm - return user_input.model_dump() | {"partner_id": partner["partner_id"]} + return user_input.model_dump() | {"partner_id": partner.partner_id} @step("Save partner information to database") diff --git a/gso/workflows/vrf/create_vrf.py b/gso/workflows/vrf/create_vrf.py index 8da4f2d91a9584090a3d2b1fd5b53489a1dc1438..8844ea0c0d3911c406446a25268882f383b2b830 100644 --- a/gso/workflows/vrf/create_vrf.py +++ b/gso/workflows/vrf/create_vrf.py @@ -40,7 +40,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: @step("Create subscription") def create_subscription(product: UUIDstr, partner: str) -> State: """Create a new VRF subscription.""" - subscription = VRFInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"]) + subscription = VRFInactive.from_product_id(product, get_partner_by_name(partner).partner_id) return { "subscription": subscription, diff --git a/setup.py b/setup.py index bdef2e25ec18b0633410b66a4f919d3e302c1257..bd523b15a8fe2513695e3011c0a6d6717b57f7c6 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ from setuptools import find_packages, setup setup( name="geant-service-orchestrator", - version="2.32", + version="2.33", author="GÉANT Orchestration and Automation Team", author_email="goat@geant.org", description="GÉANT Service Orchestrator", diff --git a/test/conftest.py b/test/conftest.py index 4548fe0fadd99957fee38e96fc0874402063f048..38fa426f64bd4365511221b3c679ff6c1cd31181 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -32,7 +32,6 @@ from sqlalchemy.orm import scoped_session, sessionmaker from starlette.testclient import TestClient from urllib3_mock import Responses -from gso.main import init_gso_app from gso.services.partners import PartnerSchema, create_partner from gso.utils.types.interfaces import LAGMember, LAGMemberList from test.fixtures import * # noqa: F403 @@ -49,6 +48,17 @@ class UseJuniperSide(strEnum): SIDE_BOTH = "side_both" +def pytest_configure(config): + """Set an environment variable before loading any test modules.""" + config_filename = "gso/oss-params-example.json" + os.environ["OSS_PARAMS_FILENAME"] = config_filename + + +def pytest_unconfigure(config): + """Clean up the environment variable after all tests.""" + os.environ.pop("OSS_PARAMS_FILENAME", None) + + class FakerProvider(BaseProvider): def ipv4_network(self, *, min_subnet=1, max_subnet=32) -> ipaddress.IPv4Network: subnet = str(self.generator.random_int(min=min_subnet, max=max_subnet)) @@ -146,16 +156,6 @@ def faker() -> Faker: return fake -@pytest.fixture(scope="session", autouse=True) -def data_config_filename() -> str: - """Set an environment variable to the path of the example OSS parameters file.""" - config_filename = "gso/oss-params-example.json" - - os.environ["OSS_PARAMS_FILENAME"] = config_filename - yield config_filename - del os.environ["OSS_PARAMS_FILENAME"] - - @pytest.fixture(scope="session") def db_uri(): """Provide a unique database URI for each pytest-xdist worker, or a default URI if running without xdist.""" @@ -281,10 +281,13 @@ def fastapi_app(_database, db_uri): This implementation is as close as possible to the one present in orchestrator-core. """ + from gso.main import app + oauth2lib_settings.OAUTH2_ACTIVE = False oauth2lib_settings.ENVIRONMENT_IGNORE_MUTATION_DISABLED = ["local", "TESTING"] app_settings.DATABASE_URI = db_uri - return init_gso_app() + + return app @pytest.fixture(scope="session") diff --git a/test/fixtures/edge_port_fixtures.py b/test/fixtures/edge_port_fixtures.py index 90a30c672d38fa295b29d543a8a50f0342b05d23..6beb81817855be8db10d4292fdb303be849ab43d 100644 --- a/test/fixtures/edge_port_fixtures.py +++ b/test/fixtures/edge_port_fixtures.py @@ -37,24 +37,25 @@ def edge_port_subscription_factory(faker, partner_factory, router_subscription_f *, enable_lacp=True, ignore_if_down=False, - is_imported=True, + is_imported=False, ) -> UUIDstr: partner = partner or partner_factory() - node = Router.from_subscription(router_subscription_factory(vendor=Vendor.NOKIA)).router + node = node or Router.from_subscription(router_subscription_factory(vendor=Vendor.NOKIA)).router + if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.EDGE_PORT) - edge_port_subscription = EdgePortInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_EDGE_PORT) + edge_port_subscription = ImportedEdgePortInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_EDGE_PORT) - edge_port_subscription = ImportedEdgePortInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.EDGE_PORT) + edge_port_subscription = EdgePortInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) edge_port_subscription.edge_port.edge_port_description = description or faker.text(max_nb_chars=30) edge_port_subscription.edge_port.ga_id = ga_id or faker.ga_id() - edge_port_subscription.edge_port.node = node or node + edge_port_subscription.edge_port.node = node edge_port_subscription.edge_port.edge_port_name = name or f"lag-{faker.pyint(21, 50)}" edge_port_subscription.edge_port.edge_port_description = edge_port_description or faker.sentence() edge_port_subscription.edge_port.enable_lacp = enable_lacp diff --git a/test/fixtures/iptrunk_fixtures.py b/test/fixtures/iptrunk_fixtures.py index 742e974a98001ee4908f08b6502d7d5ca9ae8c20..09fe95827e71a4188dfb5fb3d36b362fa2f7f783 100644 --- a/test/fixtures/iptrunk_fixtures.py +++ b/test/fixtures/iptrunk_fixtures.py @@ -67,19 +67,19 @@ def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant status: SubscriptionLifecycle | None = None, partner: dict | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.IP_TRUNK) - iptrunk_subscription = IptrunkInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_IP_TRUNK) + iptrunk_subscription = ImportedIptrunkInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_IP_TRUNK) - iptrunk_subscription = ImportedIptrunkInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IP_TRUNK) + iptrunk_subscription = IptrunkInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/l3_core_service_fixtures.py b/test/fixtures/l3_core_service_fixtures.py index 21ea6379595e8860d95e35508ec1b6a6e3cd7721..c810d6412a03eaaeda1a44f8cd1c7093a3247942 100644 --- a/test/fixtures/l3_core_service_fixtures.py +++ b/test/fixtures/l3_core_service_fixtures.py @@ -127,11 +127,12 @@ def access_port_factory(faker, service_binding_port_factory): def create_access_port( ap_type: APType | None = None, service_binding_port: ServiceBindingPort | None = None, + edge_port: EdgePort | None = None, ): return AccessPort.new( subscription_id=uuid4(), ap_type=ap_type or random.choice(list(APType)), # noqa: S311 - sbp=service_binding_port or service_binding_port_factory(), + sbp=service_binding_port or service_binding_port_factory(edge_port=edge_port), ) return create_access_port @@ -150,6 +151,7 @@ def l3_core_service_subscription_factory( ap_list: list[AccessPort] | None = None, start_date="2023-05-24T00:00:00+00:00", status: SubscriptionLifecycle | None = None, + edge_port: EdgePort | None = None, ) -> UUIDstr: partner = partner or partner_factory() match l3_core_service_type: @@ -209,8 +211,8 @@ def l3_core_service_subscription_factory( # Default ap_list creation with primary and backup access ports l3_core_service_subscription.l3_core_service.ap_list = ap_list or [ - access_port_factory(ap_type=APType.PRIMARY), - access_port_factory(ap_type=APType.BACKUP), + access_port_factory(ap_type=APType.PRIMARY, edge_port=edge_port), + access_port_factory(ap_type=APType.BACKUP, edge_port=edge_port), ] # Update subscription with description, start date, and status diff --git a/test/fixtures/lan_switch_interconnect_fixtures.py b/test/fixtures/lan_switch_interconnect_fixtures.py index b98d800d1fa68f5b2e22b8d75c651ceb7dc9adfb..e2cb053263cc153bc57a9139041754c8bf648d2c 100644 --- a/test/fixtures/lan_switch_interconnect_fixtures.py +++ b/test/fixtures/lan_switch_interconnect_fixtures.py @@ -45,16 +45,16 @@ def lan_switch_interconnect_subscription_factory( switch_side_ae_iface: str | None = None, switch_side_ae_members: list[dict[str, str]] | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = get_product_id_by_name(ProductName.LAN_SWITCH_INTERCONNECT) - subscription = LanSwitchInterconnectInactive.from_product_id(product_id, partner["partner_id"]) - else: product_id = get_product_id_by_name(ProductName.IMPORTED_LAN_SWITCH_INTERCONNECT) subscription = ImportedLanSwitchInterconnectInactive.from_product_id(product_id, partner["partner_id"]) + else: + product_id = get_product_id_by_name(ProductName.LAN_SWITCH_INTERCONNECT) + subscription = LanSwitchInterconnectInactive.from_product_id(product_id, partner["partner_id"]) router_side_ae_members = router_side_ae_members or [ LanSwitchInterconnectInterfaceBlockInactive.new( diff --git a/test/fixtures/office_router_fixtures.py b/test/fixtures/office_router_fixtures.py index 0c55ada08ae12b730a491f15eedd42af15e23b6d..070e3c9f517836fbeb81d1a0bda59525d6da25d7 100644 --- a/test/fixtures/office_router_fixtures.py +++ b/test/fixtures/office_router_fixtures.py @@ -25,7 +25,7 @@ def office_router_subscription_factory(site_subscription_factory, faker, geant_p status: SubscriptionLifecycle | None = None, partner: dict | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -38,13 +38,13 @@ def office_router_subscription_factory(site_subscription_factory, faker, geant_p office_router_site = office_router_site or site_subscription_factory() if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.OFFICE_ROUTER) - office_router_subscription = OfficeRouterInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OFFICE_ROUTER) + office_router_subscription = ImportedOfficeRouterInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OFFICE_ROUTER) - office_router_subscription = ImportedOfficeRouterInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.OFFICE_ROUTER) + office_router_subscription = OfficeRouterInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/opengear_fixtures.py b/test/fixtures/opengear_fixtures.py index b6dbff50f0fdc8c786caadddccade778311a83e1..93e0b7c6f841d14df22ce9df58843864d9df9a31 100644 --- a/test/fixtures/opengear_fixtures.py +++ b/test/fixtures/opengear_fixtures.py @@ -22,7 +22,7 @@ def opengear_subscription_factory(site_subscription_factory, faker, geant_partne status: SubscriptionLifecycle | None = None, partner: dict | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -35,13 +35,13 @@ def opengear_subscription_factory(site_subscription_factory, faker, geant_partne opengear_wan_gateway = opengear_wan_gateway or faker.ipv4() if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.OPENGEAR) - opengear_subscription = OpengearInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OPENGEAR) + opengear_subscription = ImportedOpengearInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OPENGEAR) - opengear_subscription = ImportedOpengearInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.OPENGEAR) + opengear_subscription = OpengearInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/router_fixtures.py b/test/fixtures/router_fixtures.py index 04c77cede9d10f0d573e217983bd3a1462281d3e..1ae14dcc4cc420e187a3fc7efbb59d62efef3d5a 100644 --- a/test/fixtures/router_fixtures.py +++ b/test/fixtures/router_fixtures.py @@ -32,18 +32,18 @@ def router_subscription_factory(site_subscription_factory, faker, geant_partner) vendor: Vendor | None = Vendor.NOKIA, *, router_access_via_ts: bool | None = None, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER) - router_subscription = RouterInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_ROUTER) + router_subscription = ImportedRouterInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_ROUTER) - router_subscription = ImportedRouterInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER) + router_subscription = RouterInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/site_fixtures.py b/test/fixtures/site_fixtures.py index 2467bd89067602cfd8b99270b81a4a350efe93e5..c97c11b38528ec1dcd3c43410d6a62e5bf52cedb 100644 --- a/test/fixtures/site_fixtures.py +++ b/test/fixtures/site_fixtures.py @@ -30,20 +30,20 @@ def site_subscription_factory(faker, geant_partner): partner: dict | None = None, start_date="2023-05-24T00:00:00+00:00", *, - is_imported: bool | None = True, + is_imported: bool | None = False, site_contains_optical_equipment: bool | None = True, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.SITE) - site_subscription = SiteInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True) - else: product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SITE) site_subscription = ImportedSiteInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) + else: + product_id = subscriptions.get_product_id_by_name(ProductName.SITE) + site_subscription = SiteInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True) site_subscription.site.site_city = site_city or faker.city() site_subscription.site.site_name = site_name or faker.site_name() diff --git a/test/fixtures/super_pop_switch_fixtures.py b/test/fixtures/super_pop_switch_fixtures.py index 5350e2a70b8eead32521ee16e5de095f24585133..464c0360a910f008e732e14bf477e382e66b56c6 100644 --- a/test/fixtures/super_pop_switch_fixtures.py +++ b/test/fixtures/super_pop_switch_fixtures.py @@ -24,7 +24,7 @@ def super_pop_switch_subscription_factory(site_subscription_factory, faker, gean status: SubscriptionLifecycle | None = None, partner: dict | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -36,13 +36,13 @@ def super_pop_switch_subscription_factory(site_subscription_factory, faker, gean super_pop_switch_site = super_pop_switch_site or site_subscription_factory() if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.SUPER_POP_SWITCH) - super_pop_switch_subscription = SuperPopSwitchInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SUPER_POP_SWITCH) + super_pop_switch_subscription = ImportedSuperPopSwitchInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SUPER_POP_SWITCH) - super_pop_switch_subscription = ImportedSuperPopSwitchInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.SUPER_POP_SWITCH) + super_pop_switch_subscription = SuperPopSwitchInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/switch_fixtures.py b/test/fixtures/switch_fixtures.py index c0167a1fbc684f121be38b543fcdc1e9a9bd607d..1c42b54caae75d76042b7b311a11ab522806d064 100644 --- a/test/fixtures/switch_fixtures.py +++ b/test/fixtures/switch_fixtures.py @@ -26,17 +26,18 @@ def switch_subscription_factory(faker, geant_partner, site_subscription_factory) switch_model: SwitchModel | None = None, status: SubscriptionLifecycle | None = None, *, - is_imported: bool = True, + is_imported: bool = False, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = get_product_id_by_name(ProductName.SWITCH) - switch_subscription = SwitchInactive.from_product_id(product_id, partner["partner_id"]) - else: product_id = get_product_id_by_name(ProductName.IMPORTED_SWITCH) switch_subscription = ImportedSwitchInactive.from_product_id(product_id, partner["partner_id"]) + else: + product_id = get_product_id_by_name(ProductName.SWITCH) + switch_subscription = SwitchInactive.from_product_id(product_id, partner["partner_id"]) + switch_subscription.switch.fqdn = fqdn or faker.domain_name(levels=4) switch_subscription.switch.ts_port = ts_port or faker.port_number(is_user=True) switch_subscription.switch.site = site or Site.from_subscription(site_subscription_factory()).site diff --git a/test/services/test_infoblox.py b/test/services/test_infoblox.py index 3a7332606ed6314c104fd281a1c8c03bfcd18a91..594eae35ad256c1938fb05a6c5a4144df214c81d 100644 --- a/test/services/test_infoblox.py +++ b/test/services/test_infoblox.py @@ -1,7 +1,6 @@ import ipaddress import logging import re -from os import PathLike import pytest import responses @@ -151,7 +150,7 @@ def _set_up_host_responses(): @responses.activate -def test_allocate_networks(data_config_filename: PathLike): +def test_allocate_networks(): _set_up_network_responses() new_v4_network = infoblox.allocate_v4_network("TRUNK") @@ -162,7 +161,7 @@ def test_allocate_networks(data_config_filename: PathLike): @responses.activate -def test_allocate_bad_network(data_config_filename: PathLike): +def test_allocate_bad_network(): _set_up_network_responses() with pytest.raises(AllocationError) as e: @@ -175,7 +174,7 @@ def test_allocate_bad_network(data_config_filename: PathLike): @responses.activate -def test_allocate_good_host(data_config_filename: PathLike): +def test_allocate_good_host(): _set_up_host_responses() new_host = infoblox.allocate_host("test.lo.geant.net", "LO", [], "test host") assert new_host == ( @@ -185,7 +184,7 @@ def test_allocate_good_host(data_config_filename: PathLike): @responses.activate -def test_allocate_bad_host(data_config_filename: PathLike): +def test_allocate_bad_host(): _set_up_host_responses() with pytest.raises(AllocationError) as e: infoblox.allocate_host("broken", "TRUNK", [], "Unavailable host") @@ -193,7 +192,7 @@ def test_allocate_bad_host(data_config_filename: PathLike): @responses.activate -def test_delete_good_network(data_config_filename: PathLike): +def test_delete_good_network(): responses.add( method=responses.GET, url="https://10.0.0.1/wapi/v2.12/network?network=10.255.255.0%2F26&_return_fields=comment%2Cextattrs%2Cnetwork%" @@ -217,7 +216,7 @@ def test_delete_good_network(data_config_filename: PathLike): @responses.activate -def test_delete_non_existent_network(data_config_filename: PathLike, caplog): +def test_delete_non_existent_network(caplog): responses.add( method=responses.GET, url="https://10.0.0.1/wapi/v2.12/network?network=10.255.255.0%2F26&_return_fields=comment%2Cextattrs%2Cnetwork%" @@ -230,7 +229,7 @@ def test_delete_non_existent_network(data_config_filename: PathLike, caplog): @responses.activate -def test_delete_good_host(data_config_filename: PathLike): +def test_delete_good_host(): responses.add( method=responses.GET, url=re.compile( @@ -277,7 +276,7 @@ def test_delete_good_host(data_config_filename: PathLike): @responses.activate -def test_delete_bad_host(data_config_filename: PathLike, caplog): +def test_delete_bad_host(caplog): responses.add( method=responses.GET, url=re.compile(r".+"), diff --git a/test/services/test_netbox_client.py b/test/services/test_netbox_client.py index 55b82af17c7e9b67b140ed66d51e88905ec14b3a..cce1910ad0264db692d55e4ea635c9b78a3de1e1 100644 --- a/test/services/test_netbox_client.py +++ b/test/services/test_netbox_client.py @@ -1,7 +1,6 @@ """Unit tests for testing the netbox client.""" import uuid -from os import PathLike from unittest.mock import Mock, patch import pytest @@ -83,7 +82,6 @@ def test_create_device( site, device_bay, card_type, - data_config_filename: PathLike, ): device_name = "mx1.lab.geant.net" device.name = device_name @@ -105,7 +103,7 @@ def test_create_device( @patch("gso.services.netbox_client.Router.from_subscription") @patch("gso.services.netbox_client.pynetbox.api") -def test_get_available_lags(mock_api, mock_from_subscription, data_config_filename: PathLike): +def test_get_available_lags(mock_api, mock_from_subscription): router_id = uuid.uuid4() feasible_lags = [f"lag-{i}" for i in range(1, 11)] @@ -127,7 +125,7 @@ def test_get_available_lags(mock_api, mock_from_subscription, data_config_filena @patch("gso.services.netbox_client.pynetbox.api") -def test_create_interface(mock_api, device, interface, data_config_filename: PathLike): +def test_create_interface(mock_api, device, interface): # Mock netbox calls mock_api.return_value.dcim.devices.get.return_value = device mock_api.return_value.dcim.interfaces.create.return_value = interface @@ -141,7 +139,7 @@ def test_create_interface(mock_api, device, interface, data_config_filename: Pat @patch("gso.services.netbox_client.pynetbox.api") -def test_reserve_interface_exception(mock_api, device, interface, data_config_filename: PathLike): +def test_reserve_interface_exception(mock_api, device, interface): """Test for checking if interface is reserved. If the interface is already reserved @@ -163,7 +161,7 @@ def test_reserve_interface_exception(mock_api, device, interface, data_config_fi @patch("gso.services.netbox_client.pynetbox.api") -def test_reserve_interface(mock_api, device, interface, data_config_filename: PathLike): +def test_reserve_interface(mock_api, device, interface): """Test a normal reservation of a interface.""" # Set interface to not reserved interface.enabled = False @@ -186,7 +184,7 @@ def test_reserve_interface(mock_api, device, interface, data_config_filename: Pa @patch("gso.services.netbox_client.pynetbox.api") -def test_allocate_interface_exception(mock_api, device, interface, data_config_filename: PathLike): +def test_allocate_interface_exception(mock_api, device, interface): """Test to check exception during allocation. If the interface is already allocated @@ -211,7 +209,7 @@ def test_allocate_interface_exception(mock_api, device, interface, data_config_f @patch("gso.services.netbox_client.pynetbox.api") -def test_allocation_interface(mock_api, device, interface, data_config_filename: PathLike): +def test_allocation_interface(mock_api, device, interface): """Test a normal allocation of a interface.""" # Set interface to not allocated interface.mark_connected = False @@ -234,7 +232,7 @@ def test_allocation_interface(mock_api, device, interface, data_config_filename: @patch("gso.services.netbox_client.pynetbox.api") -def test_delete_device(mock_api, device, data_config_filename: PathLike): +def test_delete_device(mock_api, device): """Test a delete of a device.""" # Mock netbox api mock_api.return_value.dcim.devices.get.return_value = device @@ -250,7 +248,7 @@ def test_delete_device(mock_api, device, data_config_filename: PathLike): @patch("gso.services.netbox_client.pynetbox.api") -def test_get_interfaces_by_device(mock_api, device, interface, data_config_filename: PathLike): +def test_get_interfaces_by_device(mock_api, device, interface): """Test if a interface is returned for a device.""" # Setup interface speed speed = "1000" @@ -267,7 +265,7 @@ def test_get_interfaces_by_device(mock_api, device, interface, data_config_filen @patch("gso.services.netbox_client.pynetbox.api") -def test_attach_interface_to_lag(mock_api, device, interface, lag, data_config_filename: PathLike): +def test_attach_interface_to_lag(mock_api, device, interface, lag): """Test if a interface is attached correctly to a LAG interface.""" # Define site effect function diff --git a/test/workflows/__init__.py b/test/workflows/__init__.py index 0607621d10c0de9066b244ba256b69cabed73104..d01dde086f1d5f2d3d95d2b63c4540547c94b412 100644 --- a/test/workflows/__init__.py +++ b/test/workflows/__init__.py @@ -272,14 +272,6 @@ def resume_workflow( return result, step_log -def user_accept_and_assert_suspended(process_stat, step_log, extra_data=None): - extra_data = extra_data or {} - result, step_log = resume_workflow(process_stat, step_log, extra_data) - assert_suspended(result) - - return result, step_log - - def assert_lso_success(result: Process, process_stat: ProcessStat, step_log: list): """Assert a successful LSO execution in a workflow.""" assert_awaiting_callback(result) diff --git a/test/workflows/edge_port/test_create_edge_port.py b/test/workflows/edge_port/test_create_edge_port.py index 90cf117b15bca2958308dd7bec3807592782673e..9690b587f62f574b701646e5a2de92b76bc52033 100644 --- a/test/workflows/edge_port/test_create_edge_port.py +++ b/test/workflows/edge_port/test_create_edge_port.py @@ -1,4 +1,3 @@ -from os import PathLike from unittest.mock import patch import pytest @@ -86,7 +85,6 @@ def test_successful_edge_port_creation( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, test_client, ): product_id = get_product_id_by_name(ProductName.EDGE_PORT) @@ -117,7 +115,6 @@ def test_successful_edge_port_creation_with_auto_ga_id_creation( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, test_client, ): product_id = get_product_id_by_name(ProductName.EDGE_PORT) @@ -144,7 +141,6 @@ def test_edge_port_creation_with_invalid_input( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, test_client, ): product_id = get_product_id_by_name(ProductName.EDGE_PORT) diff --git a/test/workflows/edge_port/test_import_edge_port.py b/test/workflows/edge_port/test_import_edge_port.py index 7fb1fcbdf851c9fb7b73aa30933e508c679b8743..d417a59bccf7727fe1d49a31f959833fd3d683e3 100644 --- a/test/workflows/edge_port/test_import_edge_port.py +++ b/test/workflows/edge_port/test_import_edge_port.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_edge_port_success(edge_port_subscription_factory): - imported_edge_port = edge_port_subscription_factory(is_imported=False) + imported_edge_port = edge_port_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_edge_port", [{"subscription_id": imported_edge_port}]) subscription = EdgePort.from_subscription(imported_edge_port) diff --git a/test/workflows/edge_port/test_modify_edge_port.py b/test/workflows/edge_port/test_modify_edge_port.py index 227cde2380bb5a9aaa0fc112a55660cdb83bf575..d62598b64e717795cc2746ab39f1e6b052802347 100644 --- a/test/workflows/edge_port/test_modify_edge_port.py +++ b/test/workflows/edge_port/test_modify_edge_port.py @@ -75,7 +75,6 @@ def test_modify_edge_port_with_changing_capacity( mocked_execute_playbook, input_form_wizard_data, faker, - data_config_filename, ): # Set up mock return values mocked_netbox = MockedNetboxClient() @@ -152,7 +151,6 @@ def test_modify_edge_port_without_changing_capacity( mocked_execute_playbook, input_form_wizard_without_changing_capacity, faker, - data_config_filename, ): # Set up mock return values mocked_netbox = MockedNetboxClient() diff --git a/test/workflows/edge_port/test_terminate_edge_port.py b/test/workflows/edge_port/test_terminate_edge_port.py index 9af27ab4195c0d7bccb544eab432b4e5a216dcc7..6ae7967726af992c8f8c2819b2bf99d51c56c4ea 100644 --- a/test/workflows/edge_port/test_terminate_edge_port.py +++ b/test/workflows/edge_port/test_terminate_edge_port.py @@ -22,7 +22,6 @@ def test_successful_edge_port_termination( mock_execute_playbook, edge_port_subscription_factory, faker, - data_config_filename, ): # Set up mock return values subscription_id = edge_port_subscription_factory() diff --git a/test/workflows/edge_port/test_validate_edge_port.py b/test/workflows/edge_port/test_validate_edge_port.py index 37f9e9dd8522932f1e4c7989a42413b0ca66c6be..94494da13d0828184a9898ff0d7cfc9b5f4b8ee7 100644 --- a/test/workflows/edge_port/test_validate_edge_port.py +++ b/test/workflows/edge_port/test_validate_edge_port.py @@ -20,7 +20,6 @@ def test_validate_edge_port_success( mock_execute_playbook, edge_port_subscription_factory, faker, - data_config_filename, ): subscription_id = edge_port_subscription_factory() mock_get_interface_by_name_and_device.side_effect = [ diff --git a/test/workflows/iptrunk/test_create_iptrunk.py b/test/workflows/iptrunk/test_create_iptrunk.py index d0f9cfa539b3b52600dff506931c3cd0699250ce..f6b0092752c43a017dac42e5727e462b58bea6fd 100644 --- a/test/workflows/iptrunk/test_create_iptrunk.py +++ b/test/workflows/iptrunk/test_create_iptrunk.py @@ -1,4 +1,3 @@ -from os import PathLike from unittest.mock import patch import pytest @@ -119,7 +118,6 @@ def test_successful_iptrunk_creation_with_standard_lso_result( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, test_client, ): mock_create_host.return_value = None @@ -174,7 +172,6 @@ def test_iptrunk_creation_fails_when_lso_return_code_is_one( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, ): mock_allocate_v4_network.return_value = faker.ipv4_network(min_subnet=31, max_subnet=31) mock_allocate_v6_network.return_value = faker.ipv6_network(min_subnet=126, max_subnet=126) @@ -213,7 +210,6 @@ def test_successful_iptrunk_creation_with_juniper_interface_names( mock_execute_playbook, input_form_wizard_data, faker, - data_config_filename: PathLike, _netbox_client_mock, # noqa: PT019 test_client, ): diff --git a/test/workflows/iptrunk/test_import_iptrunk.py b/test/workflows/iptrunk/test_import_iptrunk.py index 52a52c329dbad987d3b360315522d0bda827354c..eeca3e310cd8c9be1f1980a33c035dc8d465b0f7 100644 --- a/test/workflows/iptrunk/test_import_iptrunk.py +++ b/test/workflows/iptrunk/test_import_iptrunk.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_iptrunk_success(iptrunk_subscription_factory): - imported_iptrunk = iptrunk_subscription_factory(is_imported=False) + imported_iptrunk = iptrunk_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_iptrunk", [{"subscription_id": imported_iptrunk}]) subscription = Iptrunk.from_subscription(imported_iptrunk) diff --git a/test/workflows/iptrunk/test_migrate_iptrunk.py b/test/workflows/iptrunk/test_migrate_iptrunk.py index 078942a1e9a9c347f8af5eb3cd773dfc6932436d..2ed47dcc19473f818ffa6e6525a4db40018e26cd 100644 --- a/test/workflows/iptrunk/test_migrate_iptrunk.py +++ b/test/workflows/iptrunk/test_migrate_iptrunk.py @@ -1,4 +1,3 @@ -from os import PathLike from unittest.mock import patch import pytest @@ -138,7 +137,6 @@ def test_migrate_iptrunk_success( # noqa: PLR0915 mock_create_host_by_ip, migrate_form_input, restore_isis_metric, - data_config_filename: PathLike, ): # Set up mock return values mocked_netbox = MockedNetboxClient() diff --git a/test/workflows/iptrunk/test_modify_trunk_interface.py b/test/workflows/iptrunk/test_modify_trunk_interface.py index d8a96588a3bc6182fa498b2b50a8e849c646b483..2f77576dae1a9b712dd5a89f8aa2124d99ae9a4a 100644 --- a/test/workflows/iptrunk/test_modify_trunk_interface.py +++ b/test/workflows/iptrunk/test_modify_trunk_interface.py @@ -108,7 +108,6 @@ def test_iptrunk_modify_trunk_interface_success( mock_provision_ip_trunk, input_form_iptrunk_data, faker, - data_config_filename, ): # Set up mock return values mocked_netbox = MockedNetboxClient() diff --git a/test/workflows/iptrunk/test_terminate_iptrunk.py b/test/workflows/iptrunk/test_terminate_iptrunk.py index 83e0324a0f1baa661dbd473bf11d179d17588f95..9e1ab25e551d2e5c9aa4154b55456544aae2bbf2 100644 --- a/test/workflows/iptrunk/test_terminate_iptrunk.py +++ b/test/workflows/iptrunk/test_terminate_iptrunk.py @@ -26,7 +26,6 @@ def test_successful_iptrunk_termination( mock_execute_playbook, iptrunk_subscription_factory, faker, - data_config_filename, router_subscription_factory, ): # Set up mock return values diff --git a/test/workflows/iptrunk/test_validate_iptrunk.py b/test/workflows/iptrunk/test_validate_iptrunk.py index 88f998cd2b9dc6db677689e556f02e0a73162966..5a8d924109f592d7de80bcf16ca68bb7801895c7 100644 --- a/test/workflows/iptrunk/test_validate_iptrunk.py +++ b/test/workflows/iptrunk/test_validate_iptrunk.py @@ -54,7 +54,6 @@ def test_validate_iptrunk_success( mock_find_v6_host_by_fqdn, mock_find_network_by_cidr, faker, - data_config_filename, iptrunk_subscription_factory, iptrunk_side_subscription_factory, router_subscription_factory, @@ -212,7 +211,6 @@ def test_validate_iptrunk_skip_legacy_trunks( mock_find_v6_host_by_fqdn, mock_find_network_by_cidr, faker, - data_config_filename, iptrunk_subscription_factory, iptrunk_side_subscription_factory, router_subscription_factory, diff --git a/test/workflows/l2_circuit/test_create_layer_2_circuit.py b/test/workflows/l2_circuit/test_create_layer_2_circuit.py index 29eb825ab6a533c80783f914a959bbe14b111560..04144556d1147c4de0e689b40d2f93020f943867 100644 --- a/test/workflows/l2_circuit/test_create_layer_2_circuit.py +++ b/test/workflows/l2_circuit/test_create_layer_2_circuit.py @@ -42,7 +42,6 @@ def test_create_layer_2_circuit_success( layer_2_circuit_input, faker, partner_factory, - data_config_filename, ): result, _, _ = run_workflow("create_layer_2_circuit", layer_2_circuit_input) assert_complete(result) diff --git a/test/workflows/l2_circuit/test_modify_layer_2_circuit.py b/test/workflows/l2_circuit/test_modify_layer_2_circuit.py index 0e20789722003af945f273076022a33ada5aa700..74ff7a9eeeb2c690afdcf04a3071b0cc85536da0 100644 --- a/test/workflows/l2_circuit/test_modify_layer_2_circuit.py +++ b/test/workflows/l2_circuit/test_modify_layer_2_circuit.py @@ -14,7 +14,6 @@ def test_modify_layer_2_circuit_change_policer_bandwidth( layer_2_circuit_subscription_factory, faker, partner_factory, - data_config_filename, ): subscription_id = layer_2_circuit_subscription_factory(layer_2_circuit_service_type=layer_2_circuit_service_type) subscription = Layer2Circuit.from_subscription(subscription_id) @@ -50,7 +49,6 @@ def test_modify_layer_2_circuit_change_circuit_type( layer_2_circuit_subscription_factory, faker, partner_factory, - data_config_filename, ): subscription_id = layer_2_circuit_subscription_factory(layer_2_circuit_service_type=layer_2_circuit_service_type) subscription = Layer2Circuit.from_subscription(subscription_id) diff --git a/test/workflows/l3_core_service/test_create_l3_core_service.py b/test/workflows/l3_core_service/test_create_l3_core_service.py index bab30bef4197dd9c7ce4176ff7bca1d7c493a0de..6196249a55a691fc9a6f5aa0b4f3b028db650017 100644 --- a/test/workflows/l3_core_service/test_create_l3_core_service.py +++ b/test/workflows/l3_core_service/test_create_l3_core_service.py @@ -49,7 +49,6 @@ def test_create_l3_core_service_success( partner_factory, edge_port_subscription_factory, base_bgp_peer_input, - data_config_filename, ): partner = partner_factory() product_id = get_product_id_by_name(l3_core_type) diff --git a/test/workflows/l3_core_service/test_terminate_l3_core_service.py b/test/workflows/l3_core_service/test_terminate_l3_core_service.py new file mode 100644 index 0000000000000000000000000000000000000000..b3c48cec74c8c046ea7fcb0cc3768b0ddb441a1c --- /dev/null +++ b/test/workflows/l3_core_service/test_terminate_l3_core_service.py @@ -0,0 +1,27 @@ +import pytest + +from gso.products.product_types.l3_core_service import L3CoreService, L3CoreServiceType +from test.workflows import assert_complete, extract_state, run_workflow + + +@pytest.mark.workflow() +@pytest.mark.parametrize( + "l3_core_service_type", + [ + L3CoreServiceType.GEANT_IP, + L3CoreServiceType.IAS, + L3CoreServiceType.COPERNICUS, + L3CoreServiceType.LHCONE, + L3CoreServiceType.GWS, + ], +) +def test_terminate_l3_core_service(l3_core_service_type, l3_core_service_subscription_factory, faker): + subscription_id = l3_core_service_subscription_factory(l3_core_service_type=l3_core_service_type) + initial_form_data = [{"subscription_id": subscription_id}, {"tt_number": faker.tt_number()}] + result, _, _ = run_workflow("terminate_l3_core_service", initial_form_data) + assert_complete(result) + + state = extract_state(result) + subscription_id = state["subscription_id"] + subscription = L3CoreService.from_subscription(subscription_id) + assert subscription.status == "terminated" diff --git a/test/workflows/lan_switch_interconnect/test_import_lan_switch_interconnect.py b/test/workflows/lan_switch_interconnect/test_import_lan_switch_interconnect.py index 5808299f5de5f01fd001a39819f091cb716bca8b..3d82ad065f73c6274c601dd2734977c6a63deffd 100644 --- a/test/workflows/lan_switch_interconnect/test_import_lan_switch_interconnect.py +++ b/test/workflows/lan_switch_interconnect/test_import_lan_switch_interconnect.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_lan_switch_interconnect_success(lan_switch_interconnect_subscription_factory): - imported_lan_switch_interconnect = lan_switch_interconnect_subscription_factory(is_imported=False) + imported_lan_switch_interconnect = lan_switch_interconnect_subscription_factory(is_imported=True) result, _, _ = run_workflow( "import_lan_switch_interconnect", [{"subscription_id": imported_lan_switch_interconnect}] ) diff --git a/test/workflows/office_router/test_import_office_router.py b/test/workflows/office_router/test_import_office_router.py index e86e3ed1978271adb61cc4c0024759434ba9953a..306cffa4b8be910122313e7af080533f00f0c14a 100644 --- a/test/workflows/office_router/test_import_office_router.py +++ b/test/workflows/office_router/test_import_office_router.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_office_router_success(office_router_subscription_factory): - imported_office_router = office_router_subscription_factory(is_imported=False) + imported_office_router = office_router_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_office_router", [{"subscription_id": imported_office_router}]) subscription = OfficeRouter.from_subscription(imported_office_router) diff --git a/test/workflows/opengear/test_import_opengear.py b/test/workflows/opengear/test_import_opengear.py index 6ca1da2523951ffc5ff8fe3e296bfd46fed9a76f..8643ec4935c7bcda4783fb9e33adbdc0d2f605c1 100644 --- a/test/workflows/opengear/test_import_opengear.py +++ b/test/workflows/opengear/test_import_opengear.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_office_router_success(opengear_subscription_factory): - imported_opengear = opengear_subscription_factory(is_imported=False) + imported_opengear = opengear_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_opengear", [{"subscription_id": imported_opengear}]) subscription = Opengear.from_subscription(imported_opengear) diff --git a/test/workflows/router/test_import_router.py b/test/workflows/router/test_import_router.py index 2bab9ac22a03bb9a0ec5f60da14fb56403668033..74fed0f5fa03324dcb535b029668d7d2d89bdbf8 100644 --- a/test/workflows/router/test_import_router.py +++ b/test/workflows/router/test_import_router.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_site_success(router_subscription_factory): - imported_router = router_subscription_factory(is_imported=False) + imported_router = router_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_router", [{"subscription_id": imported_router}]) subscription = Router.from_subscription(imported_router) diff --git a/test/workflows/router/test_promote_p_to_pe.py b/test/workflows/router/test_promote_p_to_pe.py index 3a8d3be9c8ff7714c26833bbd01a79388ad8167e..1c7ef1dc3e74e1119e288c9ea6c1a100a0124da1 100644 --- a/test/workflows/router/test_promote_p_to_pe.py +++ b/test/workflows/router/test_promote_p_to_pe.py @@ -24,7 +24,6 @@ def test_promote_p_to_pe_success( mock_kentik_client, mock_execute_playbook, router_subscription_factory, - data_config_filename, faker, ): """Test the successful promotion of a Nokia P router to a PE router.""" @@ -49,7 +48,7 @@ def test_promote_p_to_pe_success( @pytest.mark.workflow() -def test_promote_p_to_pe_juniper_router(router_subscription_factory, data_config_filename, faker): +def test_promote_p_to_pe_juniper_router(router_subscription_factory, faker): """Test that the workflow does not run for a Juniper P router since this workflow is only for Nokia routers.""" router_id = router_subscription_factory( vendor=Vendor.JUNIPER, router_role=RouterRole.P, status=SubscriptionLifecycle.ACTIVE @@ -64,9 +63,7 @@ def test_promote_p_to_pe_juniper_router(router_subscription_factory, data_config @pytest.mark.workflow() @patch("gso.services.lso_client._send_request") -def test_promote_p_to_pe_nokia_pe_router( - mock_execute_playbook, router_subscription_factory, data_config_filename, faker -): +def test_promote_p_to_pe_nokia_pe_router(mock_execute_playbook, router_subscription_factory, faker): """Test that the workflow does not run for a Nokia PE router since it is already a PE router.""" router_id = router_subscription_factory( vendor=Vendor.NOKIA, router_role=RouterRole.PE, status=SubscriptionLifecycle.ACTIVE diff --git a/test/workflows/router/test_terminate_router.py b/test/workflows/router/test_terminate_router.py index d2dca96f7492c67f2396fb8935801670798b43b4..ed9c565d87730bd783406ba803c950a9744c976e 100644 --- a/test/workflows/router/test_terminate_router.py +++ b/test/workflows/router/test_terminate_router.py @@ -26,7 +26,6 @@ def test_terminate_pe_router_full_success( update_ibgp_mesh, router_subscription_factory, faker, - data_config_filename, ): # Prepare mock values and expected results product_id = router_subscription_factory() @@ -80,7 +79,6 @@ def test_terminate_p_router_full_success( update_ibgp_mesh, router_subscription_factory, faker, - data_config_filename, ): # Prepare mock values and expected results product_id = router_subscription_factory(router_role=RouterRole.P) diff --git a/test/workflows/router/test_update_ibgp_mesh.py b/test/workflows/router/test_update_ibgp_mesh.py index 693bfd671160abe62d35a6340a08c8220c31a59f..055ad43aa3324e25a2cc4c0c3edf90f907187c9f 100644 --- a/test/workflows/router/test_update_ibgp_mesh.py +++ b/test/workflows/router/test_update_ibgp_mesh.py @@ -32,7 +32,6 @@ def test_update_ibgp_mesh_success( iptrunk_subscription_factory, iptrunk_side_subscription_factory, router_subscription_factory, - data_config_filename, faker, ): mock_librenms_device_exists.return_value = False @@ -71,7 +70,7 @@ def test_update_ibgp_mesh_success( @pytest.mark.parametrize("trunk_status", [SubscriptionLifecycle.INITIAL, SubscriptionLifecycle.TERMINATED]) @pytest.mark.workflow() -def test_update_ibgp_mesh_failure(iptrunk_subscription_factory, data_config_filename, trunk_status): +def test_update_ibgp_mesh_failure(iptrunk_subscription_factory, trunk_status): ip_trunk = Iptrunk.from_subscription(iptrunk_subscription_factory(status=trunk_status)) ibgp_mesh_input_form_data = { "subscription_id": ip_trunk.iptrunk.iptrunk_sides[0].iptrunk_side_node.owner_subscription_id @@ -83,7 +82,7 @@ def test_update_ibgp_mesh_failure(iptrunk_subscription_factory, data_config_file @pytest.mark.workflow() -def test_update_ibgp_mesh_isolated_router(router_subscription_factory, data_config_filename): +def test_update_ibgp_mesh_isolated_router(router_subscription_factory): router_id = router_subscription_factory(router_role=RouterRole.P) exception_message = "Selected router does not terminate any available IP trunks." diff --git a/test/workflows/site/test_import_site.py b/test/workflows/site/test_import_site.py index f706a4737b5c2747693529ab084646c47848d32e..df609aa27a86c12a06bc7655b8d2f93378dc9dfc 100644 --- a/test/workflows/site/test_import_site.py +++ b/test/workflows/site/test_import_site.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_site_success(site_subscription_factory): - imported_site = site_subscription_factory(is_imported=False) + imported_site = site_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_site", [{"subscription_id": imported_site}]) subscription = Site.from_subscription(imported_site) diff --git a/test/workflows/super_pop_switch/test_import_super_pop_switch.py b/test/workflows/super_pop_switch/test_import_super_pop_switch.py index 2961b53988122eec82bcb4ccbde63f85acbfb864..80d4f37029f341cb5649af4a1e6a1e6e6df9bd8b 100644 --- a/test/workflows/super_pop_switch/test_import_super_pop_switch.py +++ b/test/workflows/super_pop_switch/test_import_super_pop_switch.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_super_pop_switch_success(super_pop_switch_subscription_factory): - imported_super_pop_switch = super_pop_switch_subscription_factory(is_imported=False) + imported_super_pop_switch = super_pop_switch_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_super_pop_switch", [{"subscription_id": imported_super_pop_switch}]) subscription = SuperPopSwitch.from_subscription(imported_super_pop_switch) diff --git a/test/workflows/switch/test_import_switch.py b/test/workflows/switch/test_import_switch.py index 9bdd578d66553b9e5f06f09717aae2b35e633460..b35760573a94faa7d6f058035413a45aaf747e5f 100644 --- a/test/workflows/switch/test_import_switch.py +++ b/test/workflows/switch/test_import_switch.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_switch_success(switch_subscription_factory): - imported_switch = switch_subscription_factory(is_imported=False) + imported_switch = switch_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_switch", [{"subscription_id": imported_switch}]) subscription = Switch.from_subscription(imported_switch) diff --git a/test/workflows/switch/test_validate_switch.py b/test/workflows/switch/test_validate_switch.py index f4e449b0ca84b439193cc55825e43c0bce1cbfb1..13e81c6f68459b6f4a8016f86069e8e57504251f 100644 --- a/test/workflows/switch/test_validate_switch.py +++ b/test/workflows/switch/test_validate_switch.py @@ -19,7 +19,6 @@ def test_validate_switch_success( mock_execute_playbook, switch_subscription_factory, faker, - data_config_filename, geant_partner, ): # Run workflow diff --git a/test/workflows/tasks/test_create_partners.py b/test/workflows/tasks/test_create_partners.py index 4a902aa42771f74ee6cebb4b8ba78c92a79b2253..5879a14141137ecf980ecc75a463d48f1a8873e7 100644 --- a/test/workflows/tasks/test_create_partners.py +++ b/test/workflows/tasks/test_create_partners.py @@ -20,8 +20,8 @@ def test_create_partner_success(): state = extract_state(result) partner = get_partner_by_name(state["name"]) - assert partner["name"] == "GEANT-TEST-CREATION" - assert partner["email"] == "goat-test-creation@geant.org" + assert partner.name == "GEANT-TEST-CREATION" + assert partner.email == "goat-test-creation@geant.org" @pytest.mark.workflow() diff --git a/test/workflows/vrf/test_create_vrf.py b/test/workflows/vrf/test_create_vrf.py index 5ec13598548d7d658605c0863bae7e04a5bdb460..cae26f1188bf397a7aabd8ce9dac8779544620ee 100644 --- a/test/workflows/vrf/test_create_vrf.py +++ b/test/workflows/vrf/test_create_vrf.py @@ -29,7 +29,6 @@ def vrf_input(faker): def test_create_vrf_success( vrf_input, faker, - data_config_filename, ): result, _, _ = run_workflow("create_vrf", vrf_input) assert_complete(result) @@ -48,7 +47,6 @@ def test_create_vrf_with_duplicate_vrf_name( vrf_input, faker, vrf_subscription_factory, - data_config_filename, ): vrf_subscription_factory(vrf_name=vrf_input[1]["vrf_name"]) with pytest.raises(FormValidationError, match="vrf_name must be unique."):