diff --git a/gso/cli/imports.py b/gso/cli/imports.py index d8f82908330a35f9602db3bf0fdbf3a58b741776..1fcb35105bcdc946d57019543dddfb025e96bad8 100644 --- a/gso/cli/imports.py +++ b/gso/cli/imports.py @@ -302,7 +302,7 @@ class L3CoreServiceImportModel(BaseModel): @field_validator("service_binding_ports") def validate_node(cls, value: list[ServiceBindingPort]) -> list[ServiceBindingPort]: """Check if the Service Binding Ports are valid.""" - edge_ports = [str(subscription["subscription_id"]) for subscription in get_active_edge_port_subscriptions()] + edge_ports = [str(subscription.subscription_id) for subscription in get_active_edge_port_subscriptions()] for sbp in value: if sbp.edge_port not in edge_ports: msg = f"Edge Port {sbp.edge_port} not found" diff --git a/gso/products/__init__.py b/gso/products/__init__.py index 9b2f12447ee440a504557fb6b4ada80aa5d47df9..7af2244220aa1b321ba57db01e2bcb1504a155c0 100644 --- a/gso/products/__init__.py +++ b/gso/products/__init__.py @@ -80,6 +80,10 @@ class ProductName(strEnum): """VRFs.""" +L3_CORE_SERVICE_PRODUCT_TYPE = L3CoreService.__name__ +L2_CORE_SERVICE_PRODUCT_TYPE = Layer2Circuit.__name__ + + class ProductType(strEnum): """An enumerator of available product types in GSO.""" @@ -102,19 +106,19 @@ class ProductType(strEnum): IMPORTED_OPENGEAR = Opengear.__name__ EDGE_PORT = EdgePort.__name__ IMPORTED_EDGE_PORT = ImportedEdgePort.__name__ - GEANT_IP = L3CoreService.__name__ + GEANT_IP = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_GEANT_IP = ImportedL3CoreService.__name__ - IAS = L3CoreService.__name__ + IAS = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_IAS = ImportedL3CoreService.__name__ - GWS = L3CoreService.__name__ + GWS = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_GWS = ImportedL3CoreService.__name__ - LHCONE = L3CoreService.__name__ + LHCONE = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_LHCONE = ImportedL3CoreService.__name__ - COPERNICUS = L3CoreService.__name__ + COPERNICUS = L3_CORE_SERVICE_PRODUCT_TYPE IMPORTED_COPERNICUS = ImportedL3CoreService.__name__ - GEANT_PLUS = Layer2Circuit.__name__ + GEANT_PLUS = L2_CORE_SERVICE_PRODUCT_TYPE IMPORTED_GEANT_PLUS = ImportedLayer2Circuit.__name__ - EXPRESSROUTE = Layer2Circuit.__name__ + EXPRESSROUTE = L2_CORE_SERVICE_PRODUCT_TYPE IMPORTED_EXPRESSROUTE = ImportedLayer2Circuit.__name__ VRF = VRF.__name__ diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py index 55823e5a6ee3b7b1b9dcf32f0e2a9cce1d2aa52c..e83f7831ccaf1ad5928f94a866c3e086c65bf033 100644 --- a/gso/services/subscriptions.py +++ b/gso/services/subscriptions.py @@ -227,11 +227,6 @@ def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None: return SubscriptionModel.from_subscription(subscription_table.subscription_id) if subscription_table else None -def get_insync_subscriptions() -> list[SubscriptionTable]: - """Retrieve all subscriptions that are currently in sync.""" - return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all() - - def get_active_insync_subscriptions() -> list[SubscriptionTable]: """Retrieve all subscriptions that are currently active and in sync.""" return ( @@ -255,19 +250,24 @@ def get_active_site_subscriptions(includes: list[str] | None = None) -> list[Sub ) -def get_active_edge_port_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]: +def get_active_edge_port_subscriptions(partner_id: UUIDstr | None = None) -> list[SubscriptionModel]: """Retrieve active Edge Port subscriptions. Args: - includes: The fields to be included in the returned Subscription objects. + partner_id: The customer id of subscriptions. Returns: A list of Subscription objects for Edge Ports. """ - return get_subscriptions( - product_types=[ProductType.EDGE_PORT], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes + results = get_subscriptions( + product_types=[ProductType.EDGE_PORT], + lifecycles=[SubscriptionLifecycle.ACTIVE], + includes=["subscription_id"], + partner_id=partner_id, ) + return [SubscriptionModel.from_subscription(result["subscription_id"]) for result in results] + def get_site_by_name(site_name: str) -> Site: """Get a site by its name. diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index 572bb4e2c16c7834d60edce73290d04548565cc5..ca7abb8eb582405e94244f5158a46e7d84b7b1e9 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -13,10 +13,17 @@ from pydantic_forms.validators import Choice from gso import settings from gso.products.product_blocks.router import RouterRole from gso.products.product_types.router import Router -from gso.services import subscriptions from gso.services.netbox_client import NetboxClient from gso.services.partners import get_all_partners -from gso.services.subscriptions import is_virtual_circuit_id_available +from gso.services.subscriptions import ( + get_active_edge_port_subscriptions, + get_active_router_subscriptions, + get_active_site_subscriptions, + get_active_subscriptions_by_field_and_value, + get_active_switch_subscriptions, + get_router_subscriptions, + is_virtual_circuit_id_available, +) from gso.utils.shared_enums import Vendor from gso.utils.types.interfaces import PhysicalPortCapacity from gso.utils.types.ip_address import IPv4AddressType, IPv4NetworkType, IPv6NetworkType @@ -175,8 +182,7 @@ def generate_inventory_for_routers( else [SubscriptionLifecycle.ACTIVE] ) all_routers = [ - Router.from_subscription(r["subscription_id"]) - for r in subscriptions.get_router_subscriptions(lifecycles=lifecycles) + Router.from_subscription(r["subscription_id"]) for r in get_router_subscriptions(lifecycles=lifecycles) ] exclude_routers = exclude_routers or [] @@ -219,7 +225,7 @@ def active_site_selector() -> Choice: """Generate a dropdown selector for choosing an active site in an input form.""" site_subscriptions = { str(site["subscription_id"]): site["description"] - for site in subscriptions.get_active_site_subscriptions(includes=["subscription_id", "description"]) + for site in get_active_site_subscriptions(includes=["subscription_id", "description"]) } return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True)) # type: ignore[arg-type] @@ -229,17 +235,20 @@ def active_router_selector() -> Choice: """Generate a dropdown selector for choosing an active Router in an input form.""" router_subscriptions = { str(router["subscription_id"]): router["description"] - for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id", "description"]) + for router in get_active_router_subscriptions(includes=["subscription_id", "description"]) } return Choice("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)) # type: ignore[arg-type] -def active_pe_router_selector() -> Choice: +def active_pe_router_selector(excludes: list[UUIDstr] | None = None) -> Choice: """Generate a dropdown selector for choosing an active PE Router in an input form.""" + excludes = excludes or [] + routers = { str(router.subscription_id): router.description - for router in subscriptions.get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE) + for router in get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE) + if router.subscription_id not in excludes } return Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type] @@ -249,7 +258,7 @@ def active_switch_selector() -> Choice: """Generate a dropdown selector for choosing an active Switch in an input form.""" switch_subscriptions = { str(switch["subscription_id"]): switch["description"] - for switch in subscriptions.get_active_switch_subscriptions(includes=["subscription_id", "description"]) + for switch in get_active_switch_subscriptions(includes=["subscription_id", "description"]) } return Choice("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)) # type: ignore[arg-type] @@ -257,21 +266,13 @@ def active_switch_selector() -> Choice: def active_edge_port_selector(*, partner_id: UUIDstr | None = None) -> Choice: """Generate a dropdown selector for choosing an active Edge Port in an input form.""" - edge_port_subscriptions = subscriptions.get_active_edge_port_subscriptions( - includes=["subscription_id", "description", "customer_id"] - ) - - if partner_id: - # `partner_id` is set, so we will filter accordingly. - edge_port_subscriptions = list( - filter(lambda subscription: bool(subscription["customer_id"] == partner_id), edge_port_subscriptions) - ) + edge_ports = get_active_edge_port_subscriptions(partner_id=partner_id) - edge_ports = {str(port["subscription_id"]): port["description"] for port in edge_port_subscriptions} + options = {str(edge_port.subscription_id): edge_port.description for edge_port in edge_ports} return Choice( "Select an Edge Port", - zip(edge_ports.keys(), edge_ports.items(), strict=True), # type: ignore[arg-type] + zip(options.keys(), options.items(), strict=True), # type: ignore[arg-type] ) diff --git a/gso/workflows/l3_core_service/migrate_l3_core_service.py b/gso/workflows/l3_core_service/migrate_l3_core_service.py index bf153e337d8421d4d0b77533586f2bb18571191a..2aa87f02e35bc2a6a94f10e7b8efcd56c81dfbfe 100644 --- a/gso/workflows/l3_core_service/migrate_l3_core_service.py +++ b/gso/workflows/l3_core_service/migrate_l3_core_service.py @@ -29,12 +29,12 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: existing_ep_name_list = [ap.sbp.edge_port.owner_subscription_id for ap in subscription.l3_core_service.ap_list] edge_port_subscriptions = list( filter( - lambda ep: bool(ep["customer_id"] == pid) and ep["subscription_id"] not in existing_ep_name_list, - get_active_edge_port_subscriptions(includes=["subscription_id", "description", "customer_id"]), + lambda ep: bool(ep.customer_id == pid) and ep.subscription_id not in existing_ep_name_list, + get_active_edge_port_subscriptions(), ) ) - edge_ports = {str(port["subscription_id"]): port["description"] for port in edge_port_subscriptions} + edge_ports = {str(port.subscription_id): port.description for port in edge_port_subscriptions} return Choice( "Select an Edge Port", diff --git a/test/conftest.py b/test/conftest.py index 4548fe0fadd99957fee38e96fc0874402063f048..38fa426f64bd4365511221b3c679ff6c1cd31181 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -32,7 +32,6 @@ from sqlalchemy.orm import scoped_session, sessionmaker from starlette.testclient import TestClient from urllib3_mock import Responses -from gso.main import init_gso_app from gso.services.partners import PartnerSchema, create_partner from gso.utils.types.interfaces import LAGMember, LAGMemberList from test.fixtures import * # noqa: F403 @@ -49,6 +48,17 @@ class UseJuniperSide(strEnum): SIDE_BOTH = "side_both" +def pytest_configure(config): + """Set an environment variable before loading any test modules.""" + config_filename = "gso/oss-params-example.json" + os.environ["OSS_PARAMS_FILENAME"] = config_filename + + +def pytest_unconfigure(config): + """Clean up the environment variable after all tests.""" + os.environ.pop("OSS_PARAMS_FILENAME", None) + + class FakerProvider(BaseProvider): def ipv4_network(self, *, min_subnet=1, max_subnet=32) -> ipaddress.IPv4Network: subnet = str(self.generator.random_int(min=min_subnet, max=max_subnet)) @@ -146,16 +156,6 @@ def faker() -> Faker: return fake -@pytest.fixture(scope="session", autouse=True) -def data_config_filename() -> str: - """Set an environment variable to the path of the example OSS parameters file.""" - config_filename = "gso/oss-params-example.json" - - os.environ["OSS_PARAMS_FILENAME"] = config_filename - yield config_filename - del os.environ["OSS_PARAMS_FILENAME"] - - @pytest.fixture(scope="session") def db_uri(): """Provide a unique database URI for each pytest-xdist worker, or a default URI if running without xdist.""" @@ -281,10 +281,13 @@ def fastapi_app(_database, db_uri): This implementation is as close as possible to the one present in orchestrator-core. """ + from gso.main import app + oauth2lib_settings.OAUTH2_ACTIVE = False oauth2lib_settings.ENVIRONMENT_IGNORE_MUTATION_DISABLED = ["local", "TESTING"] app_settings.DATABASE_URI = db_uri - return init_gso_app() + + return app @pytest.fixture(scope="session") diff --git a/test/fixtures/edge_port_fixtures.py b/test/fixtures/edge_port_fixtures.py index 90a30c672d38fa295b29d543a8a50f0342b05d23..6beb81817855be8db10d4292fdb303be849ab43d 100644 --- a/test/fixtures/edge_port_fixtures.py +++ b/test/fixtures/edge_port_fixtures.py @@ -37,24 +37,25 @@ def edge_port_subscription_factory(faker, partner_factory, router_subscription_f *, enable_lacp=True, ignore_if_down=False, - is_imported=True, + is_imported=False, ) -> UUIDstr: partner = partner or partner_factory() - node = Router.from_subscription(router_subscription_factory(vendor=Vendor.NOKIA)).router + node = node or Router.from_subscription(router_subscription_factory(vendor=Vendor.NOKIA)).router + if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.EDGE_PORT) - edge_port_subscription = EdgePortInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_EDGE_PORT) + edge_port_subscription = ImportedEdgePortInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_EDGE_PORT) - edge_port_subscription = ImportedEdgePortInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.EDGE_PORT) + edge_port_subscription = EdgePortInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) edge_port_subscription.edge_port.edge_port_description = description or faker.text(max_nb_chars=30) edge_port_subscription.edge_port.ga_id = ga_id or faker.ga_id() - edge_port_subscription.edge_port.node = node or node + edge_port_subscription.edge_port.node = node edge_port_subscription.edge_port.edge_port_name = name or f"lag-{faker.pyint(21, 50)}" edge_port_subscription.edge_port.edge_port_description = edge_port_description or faker.sentence() edge_port_subscription.edge_port.enable_lacp = enable_lacp diff --git a/test/fixtures/iptrunk_fixtures.py b/test/fixtures/iptrunk_fixtures.py index 742e974a98001ee4908f08b6502d7d5ca9ae8c20..09fe95827e71a4188dfb5fb3d36b362fa2f7f783 100644 --- a/test/fixtures/iptrunk_fixtures.py +++ b/test/fixtures/iptrunk_fixtures.py @@ -67,19 +67,19 @@ def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant status: SubscriptionLifecycle | None = None, partner: dict | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.IP_TRUNK) - iptrunk_subscription = IptrunkInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_IP_TRUNK) + iptrunk_subscription = ImportedIptrunkInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_IP_TRUNK) - iptrunk_subscription = ImportedIptrunkInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IP_TRUNK) + iptrunk_subscription = IptrunkInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/l3_core_service_fixtures.py b/test/fixtures/l3_core_service_fixtures.py index 21ea6379595e8860d95e35508ec1b6a6e3cd7721..c810d6412a03eaaeda1a44f8cd1c7093a3247942 100644 --- a/test/fixtures/l3_core_service_fixtures.py +++ b/test/fixtures/l3_core_service_fixtures.py @@ -127,11 +127,12 @@ def access_port_factory(faker, service_binding_port_factory): def create_access_port( ap_type: APType | None = None, service_binding_port: ServiceBindingPort | None = None, + edge_port: EdgePort | None = None, ): return AccessPort.new( subscription_id=uuid4(), ap_type=ap_type or random.choice(list(APType)), # noqa: S311 - sbp=service_binding_port or service_binding_port_factory(), + sbp=service_binding_port or service_binding_port_factory(edge_port=edge_port), ) return create_access_port @@ -150,6 +151,7 @@ def l3_core_service_subscription_factory( ap_list: list[AccessPort] | None = None, start_date="2023-05-24T00:00:00+00:00", status: SubscriptionLifecycle | None = None, + edge_port: EdgePort | None = None, ) -> UUIDstr: partner = partner or partner_factory() match l3_core_service_type: @@ -209,8 +211,8 @@ def l3_core_service_subscription_factory( # Default ap_list creation with primary and backup access ports l3_core_service_subscription.l3_core_service.ap_list = ap_list or [ - access_port_factory(ap_type=APType.PRIMARY), - access_port_factory(ap_type=APType.BACKUP), + access_port_factory(ap_type=APType.PRIMARY, edge_port=edge_port), + access_port_factory(ap_type=APType.BACKUP, edge_port=edge_port), ] # Update subscription with description, start date, and status diff --git a/test/fixtures/lan_switch_interconnect_fixtures.py b/test/fixtures/lan_switch_interconnect_fixtures.py index b98d800d1fa68f5b2e22b8d75c651ceb7dc9adfb..e2cb053263cc153bc57a9139041754c8bf648d2c 100644 --- a/test/fixtures/lan_switch_interconnect_fixtures.py +++ b/test/fixtures/lan_switch_interconnect_fixtures.py @@ -45,16 +45,16 @@ def lan_switch_interconnect_subscription_factory( switch_side_ae_iface: str | None = None, switch_side_ae_members: list[dict[str, str]] | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = get_product_id_by_name(ProductName.LAN_SWITCH_INTERCONNECT) - subscription = LanSwitchInterconnectInactive.from_product_id(product_id, partner["partner_id"]) - else: product_id = get_product_id_by_name(ProductName.IMPORTED_LAN_SWITCH_INTERCONNECT) subscription = ImportedLanSwitchInterconnectInactive.from_product_id(product_id, partner["partner_id"]) + else: + product_id = get_product_id_by_name(ProductName.LAN_SWITCH_INTERCONNECT) + subscription = LanSwitchInterconnectInactive.from_product_id(product_id, partner["partner_id"]) router_side_ae_members = router_side_ae_members or [ LanSwitchInterconnectInterfaceBlockInactive.new( diff --git a/test/fixtures/office_router_fixtures.py b/test/fixtures/office_router_fixtures.py index 0c55ada08ae12b730a491f15eedd42af15e23b6d..070e3c9f517836fbeb81d1a0bda59525d6da25d7 100644 --- a/test/fixtures/office_router_fixtures.py +++ b/test/fixtures/office_router_fixtures.py @@ -25,7 +25,7 @@ def office_router_subscription_factory(site_subscription_factory, faker, geant_p status: SubscriptionLifecycle | None = None, partner: dict | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -38,13 +38,13 @@ def office_router_subscription_factory(site_subscription_factory, faker, geant_p office_router_site = office_router_site or site_subscription_factory() if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.OFFICE_ROUTER) - office_router_subscription = OfficeRouterInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OFFICE_ROUTER) + office_router_subscription = ImportedOfficeRouterInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OFFICE_ROUTER) - office_router_subscription = ImportedOfficeRouterInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.OFFICE_ROUTER) + office_router_subscription = OfficeRouterInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/opengear_fixtures.py b/test/fixtures/opengear_fixtures.py index b6dbff50f0fdc8c786caadddccade778311a83e1..93e0b7c6f841d14df22ce9df58843864d9df9a31 100644 --- a/test/fixtures/opengear_fixtures.py +++ b/test/fixtures/opengear_fixtures.py @@ -22,7 +22,7 @@ def opengear_subscription_factory(site_subscription_factory, faker, geant_partne status: SubscriptionLifecycle | None = None, partner: dict | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -35,13 +35,13 @@ def opengear_subscription_factory(site_subscription_factory, faker, geant_partne opengear_wan_gateway = opengear_wan_gateway or faker.ipv4() if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.OPENGEAR) - opengear_subscription = OpengearInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OPENGEAR) + opengear_subscription = ImportedOpengearInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OPENGEAR) - opengear_subscription = ImportedOpengearInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.OPENGEAR) + opengear_subscription = OpengearInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/router_fixtures.py b/test/fixtures/router_fixtures.py index 04c77cede9d10f0d573e217983bd3a1462281d3e..1ae14dcc4cc420e187a3fc7efbb59d62efef3d5a 100644 --- a/test/fixtures/router_fixtures.py +++ b/test/fixtures/router_fixtures.py @@ -32,18 +32,18 @@ def router_subscription_factory(site_subscription_factory, faker, geant_partner) vendor: Vendor | None = Vendor.NOKIA, *, router_access_via_ts: bool | None = None, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER) - router_subscription = RouterInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_ROUTER) + router_subscription = ImportedRouterInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_ROUTER) - router_subscription = ImportedRouterInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER) + router_subscription = RouterInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/site_fixtures.py b/test/fixtures/site_fixtures.py index 2467bd89067602cfd8b99270b81a4a350efe93e5..c97c11b38528ec1dcd3c43410d6a62e5bf52cedb 100644 --- a/test/fixtures/site_fixtures.py +++ b/test/fixtures/site_fixtures.py @@ -30,20 +30,20 @@ def site_subscription_factory(faker, geant_partner): partner: dict | None = None, start_date="2023-05-24T00:00:00+00:00", *, - is_imported: bool | None = True, + is_imported: bool | None = False, site_contains_optical_equipment: bool | None = True, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.SITE) - site_subscription = SiteInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True) - else: product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SITE) site_subscription = ImportedSiteInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) + else: + product_id = subscriptions.get_product_id_by_name(ProductName.SITE) + site_subscription = SiteInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True) site_subscription.site.site_city = site_city or faker.city() site_subscription.site.site_name = site_name or faker.site_name() diff --git a/test/fixtures/super_pop_switch_fixtures.py b/test/fixtures/super_pop_switch_fixtures.py index 5350e2a70b8eead32521ee16e5de095f24585133..464c0360a910f008e732e14bf477e382e66b56c6 100644 --- a/test/fixtures/super_pop_switch_fixtures.py +++ b/test/fixtures/super_pop_switch_fixtures.py @@ -24,7 +24,7 @@ def super_pop_switch_subscription_factory(site_subscription_factory, faker, gean status: SubscriptionLifecycle | None = None, partner: dict | None = None, *, - is_imported: bool | None = True, + is_imported: bool | None = False, ) -> UUIDstr: if partner is None: partner = geant_partner @@ -36,13 +36,13 @@ def super_pop_switch_subscription_factory(site_subscription_factory, faker, gean super_pop_switch_site = super_pop_switch_site or site_subscription_factory() if is_imported: - product_id = subscriptions.get_product_id_by_name(ProductName.SUPER_POP_SWITCH) - super_pop_switch_subscription = SuperPopSwitchInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SUPER_POP_SWITCH) + super_pop_switch_subscription = ImportedSuperPopSwitchInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) else: - product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SUPER_POP_SWITCH) - super_pop_switch_subscription = ImportedSuperPopSwitchInactive.from_product_id( + product_id = subscriptions.get_product_id_by_name(ProductName.SUPER_POP_SWITCH) + super_pop_switch_subscription = SuperPopSwitchInactive.from_product_id( product_id, customer_id=partner["partner_id"], insync=True ) diff --git a/test/fixtures/switch_fixtures.py b/test/fixtures/switch_fixtures.py index c0167a1fbc684f121be38b543fcdc1e9a9bd607d..1c42b54caae75d76042b7b311a11ab522806d064 100644 --- a/test/fixtures/switch_fixtures.py +++ b/test/fixtures/switch_fixtures.py @@ -26,17 +26,18 @@ def switch_subscription_factory(faker, geant_partner, site_subscription_factory) switch_model: SwitchModel | None = None, status: SubscriptionLifecycle | None = None, *, - is_imported: bool = True, + is_imported: bool = False, ) -> UUIDstr: if partner is None: partner = geant_partner if is_imported: - product_id = get_product_id_by_name(ProductName.SWITCH) - switch_subscription = SwitchInactive.from_product_id(product_id, partner["partner_id"]) - else: product_id = get_product_id_by_name(ProductName.IMPORTED_SWITCH) switch_subscription = ImportedSwitchInactive.from_product_id(product_id, partner["partner_id"]) + else: + product_id = get_product_id_by_name(ProductName.SWITCH) + switch_subscription = SwitchInactive.from_product_id(product_id, partner["partner_id"]) + switch_subscription.switch.fqdn = fqdn or faker.domain_name(levels=4) switch_subscription.switch.ts_port = ts_port or faker.port_number(is_user=True) switch_subscription.switch.site = site or Site.from_subscription(site_subscription_factory()).site diff --git a/test/services/test_infoblox.py b/test/services/test_infoblox.py index 3a7332606ed6314c104fd281a1c8c03bfcd18a91..594eae35ad256c1938fb05a6c5a4144df214c81d 100644 --- a/test/services/test_infoblox.py +++ b/test/services/test_infoblox.py @@ -1,7 +1,6 @@ import ipaddress import logging import re -from os import PathLike import pytest import responses @@ -151,7 +150,7 @@ def _set_up_host_responses(): @responses.activate -def test_allocate_networks(data_config_filename: PathLike): +def test_allocate_networks(): _set_up_network_responses() new_v4_network = infoblox.allocate_v4_network("TRUNK") @@ -162,7 +161,7 @@ def test_allocate_networks(data_config_filename: PathLike): @responses.activate -def test_allocate_bad_network(data_config_filename: PathLike): +def test_allocate_bad_network(): _set_up_network_responses() with pytest.raises(AllocationError) as e: @@ -175,7 +174,7 @@ def test_allocate_bad_network(data_config_filename: PathLike): @responses.activate -def test_allocate_good_host(data_config_filename: PathLike): +def test_allocate_good_host(): _set_up_host_responses() new_host = infoblox.allocate_host("test.lo.geant.net", "LO", [], "test host") assert new_host == ( @@ -185,7 +184,7 @@ def test_allocate_good_host(data_config_filename: PathLike): @responses.activate -def test_allocate_bad_host(data_config_filename: PathLike): +def test_allocate_bad_host(): _set_up_host_responses() with pytest.raises(AllocationError) as e: infoblox.allocate_host("broken", "TRUNK", [], "Unavailable host") @@ -193,7 +192,7 @@ def test_allocate_bad_host(data_config_filename: PathLike): @responses.activate -def test_delete_good_network(data_config_filename: PathLike): +def test_delete_good_network(): responses.add( method=responses.GET, url="https://10.0.0.1/wapi/v2.12/network?network=10.255.255.0%2F26&_return_fields=comment%2Cextattrs%2Cnetwork%" @@ -217,7 +216,7 @@ def test_delete_good_network(data_config_filename: PathLike): @responses.activate -def test_delete_non_existent_network(data_config_filename: PathLike, caplog): +def test_delete_non_existent_network(caplog): responses.add( method=responses.GET, url="https://10.0.0.1/wapi/v2.12/network?network=10.255.255.0%2F26&_return_fields=comment%2Cextattrs%2Cnetwork%" @@ -230,7 +229,7 @@ def test_delete_non_existent_network(data_config_filename: PathLike, caplog): @responses.activate -def test_delete_good_host(data_config_filename: PathLike): +def test_delete_good_host(): responses.add( method=responses.GET, url=re.compile( @@ -277,7 +276,7 @@ def test_delete_good_host(data_config_filename: PathLike): @responses.activate -def test_delete_bad_host(data_config_filename: PathLike, caplog): +def test_delete_bad_host(caplog): responses.add( method=responses.GET, url=re.compile(r".+"), diff --git a/test/services/test_netbox_client.py b/test/services/test_netbox_client.py index 55b82af17c7e9b67b140ed66d51e88905ec14b3a..cce1910ad0264db692d55e4ea635c9b78a3de1e1 100644 --- a/test/services/test_netbox_client.py +++ b/test/services/test_netbox_client.py @@ -1,7 +1,6 @@ """Unit tests for testing the netbox client.""" import uuid -from os import PathLike from unittest.mock import Mock, patch import pytest @@ -83,7 +82,6 @@ def test_create_device( site, device_bay, card_type, - data_config_filename: PathLike, ): device_name = "mx1.lab.geant.net" device.name = device_name @@ -105,7 +103,7 @@ def test_create_device( @patch("gso.services.netbox_client.Router.from_subscription") @patch("gso.services.netbox_client.pynetbox.api") -def test_get_available_lags(mock_api, mock_from_subscription, data_config_filename: PathLike): +def test_get_available_lags(mock_api, mock_from_subscription): router_id = uuid.uuid4() feasible_lags = [f"lag-{i}" for i in range(1, 11)] @@ -127,7 +125,7 @@ def test_get_available_lags(mock_api, mock_from_subscription, data_config_filena @patch("gso.services.netbox_client.pynetbox.api") -def test_create_interface(mock_api, device, interface, data_config_filename: PathLike): +def test_create_interface(mock_api, device, interface): # Mock netbox calls mock_api.return_value.dcim.devices.get.return_value = device mock_api.return_value.dcim.interfaces.create.return_value = interface @@ -141,7 +139,7 @@ def test_create_interface(mock_api, device, interface, data_config_filename: Pat @patch("gso.services.netbox_client.pynetbox.api") -def test_reserve_interface_exception(mock_api, device, interface, data_config_filename: PathLike): +def test_reserve_interface_exception(mock_api, device, interface): """Test for checking if interface is reserved. If the interface is already reserved @@ -163,7 +161,7 @@ def test_reserve_interface_exception(mock_api, device, interface, data_config_fi @patch("gso.services.netbox_client.pynetbox.api") -def test_reserve_interface(mock_api, device, interface, data_config_filename: PathLike): +def test_reserve_interface(mock_api, device, interface): """Test a normal reservation of a interface.""" # Set interface to not reserved interface.enabled = False @@ -186,7 +184,7 @@ def test_reserve_interface(mock_api, device, interface, data_config_filename: Pa @patch("gso.services.netbox_client.pynetbox.api") -def test_allocate_interface_exception(mock_api, device, interface, data_config_filename: PathLike): +def test_allocate_interface_exception(mock_api, device, interface): """Test to check exception during allocation. If the interface is already allocated @@ -211,7 +209,7 @@ def test_allocate_interface_exception(mock_api, device, interface, data_config_f @patch("gso.services.netbox_client.pynetbox.api") -def test_allocation_interface(mock_api, device, interface, data_config_filename: PathLike): +def test_allocation_interface(mock_api, device, interface): """Test a normal allocation of a interface.""" # Set interface to not allocated interface.mark_connected = False @@ -234,7 +232,7 @@ def test_allocation_interface(mock_api, device, interface, data_config_filename: @patch("gso.services.netbox_client.pynetbox.api") -def test_delete_device(mock_api, device, data_config_filename: PathLike): +def test_delete_device(mock_api, device): """Test a delete of a device.""" # Mock netbox api mock_api.return_value.dcim.devices.get.return_value = device @@ -250,7 +248,7 @@ def test_delete_device(mock_api, device, data_config_filename: PathLike): @patch("gso.services.netbox_client.pynetbox.api") -def test_get_interfaces_by_device(mock_api, device, interface, data_config_filename: PathLike): +def test_get_interfaces_by_device(mock_api, device, interface): """Test if a interface is returned for a device.""" # Setup interface speed speed = "1000" @@ -267,7 +265,7 @@ def test_get_interfaces_by_device(mock_api, device, interface, data_config_filen @patch("gso.services.netbox_client.pynetbox.api") -def test_attach_interface_to_lag(mock_api, device, interface, lag, data_config_filename: PathLike): +def test_attach_interface_to_lag(mock_api, device, interface, lag): """Test if a interface is attached correctly to a LAG interface.""" # Define site effect function diff --git a/test/workflows/__init__.py b/test/workflows/__init__.py index 0607621d10c0de9066b244ba256b69cabed73104..d01dde086f1d5f2d3d95d2b63c4540547c94b412 100644 --- a/test/workflows/__init__.py +++ b/test/workflows/__init__.py @@ -272,14 +272,6 @@ def resume_workflow( return result, step_log -def user_accept_and_assert_suspended(process_stat, step_log, extra_data=None): - extra_data = extra_data or {} - result, step_log = resume_workflow(process_stat, step_log, extra_data) - assert_suspended(result) - - return result, step_log - - def assert_lso_success(result: Process, process_stat: ProcessStat, step_log: list): """Assert a successful LSO execution in a workflow.""" assert_awaiting_callback(result) diff --git a/test/workflows/edge_port/test_create_edge_port.py b/test/workflows/edge_port/test_create_edge_port.py index 90cf117b15bca2958308dd7bec3807592782673e..9690b587f62f574b701646e5a2de92b76bc52033 100644 --- a/test/workflows/edge_port/test_create_edge_port.py +++ b/test/workflows/edge_port/test_create_edge_port.py @@ -1,4 +1,3 @@ -from os import PathLike from unittest.mock import patch import pytest @@ -86,7 +85,6 @@ def test_successful_edge_port_creation( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, test_client, ): product_id = get_product_id_by_name(ProductName.EDGE_PORT) @@ -117,7 +115,6 @@ def test_successful_edge_port_creation_with_auto_ga_id_creation( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, test_client, ): product_id = get_product_id_by_name(ProductName.EDGE_PORT) @@ -144,7 +141,6 @@ def test_edge_port_creation_with_invalid_input( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, test_client, ): product_id = get_product_id_by_name(ProductName.EDGE_PORT) diff --git a/test/workflows/edge_port/test_import_edge_port.py b/test/workflows/edge_port/test_import_edge_port.py index 7fb1fcbdf851c9fb7b73aa30933e508c679b8743..d417a59bccf7727fe1d49a31f959833fd3d683e3 100644 --- a/test/workflows/edge_port/test_import_edge_port.py +++ b/test/workflows/edge_port/test_import_edge_port.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_edge_port_success(edge_port_subscription_factory): - imported_edge_port = edge_port_subscription_factory(is_imported=False) + imported_edge_port = edge_port_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_edge_port", [{"subscription_id": imported_edge_port}]) subscription = EdgePort.from_subscription(imported_edge_port) diff --git a/test/workflows/edge_port/test_modify_edge_port.py b/test/workflows/edge_port/test_modify_edge_port.py index 227cde2380bb5a9aaa0fc112a55660cdb83bf575..d62598b64e717795cc2746ab39f1e6b052802347 100644 --- a/test/workflows/edge_port/test_modify_edge_port.py +++ b/test/workflows/edge_port/test_modify_edge_port.py @@ -75,7 +75,6 @@ def test_modify_edge_port_with_changing_capacity( mocked_execute_playbook, input_form_wizard_data, faker, - data_config_filename, ): # Set up mock return values mocked_netbox = MockedNetboxClient() @@ -152,7 +151,6 @@ def test_modify_edge_port_without_changing_capacity( mocked_execute_playbook, input_form_wizard_without_changing_capacity, faker, - data_config_filename, ): # Set up mock return values mocked_netbox = MockedNetboxClient() diff --git a/test/workflows/edge_port/test_terminate_edge_port.py b/test/workflows/edge_port/test_terminate_edge_port.py index 9af27ab4195c0d7bccb544eab432b4e5a216dcc7..6ae7967726af992c8f8c2819b2bf99d51c56c4ea 100644 --- a/test/workflows/edge_port/test_terminate_edge_port.py +++ b/test/workflows/edge_port/test_terminate_edge_port.py @@ -22,7 +22,6 @@ def test_successful_edge_port_termination( mock_execute_playbook, edge_port_subscription_factory, faker, - data_config_filename, ): # Set up mock return values subscription_id = edge_port_subscription_factory() diff --git a/test/workflows/edge_port/test_validate_edge_port.py b/test/workflows/edge_port/test_validate_edge_port.py index 37f9e9dd8522932f1e4c7989a42413b0ca66c6be..94494da13d0828184a9898ff0d7cfc9b5f4b8ee7 100644 --- a/test/workflows/edge_port/test_validate_edge_port.py +++ b/test/workflows/edge_port/test_validate_edge_port.py @@ -20,7 +20,6 @@ def test_validate_edge_port_success( mock_execute_playbook, edge_port_subscription_factory, faker, - data_config_filename, ): subscription_id = edge_port_subscription_factory() mock_get_interface_by_name_and_device.side_effect = [ diff --git a/test/workflows/iptrunk/test_create_iptrunk.py b/test/workflows/iptrunk/test_create_iptrunk.py index d0f9cfa539b3b52600dff506931c3cd0699250ce..f6b0092752c43a017dac42e5727e462b58bea6fd 100644 --- a/test/workflows/iptrunk/test_create_iptrunk.py +++ b/test/workflows/iptrunk/test_create_iptrunk.py @@ -1,4 +1,3 @@ -from os import PathLike from unittest.mock import patch import pytest @@ -119,7 +118,6 @@ def test_successful_iptrunk_creation_with_standard_lso_result( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, test_client, ): mock_create_host.return_value = None @@ -174,7 +172,6 @@ def test_iptrunk_creation_fails_when_lso_return_code_is_one( input_form_wizard_data, faker, _netbox_client_mock, # noqa: PT019 - data_config_filename: PathLike, ): mock_allocate_v4_network.return_value = faker.ipv4_network(min_subnet=31, max_subnet=31) mock_allocate_v6_network.return_value = faker.ipv6_network(min_subnet=126, max_subnet=126) @@ -213,7 +210,6 @@ def test_successful_iptrunk_creation_with_juniper_interface_names( mock_execute_playbook, input_form_wizard_data, faker, - data_config_filename: PathLike, _netbox_client_mock, # noqa: PT019 test_client, ): diff --git a/test/workflows/iptrunk/test_import_iptrunk.py b/test/workflows/iptrunk/test_import_iptrunk.py index 52a52c329dbad987d3b360315522d0bda827354c..eeca3e310cd8c9be1f1980a33c035dc8d465b0f7 100644 --- a/test/workflows/iptrunk/test_import_iptrunk.py +++ b/test/workflows/iptrunk/test_import_iptrunk.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_iptrunk_success(iptrunk_subscription_factory): - imported_iptrunk = iptrunk_subscription_factory(is_imported=False) + imported_iptrunk = iptrunk_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_iptrunk", [{"subscription_id": imported_iptrunk}]) subscription = Iptrunk.from_subscription(imported_iptrunk) diff --git a/test/workflows/iptrunk/test_migrate_iptrunk.py b/test/workflows/iptrunk/test_migrate_iptrunk.py index 078942a1e9a9c347f8af5eb3cd773dfc6932436d..2ed47dcc19473f818ffa6e6525a4db40018e26cd 100644 --- a/test/workflows/iptrunk/test_migrate_iptrunk.py +++ b/test/workflows/iptrunk/test_migrate_iptrunk.py @@ -1,4 +1,3 @@ -from os import PathLike from unittest.mock import patch import pytest @@ -138,7 +137,6 @@ def test_migrate_iptrunk_success( # noqa: PLR0915 mock_create_host_by_ip, migrate_form_input, restore_isis_metric, - data_config_filename: PathLike, ): # Set up mock return values mocked_netbox = MockedNetboxClient() diff --git a/test/workflows/iptrunk/test_modify_trunk_interface.py b/test/workflows/iptrunk/test_modify_trunk_interface.py index d8a96588a3bc6182fa498b2b50a8e849c646b483..2f77576dae1a9b712dd5a89f8aa2124d99ae9a4a 100644 --- a/test/workflows/iptrunk/test_modify_trunk_interface.py +++ b/test/workflows/iptrunk/test_modify_trunk_interface.py @@ -108,7 +108,6 @@ def test_iptrunk_modify_trunk_interface_success( mock_provision_ip_trunk, input_form_iptrunk_data, faker, - data_config_filename, ): # Set up mock return values mocked_netbox = MockedNetboxClient() diff --git a/test/workflows/iptrunk/test_terminate_iptrunk.py b/test/workflows/iptrunk/test_terminate_iptrunk.py index 83e0324a0f1baa661dbd473bf11d179d17588f95..9e1ab25e551d2e5c9aa4154b55456544aae2bbf2 100644 --- a/test/workflows/iptrunk/test_terminate_iptrunk.py +++ b/test/workflows/iptrunk/test_terminate_iptrunk.py @@ -26,7 +26,6 @@ def test_successful_iptrunk_termination( mock_execute_playbook, iptrunk_subscription_factory, faker, - data_config_filename, router_subscription_factory, ): # Set up mock return values diff --git a/test/workflows/iptrunk/test_validate_iptrunk.py b/test/workflows/iptrunk/test_validate_iptrunk.py index 88f998cd2b9dc6db677689e556f02e0a73162966..5a8d924109f592d7de80bcf16ca68bb7801895c7 100644 --- a/test/workflows/iptrunk/test_validate_iptrunk.py +++ b/test/workflows/iptrunk/test_validate_iptrunk.py @@ -54,7 +54,6 @@ def test_validate_iptrunk_success( mock_find_v6_host_by_fqdn, mock_find_network_by_cidr, faker, - data_config_filename, iptrunk_subscription_factory, iptrunk_side_subscription_factory, router_subscription_factory, @@ -212,7 +211,6 @@ def test_validate_iptrunk_skip_legacy_trunks( mock_find_v6_host_by_fqdn, mock_find_network_by_cidr, faker, - data_config_filename, iptrunk_subscription_factory, iptrunk_side_subscription_factory, router_subscription_factory, diff --git a/test/workflows/l2_circuit/test_create_layer_2_circuit.py b/test/workflows/l2_circuit/test_create_layer_2_circuit.py index 29eb825ab6a533c80783f914a959bbe14b111560..04144556d1147c4de0e689b40d2f93020f943867 100644 --- a/test/workflows/l2_circuit/test_create_layer_2_circuit.py +++ b/test/workflows/l2_circuit/test_create_layer_2_circuit.py @@ -42,7 +42,6 @@ def test_create_layer_2_circuit_success( layer_2_circuit_input, faker, partner_factory, - data_config_filename, ): result, _, _ = run_workflow("create_layer_2_circuit", layer_2_circuit_input) assert_complete(result) diff --git a/test/workflows/l2_circuit/test_modify_layer_2_circuit.py b/test/workflows/l2_circuit/test_modify_layer_2_circuit.py index 0e20789722003af945f273076022a33ada5aa700..74ff7a9eeeb2c690afdcf04a3071b0cc85536da0 100644 --- a/test/workflows/l2_circuit/test_modify_layer_2_circuit.py +++ b/test/workflows/l2_circuit/test_modify_layer_2_circuit.py @@ -14,7 +14,6 @@ def test_modify_layer_2_circuit_change_policer_bandwidth( layer_2_circuit_subscription_factory, faker, partner_factory, - data_config_filename, ): subscription_id = layer_2_circuit_subscription_factory(layer_2_circuit_service_type=layer_2_circuit_service_type) subscription = Layer2Circuit.from_subscription(subscription_id) @@ -50,7 +49,6 @@ def test_modify_layer_2_circuit_change_circuit_type( layer_2_circuit_subscription_factory, faker, partner_factory, - data_config_filename, ): subscription_id = layer_2_circuit_subscription_factory(layer_2_circuit_service_type=layer_2_circuit_service_type) subscription = Layer2Circuit.from_subscription(subscription_id) diff --git a/test/workflows/l3_core_service/test_create_l3_core_service.py b/test/workflows/l3_core_service/test_create_l3_core_service.py index bab30bef4197dd9c7ce4176ff7bca1d7c493a0de..6196249a55a691fc9a6f5aa0b4f3b028db650017 100644 --- a/test/workflows/l3_core_service/test_create_l3_core_service.py +++ b/test/workflows/l3_core_service/test_create_l3_core_service.py @@ -49,7 +49,6 @@ def test_create_l3_core_service_success( partner_factory, edge_port_subscription_factory, base_bgp_peer_input, - data_config_filename, ): partner = partner_factory() product_id = get_product_id_by_name(l3_core_type) diff --git a/test/workflows/lan_switch_interconnect/test_import_lan_switch_interconnect.py b/test/workflows/lan_switch_interconnect/test_import_lan_switch_interconnect.py index 5808299f5de5f01fd001a39819f091cb716bca8b..3d82ad065f73c6274c601dd2734977c6a63deffd 100644 --- a/test/workflows/lan_switch_interconnect/test_import_lan_switch_interconnect.py +++ b/test/workflows/lan_switch_interconnect/test_import_lan_switch_interconnect.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_lan_switch_interconnect_success(lan_switch_interconnect_subscription_factory): - imported_lan_switch_interconnect = lan_switch_interconnect_subscription_factory(is_imported=False) + imported_lan_switch_interconnect = lan_switch_interconnect_subscription_factory(is_imported=True) result, _, _ = run_workflow( "import_lan_switch_interconnect", [{"subscription_id": imported_lan_switch_interconnect}] ) diff --git a/test/workflows/office_router/test_import_office_router.py b/test/workflows/office_router/test_import_office_router.py index e86e3ed1978271adb61cc4c0024759434ba9953a..306cffa4b8be910122313e7af080533f00f0c14a 100644 --- a/test/workflows/office_router/test_import_office_router.py +++ b/test/workflows/office_router/test_import_office_router.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_office_router_success(office_router_subscription_factory): - imported_office_router = office_router_subscription_factory(is_imported=False) + imported_office_router = office_router_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_office_router", [{"subscription_id": imported_office_router}]) subscription = OfficeRouter.from_subscription(imported_office_router) diff --git a/test/workflows/opengear/test_import_opengear.py b/test/workflows/opengear/test_import_opengear.py index 6ca1da2523951ffc5ff8fe3e296bfd46fed9a76f..8643ec4935c7bcda4783fb9e33adbdc0d2f605c1 100644 --- a/test/workflows/opengear/test_import_opengear.py +++ b/test/workflows/opengear/test_import_opengear.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_office_router_success(opengear_subscription_factory): - imported_opengear = opengear_subscription_factory(is_imported=False) + imported_opengear = opengear_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_opengear", [{"subscription_id": imported_opengear}]) subscription = Opengear.from_subscription(imported_opengear) diff --git a/test/workflows/router/test_import_router.py b/test/workflows/router/test_import_router.py index 2bab9ac22a03bb9a0ec5f60da14fb56403668033..74fed0f5fa03324dcb535b029668d7d2d89bdbf8 100644 --- a/test/workflows/router/test_import_router.py +++ b/test/workflows/router/test_import_router.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_site_success(router_subscription_factory): - imported_router = router_subscription_factory(is_imported=False) + imported_router = router_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_router", [{"subscription_id": imported_router}]) subscription = Router.from_subscription(imported_router) diff --git a/test/workflows/router/test_promote_p_to_pe.py b/test/workflows/router/test_promote_p_to_pe.py index 3a8d3be9c8ff7714c26833bbd01a79388ad8167e..1c7ef1dc3e74e1119e288c9ea6c1a100a0124da1 100644 --- a/test/workflows/router/test_promote_p_to_pe.py +++ b/test/workflows/router/test_promote_p_to_pe.py @@ -24,7 +24,6 @@ def test_promote_p_to_pe_success( mock_kentik_client, mock_execute_playbook, router_subscription_factory, - data_config_filename, faker, ): """Test the successful promotion of a Nokia P router to a PE router.""" @@ -49,7 +48,7 @@ def test_promote_p_to_pe_success( @pytest.mark.workflow() -def test_promote_p_to_pe_juniper_router(router_subscription_factory, data_config_filename, faker): +def test_promote_p_to_pe_juniper_router(router_subscription_factory, faker): """Test that the workflow does not run for a Juniper P router since this workflow is only for Nokia routers.""" router_id = router_subscription_factory( vendor=Vendor.JUNIPER, router_role=RouterRole.P, status=SubscriptionLifecycle.ACTIVE @@ -64,9 +63,7 @@ def test_promote_p_to_pe_juniper_router(router_subscription_factory, data_config @pytest.mark.workflow() @patch("gso.services.lso_client._send_request") -def test_promote_p_to_pe_nokia_pe_router( - mock_execute_playbook, router_subscription_factory, data_config_filename, faker -): +def test_promote_p_to_pe_nokia_pe_router(mock_execute_playbook, router_subscription_factory, faker): """Test that the workflow does not run for a Nokia PE router since it is already a PE router.""" router_id = router_subscription_factory( vendor=Vendor.NOKIA, router_role=RouterRole.PE, status=SubscriptionLifecycle.ACTIVE diff --git a/test/workflows/router/test_terminate_router.py b/test/workflows/router/test_terminate_router.py index d2dca96f7492c67f2396fb8935801670798b43b4..ed9c565d87730bd783406ba803c950a9744c976e 100644 --- a/test/workflows/router/test_terminate_router.py +++ b/test/workflows/router/test_terminate_router.py @@ -26,7 +26,6 @@ def test_terminate_pe_router_full_success( update_ibgp_mesh, router_subscription_factory, faker, - data_config_filename, ): # Prepare mock values and expected results product_id = router_subscription_factory() @@ -80,7 +79,6 @@ def test_terminate_p_router_full_success( update_ibgp_mesh, router_subscription_factory, faker, - data_config_filename, ): # Prepare mock values and expected results product_id = router_subscription_factory(router_role=RouterRole.P) diff --git a/test/workflows/router/test_update_ibgp_mesh.py b/test/workflows/router/test_update_ibgp_mesh.py index 693bfd671160abe62d35a6340a08c8220c31a59f..055ad43aa3324e25a2cc4c0c3edf90f907187c9f 100644 --- a/test/workflows/router/test_update_ibgp_mesh.py +++ b/test/workflows/router/test_update_ibgp_mesh.py @@ -32,7 +32,6 @@ def test_update_ibgp_mesh_success( iptrunk_subscription_factory, iptrunk_side_subscription_factory, router_subscription_factory, - data_config_filename, faker, ): mock_librenms_device_exists.return_value = False @@ -71,7 +70,7 @@ def test_update_ibgp_mesh_success( @pytest.mark.parametrize("trunk_status", [SubscriptionLifecycle.INITIAL, SubscriptionLifecycle.TERMINATED]) @pytest.mark.workflow() -def test_update_ibgp_mesh_failure(iptrunk_subscription_factory, data_config_filename, trunk_status): +def test_update_ibgp_mesh_failure(iptrunk_subscription_factory, trunk_status): ip_trunk = Iptrunk.from_subscription(iptrunk_subscription_factory(status=trunk_status)) ibgp_mesh_input_form_data = { "subscription_id": ip_trunk.iptrunk.iptrunk_sides[0].iptrunk_side_node.owner_subscription_id @@ -83,7 +82,7 @@ def test_update_ibgp_mesh_failure(iptrunk_subscription_factory, data_config_file @pytest.mark.workflow() -def test_update_ibgp_mesh_isolated_router(router_subscription_factory, data_config_filename): +def test_update_ibgp_mesh_isolated_router(router_subscription_factory): router_id = router_subscription_factory(router_role=RouterRole.P) exception_message = "Selected router does not terminate any available IP trunks." diff --git a/test/workflows/site/test_import_site.py b/test/workflows/site/test_import_site.py index f706a4737b5c2747693529ab084646c47848d32e..df609aa27a86c12a06bc7655b8d2f93378dc9dfc 100644 --- a/test/workflows/site/test_import_site.py +++ b/test/workflows/site/test_import_site.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_site_success(site_subscription_factory): - imported_site = site_subscription_factory(is_imported=False) + imported_site = site_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_site", [{"subscription_id": imported_site}]) subscription = Site.from_subscription(imported_site) diff --git a/test/workflows/super_pop_switch/test_import_super_pop_switch.py b/test/workflows/super_pop_switch/test_import_super_pop_switch.py index 2961b53988122eec82bcb4ccbde63f85acbfb864..80d4f37029f341cb5649af4a1e6a1e6e6df9bd8b 100644 --- a/test/workflows/super_pop_switch/test_import_super_pop_switch.py +++ b/test/workflows/super_pop_switch/test_import_super_pop_switch.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_super_pop_switch_success(super_pop_switch_subscription_factory): - imported_super_pop_switch = super_pop_switch_subscription_factory(is_imported=False) + imported_super_pop_switch = super_pop_switch_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_super_pop_switch", [{"subscription_id": imported_super_pop_switch}]) subscription = SuperPopSwitch.from_subscription(imported_super_pop_switch) diff --git a/test/workflows/switch/test_import_switch.py b/test/workflows/switch/test_import_switch.py index 9bdd578d66553b9e5f06f09717aae2b35e633460..b35760573a94faa7d6f058035413a45aaf747e5f 100644 --- a/test/workflows/switch/test_import_switch.py +++ b/test/workflows/switch/test_import_switch.py @@ -8,7 +8,7 @@ from test.workflows import assert_complete, run_workflow @pytest.mark.workflow() def test_import_switch_success(switch_subscription_factory): - imported_switch = switch_subscription_factory(is_imported=False) + imported_switch = switch_subscription_factory(is_imported=True) result, _, _ = run_workflow("import_switch", [{"subscription_id": imported_switch}]) subscription = Switch.from_subscription(imported_switch) diff --git a/test/workflows/switch/test_validate_switch.py b/test/workflows/switch/test_validate_switch.py index f4e449b0ca84b439193cc55825e43c0bce1cbfb1..13e81c6f68459b6f4a8016f86069e8e57504251f 100644 --- a/test/workflows/switch/test_validate_switch.py +++ b/test/workflows/switch/test_validate_switch.py @@ -19,7 +19,6 @@ def test_validate_switch_success( mock_execute_playbook, switch_subscription_factory, faker, - data_config_filename, geant_partner, ): # Run workflow diff --git a/test/workflows/vrf/test_create_vrf.py b/test/workflows/vrf/test_create_vrf.py index 5ec13598548d7d658605c0863bae7e04a5bdb460..cae26f1188bf397a7aabd8ce9dac8779544620ee 100644 --- a/test/workflows/vrf/test_create_vrf.py +++ b/test/workflows/vrf/test_create_vrf.py @@ -29,7 +29,6 @@ def vrf_input(faker): def test_create_vrf_success( vrf_input, faker, - data_config_filename, ): result, _, _ = run_workflow("create_vrf", vrf_input) assert_complete(result) @@ -48,7 +47,6 @@ def test_create_vrf_with_duplicate_vrf_name( vrf_input, faker, vrf_subscription_factory, - data_config_filename, ): vrf_subscription_factory(vrf_name=vrf_input[1]["vrf_name"]) with pytest.raises(FormValidationError, match="vrf_name must be unique."):