diff --git a/gso/services/netbox_client.py b/gso/services/netbox_client.py index 2566f603329e912621f51977fa40694581170a49..63ce9da8f426315541070b493f721a3129caa625 100644 --- a/gso/services/netbox_client.py +++ b/gso/services/netbox_client.py @@ -13,6 +13,7 @@ from gso.settings import load_oss_params from gso.utils.device_info import ( DEFAULT_SITE, FEASIBLE_IP_TRUNK_LAG_RANGE, + FEASIBLE_SERVICES_LAG_RANGE, ROUTER_ROLE, TierInfo, ) @@ -311,7 +312,7 @@ class NetboxClient: def get_available_services_lags(self, router_id: UUID) -> list[str]: """Return all available Edge port LAGs not assigned to a device.""" - return self.get_available_lags_in_range(router_id, range(20, 51)) + return self.get_available_lags_in_range(router_id, FEASIBLE_SERVICES_LAG_RANGE) @staticmethod def calculate_speed_bits_per_sec(speed: str) -> int: diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index 4e4c7646debeb0962fd3ad4d640dd3de2334fb60..47fbec269627f4ca9184a34ed496a77f8104d07f 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -215,5 +215,5 @@ def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int :raises ValueError: If the number of members is greater than 1 and LACP is disabled. """ if number_of_members > 1 and not enable_lacp: - err_msg = "Number of members must be 1 if LACP is disabled" + err_msg = "Number of members must be 1 if LACP is disabled." raise ValueError(err_msg) diff --git a/gso/workflows/edge_port/create_edge_port.py b/gso/workflows/edge_port/create_edge_port.py index ce87d1f5b5e065e334d5a0949c5a547763a957e2..312461a038668289d835199bd77a31c592fb801a 100644 --- a/gso/workflows/edge_port/create_edge_port.py +++ b/gso/workflows/edge_port/create_edge_port.py @@ -24,6 +24,7 @@ from gso.products.product_types.router import Router from gso.services import lso_client, subscriptions from gso.services.lso_client import lso_interaction from gso.services.netbox_client import NetboxClient +from gso.services.partners import get_partner_by_id from gso.utils.helpers import ( available_interfaces_choices, available_service_lags_choices, @@ -131,7 +132,8 @@ def initialize_subscription( subscription.edge_port.edge_port_ignore_if_down = ignore_if_down subscription.edge_port.edge_port_geant_ga_id = geant_ga_id subscription.edge_port.edge_port_mac_address = mac_address - subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner}, {geant_ga_id or ""}" + partner_name = get_partner_by_id(partner).name + subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner_name}, {geant_ga_id or ""}" subscription.edge_port.edge_port_description = description for member in ae_members: subscription.edge_port.edge_port_ae_members.append( diff --git a/gso/workflows/edge_port/validate_edge_port.py b/gso/workflows/edge_port/validate_edge_port.py index 60611acc37c33955ac34492b40eea8cb0d4a3abf..81714721ee2cb411da22270a854b5b47edee32ac 100644 --- a/gso/workflows/edge_port/validate_edge_port.py +++ b/gso/workflows/edge_port/validate_edge_port.py @@ -10,7 +10,7 @@ from orchestrator.workflows.steps import resync, store_process_subscription from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.edge_port import EdgePort -from gso.services.lso_client import execute_playbook +from gso.services.lso_client import anonymous_lso_interaction, execute_playbook from gso.services.netbox_client import NetboxClient @@ -85,7 +85,7 @@ def validate_edge_port() -> StepList: >> store_process_subscription(Target.SYSTEM) >> prepare_state >> verify_netbox_entries - >> verify_base_config + >> anonymous_lso_interaction(verify_base_config) >> resync >> done ) diff --git a/test/conftest.py b/test/conftest.py index ed4cb631e966caa4149356f0c258c2c164d850e3..5b1e91f17ff182286d05db81dddd1e9f2b73f891 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -36,6 +36,7 @@ from gso.main import init_gso_app from gso.services.partners import PartnerSchema, create_partner from gso.utils.types.interfaces import LAGMember, LAGMemberList from test.fixtures import ( # noqa: F401 + edge_port_subscription_factory, iptrunk_side_subscription_factory, iptrunk_subscription_factory, office_router_subscription_factory, @@ -43,7 +44,6 @@ from test.fixtures import ( # noqa: F401 router_subscription_factory, site_subscription_factory, super_pop_switch_subscription_factory, - test_workflow, ) logging.getLogger("faker.factory").setLevel(logging.WARNING) diff --git a/test/fixtures/__init__.py b/test/fixtures/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..2269a6b867b32a9adc6179b0f814e89b82cd2352 --- /dev/null +++ b/test/fixtures/__init__.py @@ -0,0 +1,19 @@ +from test.fixtures.edge_port_fixtures import edge_port_subscription_factory +from test.fixtures.iptrunk_fixtures import iptrunk_side_subscription_factory, iptrunk_subscription_factory +from test.fixtures.office_router_fixtures import office_router_subscription_factory +from test.fixtures.opengear_fixtures import opengear_subscription_factory +from test.fixtures.router_fixtures import juniper_router_subscription_factory, nokia_router_subscription_factory +from test.fixtures.site_fixtures import site_subscription_factory +from test.fixtures.super_pop_switch_fixtures import super_pop_switch_subscription_factory + +__all__ = [ + "edge_port_subscription_factory", + "iptrunk_side_subscription_factory", + "iptrunk_subscription_factory", + "juniper_router_subscription_factory", + "nokia_router_subscription_factory", + "office_router_subscription_factory", + "opengear_subscription_factory", + "site_subscription_factory", + "super_pop_switch_subscription_factory", +] diff --git a/test/fixtures/common_fixtures.py b/test/fixtures/common_fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..aeac5a991c3afda7b32c7da7ff9b5e223f630aab --- /dev/null +++ b/test/fixtures/common_fixtures.py @@ -0,0 +1,81 @@ +from typing import Any + +from orchestrator.db import ( + ProductTable, + SubscriptionInstanceTable, + SubscriptionInstanceValueTable, + SubscriptionTable, + db, +) +from orchestrator.utils.datetime import nowtz +from pydantic_forms.types import SubscriptionMapping + + +def create_subscription_for_mapping( + product: ProductTable, mapping: SubscriptionMapping, values: dict[str, Any], **kwargs: Any +) -> SubscriptionTable: + """Create a subscription in the test coredb for the given subscription_mapping and values. + + This function handles optional resource types starting with a ? in the mapping not supplied in the values array. + + Args: + product: the ProductTable to create a sub for + mapping: the subscription_mapping belonging to that product + values: a dictionary of keys from the sub_map and their corresponding test values + kwargs: The rest of the arguments + + Returns: The conforming subscription. + """ + + def build_instance(name, value_mapping): + block = product.find_block_by_name(name) + + def build_value(rt, value): + resource_type = block.find_resource_type_by_name(rt) + return SubscriptionInstanceValueTable(resource_type_id=resource_type.resource_type_id, value=value) + + return SubscriptionInstanceTable( + product_block_id=block.product_block_id, + values=[ + build_value(resource_type, values[value_key]) for (resource_type, value_key) in value_mapping.items() + ], + ) + + # recreate the mapping: leave out the ?keys if no value supplied for them + mapping = { + name: [ + { + **{k: value_map[k] for k in value_map if not value_map[k].startswith("?")}, + **{ + k: value_map[k][1:] + for k in value_map + if value_map[k].startswith("?") and value_map[k][1:] in values + }, + } + for value_map in mapping[name] + ] + for name in mapping + } + + instances = [ + build_instance(name, value_mapping) + for (name, value_mappings) in mapping.items() + for value_mapping in value_mappings + ] + + return create_subscription(instances=instances, product=product, **kwargs) + + +def create_subscription(**kwargs): + attrs = { + "description": "A subscription.", + "customer_id": kwargs.get("customer_id", "85938c4c-0a11-e511-80d0-005056956c1a"), + "start_date": nowtz(), + "status": "active", + "insync": True, + **kwargs, + } + o = SubscriptionTable(**attrs) + db.session.add(o) + db.session.commit() + return o diff --git a/test/fixtures/edge_port_fixtures.py b/test/fixtures/edge_port_fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..730da5b4eeef781a1b8f21df6a3cdc07b57d49bb --- /dev/null +++ b/test/fixtures/edge_port_fixtures.py @@ -0,0 +1,89 @@ +import pytest +from orchestrator.db import ( + db, +) +from orchestrator.domain import SubscriptionModel +from orchestrator.types import SubscriptionLifecycle, UUIDstr + +from gso.products import ProductName, Router +from gso.products.product_blocks.edge_port import ( + EdgePortAEMemberBlock, + EdgePortType, + EncapsulationType, +) +from gso.products.product_types.edge_port import EdgePortInactive +from gso.services import subscriptions +from gso.utils.types.interfaces import PhysicalPortCapacity + + +@pytest.fixture() +def edge_port_subscription_factory(faker, partner_factory, nokia_router_subscription_factory): + def subscription_create( + description=None, + partner: dict | None = None, + start_date="2023-05-24T00:00:00+00:00", + node=None, + name=None, + edge_port_description=None, + encapsulation=EncapsulationType.DOT1Q, + mac_address=None, + member_speed=PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND, + minimum_links=None, + edge_port_type=EdgePortType.PUBLIC, + geant_ga_id=None, + edge_port_ae_members=None, + edge_port_sbp_list=None, + status: SubscriptionLifecycle | None = None, + *, + enable_lacp=True, + ignore_if_down=False, + ) -> UUIDstr: + partner = partner or partner_factory(name=faker.company(), email=faker.email()) + edge_port_node = Router.from_subscription(nokia_router_subscription_factory()).router + product_id = subscriptions.get_product_id_by_name(ProductName.EDGE_PORT) + edge_port_subscription = EdgePortInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + + edge_port_subscription.edge_port.edge_port_description = description or faker.text(max_nb_chars=30) + edge_port_subscription.edge_port.edge_port_geant_ga_id = geant_ga_id or faker.geant_sid() + edge_port_subscription.edge_port.edge_port_node = node or edge_port_node + edge_port_subscription.edge_port.edge_port_name = name or f"lag-{faker.pyint(21, 50)}" + edge_port_subscription.edge_port.edge_port_description = edge_port_description or faker.sentence() + edge_port_subscription.edge_port.edge_port_enable_lacp = enable_lacp + edge_port_subscription.edge_port.edge_port_encapsulation = encapsulation + edge_port_subscription.edge_port.edge_port_mac_address = mac_address or faker.mac_address() + edge_port_subscription.edge_port.edge_port_member_speed = member_speed + edge_port_subscription.edge_port.edge_port_minimum_links = minimum_links or faker.pyint(1, 2) + edge_port_subscription.edge_port.edge_port_type = edge_port_type + edge_port_subscription.edge_port.edge_port_ignore_if_down = ignore_if_down + edge_port_subscription.edge_port.edge_port_geant_ga_id = geant_ga_id + edge_port_subscription.edge_port.edge_port_ae_members = edge_port_ae_members or [ + EdgePortAEMemberBlock.new( + faker.uuid4(), + interface_name="Interface2", + interface_description=faker.sentence(), + ), + EdgePortAEMemberBlock.new( + faker.uuid4(), + interface_name="Interface3", + interface_description=faker.sentence(), + ), + ] + edge_port_subscription.edge_port.edge_port_sbp_list = edge_port_sbp_list or [] + edge_port_subscription = SubscriptionModel.from_other_lifecycle( + edge_port_subscription, + SubscriptionLifecycle.ACTIVE, + ) + + if status: + edge_port_subscription.status = status + + edge_port_subscription.description = description or faker.text(max_nb_chars=30) + edge_port_subscription.start_date = start_date + edge_port_subscription.save() + db.session.commit() + + return str(edge_port_subscription.subscription_id) + + return subscription_create diff --git a/test/fixtures/iptrunk_fixtures.py b/test/fixtures/iptrunk_fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..f2251fc7d5bac67daba1f96906a73f0d61339fdc --- /dev/null +++ b/test/fixtures/iptrunk_fixtures.py @@ -0,0 +1,126 @@ +import pytest +from orchestrator.db import ( + db, +) +from orchestrator.domain import SubscriptionModel +from orchestrator.types import SubscriptionLifecycle, UUIDstr + +from gso.products import ProductName +from gso.products.product_blocks.iptrunk import ( + IptrunkInterfaceBlock, + IptrunkSideBlock, + IptrunkType, +) +from gso.products.product_types.iptrunk import ImportedIptrunkInactive, IptrunkInactive +from gso.products.product_types.router import Router +from gso.services import subscriptions +from gso.utils.types.interfaces import PhysicalPortCapacity + + +@pytest.fixture() +def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker): + def subscription_create( + iptrunk_side_node=None, + iptrunk_side_ae_iface=None, + iptrunk_side_ae_geant_a_sid=None, + iptrunk_side_ae_members=None, + iptrunk_side_ae_members_description=None, + ) -> IptrunkSideBlock: + iptrunk_side_node_id = iptrunk_side_node or nokia_router_subscription_factory() + iptrunk_side_node = Router.from_subscription(iptrunk_side_node_id).router + iptrunk_side_ae_iface = iptrunk_side_ae_iface or faker.pystr() + iptrunk_side_ae_geant_a_sid = iptrunk_side_ae_geant_a_sid or faker.geant_sid() + iptrunk_side_ae_members = iptrunk_side_ae_members or [ + IptrunkInterfaceBlock.new( + faker.uuid4(), + interface_name=faker.network_interface(), + interface_description=faker.sentence(), + ), + IptrunkInterfaceBlock.new( + faker.uuid4(), + interface_name=faker.network_interface(), + interface_description=faker.sentence(), + ), + ] + + return IptrunkSideBlock.new( + faker.uuid4(), + iptrunk_side_node=iptrunk_side_node, + iptrunk_side_ae_iface=iptrunk_side_ae_iface, + iptrunk_side_ae_geant_a_sid=iptrunk_side_ae_geant_a_sid, + iptrunk_side_ae_members=iptrunk_side_ae_members, + iptrunk_side_ae_members_description=iptrunk_side_ae_members_description, + ) + + return subscription_create + + +@pytest.fixture() +def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant_partner): + def subscription_create( + description=None, + start_date="2023-05-24T00:00:00+00:00", + geant_s_sid=None, + iptrunk_description=None, + iptrunk_type=IptrunkType.LEASED, + iptrunk_speed=PhysicalPortCapacity.ONE_GIGABIT_PER_SECOND, + iptrunk_isis_metric=None, + iptrunk_ipv4_network=None, + iptrunk_ipv6_network=None, + iptrunk_sides=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, + *, + is_imported: bool | None = True, + ) -> UUIDstr: + if partner is None: + partner = geant_partner + + if is_imported: + product_id = subscriptions.get_product_id_by_name(ProductName.IP_TRUNK) + iptrunk_subscription = IptrunkInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + else: + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_IP_TRUNK) + iptrunk_subscription = ImportedIptrunkInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + + description = description or faker.sentence() + geant_s_sid = geant_s_sid or faker.geant_sid() + iptrunk_description = iptrunk_description or faker.sentence() + iptrunk_isis_metric = iptrunk_isis_metric or faker.pyint() + iptrunk_ipv4_network = iptrunk_ipv4_network or faker.ipv4_network(max_subnet=31) + iptrunk_ipv6_network = iptrunk_ipv6_network or faker.ipv6_network(max_subnet=126) + iptrunk_minimum_links = 1 + iptrunk_side_a = iptrunk_side_subscription_factory() + iptrunk_side_b = iptrunk_side_subscription_factory() + iptrunk_sides = iptrunk_sides or [iptrunk_side_a, iptrunk_side_b] + + iptrunk_subscription.iptrunk.geant_s_sid = geant_s_sid + iptrunk_subscription.iptrunk.iptrunk_description = iptrunk_description + iptrunk_subscription.iptrunk.iptrunk_type = iptrunk_type + iptrunk_subscription.iptrunk.iptrunk_speed = iptrunk_speed + iptrunk_subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links + iptrunk_subscription.iptrunk.iptrunk_isis_metric = iptrunk_isis_metric + iptrunk_subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network + iptrunk_subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network + iptrunk_subscription.iptrunk.iptrunk_sides = iptrunk_sides + + iptrunk_subscription = SubscriptionModel.from_other_lifecycle( + iptrunk_subscription, + SubscriptionLifecycle.ACTIVE, + ) + + if status: + iptrunk_subscription.status = status + + iptrunk_subscription.description = description + iptrunk_subscription.start_date = start_date + iptrunk_subscription.save() + db.session.commit() + + return str(iptrunk_subscription.subscription_id) + + return subscription_create diff --git a/test/fixtures/office_router_fixtures.py b/test/fixtures/office_router_fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..2f43fa6b1aae13c915a3cc77acbe41942755d1dc --- /dev/null +++ b/test/fixtures/office_router_fixtures.py @@ -0,0 +1,74 @@ +import ipaddress + +import pytest +from orchestrator.db import ( + db, +) +from orchestrator.domain import SubscriptionModel +from orchestrator.types import SubscriptionLifecycle, UUIDstr + +from gso.products import ProductName +from gso.products.product_types.office_router import ImportedOfficeRouterInactive, OfficeRouterInactive +from gso.products.product_types.site import Site +from gso.services import subscriptions +from gso.utils.shared_enums import Vendor + + +@pytest.fixture() +def office_router_subscription_factory(site_subscription_factory, faker, geant_partner): + def subscription_create( + description=None, + start_date="2023-05-24T00:00:00+00:00", + office_router_fqdn=None, + office_router_ts_port=None, + office_router_lo_ipv4_address=None, + office_router_lo_ipv6_address=None, + office_router_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, + *, + is_imported: bool | None = True, + ) -> UUIDstr: + if partner is None: + partner = geant_partner + + description = description or faker.text(max_nb_chars=30) + office_router_fqdn = office_router_fqdn or faker.domain_name(levels=4) + office_router_ts_port = office_router_ts_port or faker.random_int(min=1, max=49151) + office_router_lo_ipv4_address = office_router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4()) + office_router_lo_ipv6_address = office_router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6()) + office_router_site = office_router_site or site_subscription_factory() + + if is_imported: + product_id = subscriptions.get_product_id_by_name(ProductName.OFFICE_ROUTER) + office_router_subscription = OfficeRouterInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + else: + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OFFICE_ROUTER) + office_router_subscription = ImportedOfficeRouterInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + + office_router_subscription.office_router.office_router_fqdn = office_router_fqdn + office_router_subscription.office_router.office_router_ts_port = office_router_ts_port + office_router_subscription.office_router.office_router_lo_ipv4_address = office_router_lo_ipv4_address + office_router_subscription.office_router.office_router_lo_ipv6_address = office_router_lo_ipv6_address + office_router_subscription.office_router.office_router_site = Site.from_subscription(office_router_site).site + office_router_subscription.office_router.vendor = Vendor.NOKIA + + office_router_subscription = SubscriptionModel.from_other_lifecycle( + office_router_subscription, SubscriptionLifecycle.ACTIVE + ) + office_router_subscription.description = description + office_router_subscription.start_date = start_date + + if status: + office_router_subscription.status = status + + office_router_subscription.save() + db.session.commit() + + return str(office_router_subscription.subscription_id) + + return subscription_create diff --git a/test/fixtures/opengear_fixtures.py b/test/fixtures/opengear_fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..d3cd83dacc644b62d0a33b47a055954619bc528f --- /dev/null +++ b/test/fixtures/opengear_fixtures.py @@ -0,0 +1,70 @@ +import pytest +from orchestrator.db import ( + db, +) +from orchestrator.domain import SubscriptionModel +from orchestrator.types import SubscriptionLifecycle, UUIDstr + +from gso.products import ProductName +from gso.products.product_types.opengear import ImportedOpengearInactive, OpengearInactive +from gso.products.product_types.site import Site +from gso.services import subscriptions + + +@pytest.fixture() +def opengear_subscription_factory(site_subscription_factory, faker, geant_partner): + def subscription_create( + description=None, + start_date="2023-05-24T00:00:00+00:00", + opengear_site=None, + opengear_hostname=None, + opengear_wan_address=None, + opengear_wan_netmask=None, + opengear_wan_gateway=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, + *, + is_imported: bool | None = True, + ) -> UUIDstr: + if partner is None: + partner = geant_partner + + description = description or faker.text(max_nb_chars=30) + opengear_site = opengear_site or site_subscription_factory() + opengear_hostname = opengear_hostname or faker.domain_name(levels=4) + opengear_wan_address = opengear_wan_address or faker.ipv4() + opengear_wan_netmask = opengear_wan_netmask or faker.ipv4() + opengear_wan_gateway = opengear_wan_gateway or faker.ipv4() + + if is_imported: + product_id = subscriptions.get_product_id_by_name(ProductName.OPENGEAR) + opengear_subscription = OpengearInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + else: + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OPENGEAR) + opengear_subscription = ImportedOpengearInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + + opengear_subscription.opengear.opengear_site = Site.from_subscription(opengear_site).site + opengear_subscription.opengear.opengear_hostname = opengear_hostname + opengear_subscription.opengear.opengear_wan_address = opengear_wan_address + opengear_subscription.opengear.opengear_wan_netmask = opengear_wan_netmask + opengear_subscription.opengear.opengear_wan_gateway = opengear_wan_gateway + + opengear_subscription = SubscriptionModel.from_other_lifecycle( + opengear_subscription, SubscriptionLifecycle.ACTIVE + ) + opengear_subscription.description = description + opengear_subscription.start_date = start_date + + if status: + opengear_subscription.status = status + + opengear_subscription.save() + db.session.commit() + + return str(opengear_subscription.subscription_id) + + return subscription_create diff --git a/test/fixtures/router_fixtures.py b/test/fixtures/router_fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..d08dde358b71d60e193201065dfc1507f4463220 --- /dev/null +++ b/test/fixtures/router_fixtures.py @@ -0,0 +1,148 @@ +import ipaddress + +import pytest +from orchestrator.db import ( + db, +) +from orchestrator.domain import SubscriptionModel +from orchestrator.types import SubscriptionLifecycle, UUIDstr + +from gso.products import ProductName +from gso.products.product_blocks.router import RouterRole +from gso.products.product_types.router import ImportedRouterInactive, RouterInactive +from gso.products.product_types.site import Site +from gso.services import subscriptions +from gso.utils.shared_enums import Vendor + + +@pytest.fixture() +def nokia_router_subscription_factory(site_subscription_factory, faker, geant_partner): + def subscription_create( + description=None, + start_date="2023-05-24T00:00:00+00:00", + router_fqdn=None, + router_ts_port=None, + router_access_via_ts=None, + router_lo_ipv4_address=None, + router_lo_ipv6_address=None, + router_lo_iso_address=None, + router_role=RouterRole.PE, + router_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, + *, + is_imported: bool | None = True, + ) -> UUIDstr: + if partner is None: + partner = geant_partner + + description = description or faker.text(max_nb_chars=30) + router_fqdn = router_fqdn or faker.domain_name(levels=4) + router_ts_port = router_ts_port or faker.random_int(min=1, max=49151) + router_access_via_ts = router_access_via_ts or faker.boolean() + router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4()) + router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6()) + router_lo_iso_address = router_lo_iso_address or faker.word() + router_site = router_site or site_subscription_factory() + + if is_imported: + product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER) + router_subscription = RouterInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + else: + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_ROUTER) + router_subscription = ImportedRouterInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + + router_subscription.router.router_fqdn = router_fqdn + router_subscription.router.router_ts_port = router_ts_port + router_subscription.router.router_access_via_ts = router_access_via_ts + router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address + router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address + router_subscription.router.router_lo_iso_address = router_lo_iso_address + router_subscription.router.router_role = router_role + router_subscription.router.router_site = Site.from_subscription(router_site).site + router_subscription.router.vendor = Vendor.NOKIA + + router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE) + router_subscription.insync = True + router_subscription.description = description + router_subscription.start_date = start_date + + if status: + router_subscription.status = status + + router_subscription.save() + db.session.commit() + + return str(router_subscription.subscription_id) + + return subscription_create + + +@pytest.fixture() +def juniper_router_subscription_factory(site_subscription_factory, faker, geant_partner): + def subscription_create( + description=None, + start_date="2023-05-24T00:00:00+00:00", + router_fqdn=None, + router_ts_port=None, + router_access_via_ts=None, + router_lo_ipv4_address=None, + router_lo_ipv6_address=None, + router_lo_iso_address=None, + router_role=RouterRole.PE, + router_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, + *, + is_imported: bool | None = True, + ) -> UUIDstr: + if partner is None: + partner = geant_partner + + description = description or faker.text(max_nb_chars=30) + router_fqdn = router_fqdn or faker.domain_name(levels=4) + router_ts_port = router_ts_port or faker.random_int(min=1, max=49151) + router_access_via_ts = router_access_via_ts or faker.boolean() + router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4()) + router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6()) + router_lo_iso_address = router_lo_iso_address or faker.word() + router_site = router_site or site_subscription_factory() + + if is_imported: + product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER) + router_subscription = RouterInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + else: + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_ROUTER) + router_subscription = ImportedRouterInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + + router_subscription.router.router_fqdn = router_fqdn + router_subscription.router.router_ts_port = router_ts_port + router_subscription.router.router_access_via_ts = router_access_via_ts + router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address + router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address + router_subscription.router.router_lo_iso_address = router_lo_iso_address + router_subscription.router.router_role = router_role + router_subscription.router.router_site = Site.from_subscription(router_site).site + router_subscription.router.vendor = Vendor.JUNIPER + + router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE) + router_subscription.description = description + router_subscription.start_date = start_date + + if status: + router_subscription.status = status + + router_subscription.save() + db.session.commit() + + return str(router_subscription.subscription_id) + + return subscription_create diff --git a/test/fixtures/site_fixtures.py b/test/fixtures/site_fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..b9155a29316aa2c5e43d3976c490be36bacf689b --- /dev/null +++ b/test/fixtures/site_fixtures.py @@ -0,0 +1,79 @@ +import pytest +from orchestrator.db import ( + db, +) +from orchestrator.domain import SubscriptionModel +from orchestrator.types import SubscriptionLifecycle, UUIDstr + +from gso.products import ProductName +from gso.products.product_blocks.site import SiteTier +from gso.products.product_types.site import ImportedSiteInactive, SiteInactive +from gso.services import subscriptions + + +@pytest.fixture() +def site_subscription_factory(faker, geant_partner): + def subscription_create( + description=None, + start_date="2023-05-24T00:00:00+00:00", + site_name=None, + site_city=None, + site_country=None, + site_country_code=None, + site_latitude=None, + site_longitude=None, + site_bgp_community_id=None, + site_internal_id=None, + site_tier=SiteTier.TIER1, + site_ts_address=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, + *, + is_imported: bool | None = True, + ) -> UUIDstr: + if partner is None: + partner = geant_partner + + description = description or "Site Subscription" + site_name = site_name or faker.site_name() + site_city = site_city or faker.city() + site_country = site_country or faker.country() + site_country_code = site_country_code or faker.country_code() + site_latitude = site_latitude or str(faker.latitude()) + site_longitude = site_longitude or str(faker.longitude()) + site_bgp_community_id = site_bgp_community_id or faker.pyint() + site_internal_id = site_internal_id or faker.pyint() + site_ts_address = site_ts_address or faker.ipv4() + + if is_imported: + product_id = subscriptions.get_product_id_by_name(ProductName.SITE) + site_subscription = SiteInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True) + else: + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SITE) + site_subscription = ImportedSiteInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + + site_subscription.site.site_city = site_city + site_subscription.site.site_name = site_name + site_subscription.site.site_country = site_country + site_subscription.site.site_country_code = site_country_code + site_subscription.site.site_latitude = site_latitude + site_subscription.site.site_longitude = site_longitude + site_subscription.site.site_bgp_community_id = site_bgp_community_id + site_subscription.site.site_internal_id = site_internal_id + site_subscription.site.site_tier = site_tier + site_subscription.site.site_ts_address = site_ts_address + + site_subscription = SubscriptionModel.from_other_lifecycle(site_subscription, SubscriptionLifecycle.ACTIVE) + site_subscription.description = description + site_subscription.start_date = start_date + if status: + site_subscription.status = status + + site_subscription.save() + db.session.commit() + + return str(site_subscription.subscription_id) + + return subscription_create diff --git a/test/fixtures/super_pop_switch_fixtures.py b/test/fixtures/super_pop_switch_fixtures.py new file mode 100644 index 0000000000000000000000000000000000000000..33459570df8424324491caedee3fbefc768c8bab --- /dev/null +++ b/test/fixtures/super_pop_switch_fixtures.py @@ -0,0 +1,75 @@ +import ipaddress + +import pytest +from orchestrator.db import ( + db, +) +from orchestrator.domain import SubscriptionModel +from orchestrator.types import SubscriptionLifecycle, UUIDstr + +from gso.products import ProductName +from gso.products.product_types.site import Site +from gso.products.product_types.super_pop_switch import ImportedSuperPopSwitchInactive, SuperPopSwitchInactive +from gso.services import subscriptions +from gso.utils.shared_enums import Vendor + + +@pytest.fixture() +def super_pop_switch_subscription_factory(site_subscription_factory, faker, geant_partner): + def subscription_create( + description=None, + start_date="2023-05-24T00:00:00+00:00", + super_pop_switch_fqdn=None, + super_pop_switch_ts_port=None, + super_pop_switch_mgmt_ipv4_address=None, + super_pop_switch_site=None, + status: SubscriptionLifecycle | None = None, + partner: dict | None = None, + *, + is_imported: bool | None = True, + ) -> UUIDstr: + if partner is None: + partner = geant_partner + + description = description or faker.text(max_nb_chars=30) + super_pop_switch_fqdn = super_pop_switch_fqdn or faker.domain_name(levels=4) + super_pop_switch_ts_port = super_pop_switch_ts_port or faker.random_int(min=1, max=49151) + super_pop_switch_mgmt_ipv4_address = super_pop_switch_mgmt_ipv4_address or ipaddress.IPv4Address(faker.ipv4()) + super_pop_switch_site = super_pop_switch_site or site_subscription_factory() + + if is_imported: + product_id = subscriptions.get_product_id_by_name(ProductName.SUPER_POP_SWITCH) + super_pop_switch_subscription = SuperPopSwitchInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + else: + product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SUPER_POP_SWITCH) + super_pop_switch_subscription = ImportedSuperPopSwitchInactive.from_product_id( + product_id, customer_id=partner["partner_id"], insync=True + ) + + super_pop_switch_subscription.super_pop_switch.super_pop_switch_fqdn = super_pop_switch_fqdn + super_pop_switch_subscription.super_pop_switch.super_pop_switch_ts_port = super_pop_switch_ts_port + super_pop_switch_subscription.super_pop_switch.super_pop_switch_mgmt_ipv4_address = ( + super_pop_switch_mgmt_ipv4_address + ) + super_pop_switch_subscription.super_pop_switch.super_pop_switch_site = Site.from_subscription( + super_pop_switch_site + ).site + super_pop_switch_subscription.super_pop_switch.vendor = Vendor.NOKIA + + super_pop_switch_subscription = SubscriptionModel.from_other_lifecycle( + super_pop_switch_subscription, SubscriptionLifecycle.ACTIVE + ) + super_pop_switch_subscription.description = description + super_pop_switch_subscription.start_date = start_date + + if status: + super_pop_switch_subscription.status = status + + super_pop_switch_subscription.save() + db.session.commit() + + return str(super_pop_switch_subscription.subscription_id) + + return subscription_create diff --git a/test/services/conftest.py b/test/services/conftest.py index 9557b1c5b091e51150ab78cf4b58c62f8018df8e..3467d545d2c571e947c75022c8a574197394e723 100644 --- a/test/services/conftest.py +++ b/test/services/conftest.py @@ -12,10 +12,22 @@ class MockedNetboxClient: def get_device_by_name(self): return self.BaseMockObject(id=1, name="test") + @staticmethod + def get_interface_by_name_and_device(interface_name: str, device_name: str): + return { + "name": f"{interface_name}", + "module": {"display": f"Module{interface_name}"}, + "description": f"Description{interface_name}-{device_name}", + } + @staticmethod def get_available_lags() -> list[str]: return [f"lag-{lag}" for lag in range(1, 5)] + @staticmethod + def get_available_services_lags() -> list[str]: + return [f"lag-{lag}" for lag in range(21, 50)] + @staticmethod def get_available_interfaces(): interfaces = [] diff --git a/test/workflows/edge_port/__init__.py b/test/workflows/edge_port/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/test/workflows/edge_port/test_create_edge_port.py b/test/workflows/edge_port/test_create_edge_port.py new file mode 100644 index 0000000000000000000000000000000000000000..db855e8a18e10ed62a8822c901dc65d447081325 --- /dev/null +++ b/test/workflows/edge_port/test_create_edge_port.py @@ -0,0 +1,124 @@ +from os import PathLike +from unittest.mock import patch + +import pytest +from pydantic_forms.exceptions import FormValidationError + +from gso.products import EdgePort, ProductName, Router +from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType +from gso.services.subscriptions import get_product_id_by_name +from gso.utils.types.interfaces import PhysicalPortCapacity +from test.services.conftest import MockedNetboxClient +from test.workflows import ( + assert_complete, + assert_lso_interaction_success, + extract_state, + run_workflow, +) + + +@pytest.fixture() +def _netbox_client_mock(): + # Mock NetboxClient methods + with ( + patch("gso.services.netbox_client.NetboxClient.get_device_by_name") as mock_get_device_by_name, + patch("gso.services.netbox_client.NetboxClient.get_available_interfaces") as mock_get_available_interfaces, + patch("gso.services.netbox_client.NetboxClient.get_available_services_lags") as mock_available_services_lags, + patch("gso.services.netbox_client.NetboxClient.create_interface") as mock_create_interface, + patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag") as mock_attach_interface_to_lag, + patch("gso.services.netbox_client.NetboxClient.reserve_interface") as mock_reserve_interface, + patch("gso.services.netbox_client.NetboxClient.allocate_interface") as mock_allocate_interface, + ): + mock_get_device_by_name.return_value = MockedNetboxClient().get_device_by_name() + mock_get_available_interfaces.return_value = MockedNetboxClient().get_available_interfaces() + mock_available_services_lags.return_value = MockedNetboxClient().get_available_services_lags() + mock_create_interface.return_value = MockedNetboxClient().create_interface() + mock_attach_interface_to_lag.return_value = MockedNetboxClient().attach_interface_to_lag() + mock_reserve_interface.return_value = MockedNetboxClient().reserve_interface() + mock_allocate_interface.return_value = MockedNetboxClient().allocate_interface() + + yield + + +@pytest.fixture() +def input_form_wizard_data(request, nokia_router_subscription_factory, partner_factory, faker): + create_edge_port_step = { + "tt_number": faker.tt_number(), + "node": nokia_router_subscription_factory(), + "partner": partner_factory(name="GAAR", email=faker.email())["partner_id"], + "service_type": EdgePortType.PUBLIC, + "geant_ga_id": faker.geant_gid(), + "enable_lacp": True, + "speed": PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND, + "encapsulation": EncapsulationType.DOT1Q, + "number_of_members": 2, + "minimum_links": 1, + } + create_edge_port_interface_step = { + "name": "lag-21", + "description": faker.sentence(), + "ae_members": [ + { + "interface_name": f"Interface{interface}", + "interface_description": faker.sentence(), + } + for interface in range(2) + ], + } + + return [ + create_edge_port_step, + create_edge_port_interface_step, + ] + + +@pytest.mark.workflow() +@patch("gso.workflows.edge_port.create_edge_port.lso_client.execute_playbook") +def test_successful_edge_port_creation( + mock_execute_playbook, + responses, + input_form_wizard_data, + faker, + _netbox_client_mock, # noqa: PT019 + data_config_filename: PathLike, + test_client, +): + product_id = get_product_id_by_name(ProductName.EDGE_PORT) + initial_data = [{"product": product_id}, *input_form_wizard_data] + result, process_stat, step_log = run_workflow("create_edge_port", initial_data) + + for _ in range(2): + result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + + assert_complete(result) + + state = extract_state(result) + subscription_id = state["subscription_id"] + subscription = EdgePort.from_subscription(subscription_id) + + assert subscription.status == "active" + ga_id = input_form_wizard_data[0]["geant_ga_id"] + router_fqdn = Router.from_subscription(input_form_wizard_data[0]["node"]).router.router_fqdn + assert subscription.description == f"Edge Port lag-21 on {router_fqdn}, GAAR, {ga_id}" + assert len(subscription.edge_port.edge_port_ae_members) == 2 + assert mock_execute_playbook.call_count == 2 + + +def test_edge_port_creation_with_invalid_input( + input_form_wizard_data, + faker, + _netbox_client_mock, # noqa: PT019 + data_config_filename: PathLike, + test_client, +): + product_id = get_product_id_by_name(ProductName.EDGE_PORT) + # If the number of members is greater than 1 then LACP must be enabled. + input_form_wizard_data[0]["enable_lacp"] = False + initial_data = [{"product": product_id}, *input_form_wizard_data] + + with pytest.raises(FormValidationError) as error: + run_workflow("create_edge_port", initial_data) + + error = error.value.errors[0] + assert error["msg"] == "Number of members must be 1 if LACP is disabled." + assert error["loc"][0] == "__root__" diff --git a/test/workflows/edge_port/test_modify_edge_port.py b/test/workflows/edge_port/test_modify_edge_port.py new file mode 100644 index 0000000000000000000000000000000000000000..18d12a05788f2813ba022df726f807475ce8f113 --- /dev/null +++ b/test/workflows/edge_port/test_modify_edge_port.py @@ -0,0 +1,168 @@ +from unittest.mock import patch + +import pytest + +from gso.products import EdgePort +from gso.utils.types.interfaces import PhysicalPortCapacity +from test.workflows import ( + assert_complete, + assert_lso_interaction_success, + extract_state, + run_workflow, +) +from test.workflows.iptrunk.test_create_iptrunk import MockedNetboxClient + + +@pytest.fixture() +def input_form_wizard_data(request, faker, edge_port_subscription_factory, partner_factory): + subscription_id = edge_port_subscription_factory() + + return [ + {"subscription_id": subscription_id}, + { + "tt_number": faker.tt_number(), + "geant_ga_id": faker.geant_gid(), + "member_speed": PhysicalPortCapacity.FOUR_HUNDRED_GIGABIT_PER_SECOND, + "number_of_members": 1, + }, + { + "description": faker.sentence(), + "ae_members": [ + { + "interface_name": "Interface1", + "interface_description": faker.sentence(), + } + ], + }, + ] + + +@pytest.mark.workflow() +@patch("gso.workflows.edge_port.modify_edge_port.execute_playbook") +@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces") +@patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag") +@patch("gso.services.netbox_client.NetboxClient.reserve_interface") +@patch("gso.services.netbox_client.NetboxClient.allocate_interface") +@patch("gso.services.netbox_client.NetboxClient.free_interface") +@patch("gso.services.netbox_client.NetboxClient.detach_interfaces_from_lag") +@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device") +def test_modify_edge_port_with_changing_capacity( + mocked_get_interface_by_name_and_device, + mocked_detach_interfaces_from_lag, + mocked_free_interface, + mocked_allocate_interface, + mocked_reserve_interface, + mocked_attach_interface_to_lag, + mocked_get_available_interfaces, + mocked_execute_playbook, + input_form_wizard_data, + faker, + data_config_filename, +): + # Set up mock return values + mocked_netbox = MockedNetboxClient() + mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces() + mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag() + mocked_reserve_interface.return_value = mocked_netbox.reserve_interface() + mocked_allocate_interface.return_value = mocked_netbox.allocate_interface() + mocked_free_interface.return_value = mocked_netbox.free_interface() + mocked_detach_interfaces_from_lag.return_value = mocked_netbox.detach_interfaces_from_lag() + mocked_get_interface_by_name_and_device.side_effect = mocked_netbox.get_interface_by_name_and_device + + # Run workflow + result, process_stat, step_log = run_workflow("modify_edge_port", input_form_wizard_data) + + for _ in range(2): + result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + + assert_complete(result) + # Validate the final state and subscription data + state = extract_state(result) + subscription_id = state["subscription_id"] + subscription = EdgePort.from_subscription(subscription_id) + + assert subscription.status == "active" + assert mocked_execute_playbook.call_count == 2 + + # The number of members have been changed from 2 to 1 + assert mocked_reserve_interface.call_count == 1 + assert mocked_attach_interface_to_lag.call_count == 1 + assert mocked_free_interface.call_count == 2 + assert mocked_detach_interfaces_from_lag.call_count == 1 + assert subscription.edge_port.edge_port_geant_ga_id == input_form_wizard_data[1]["geant_ga_id"] + assert len(subscription.edge_port.edge_port_ae_members) == 1 + + +@pytest.fixture() +def input_form_wizard_without_changing_capacity(request, faker, edge_port_subscription_factory, partner_factory): + subscription_id = edge_port_subscription_factory() + subscription = EdgePort.from_subscription(subscription_id) + + return [ + {"subscription_id": subscription_id}, + {"tt_number": faker.tt_number(), "geant_ga_id": faker.geant_gid()}, + { + "description": faker.sentence(), + "ae_members": [ + { + "interface_name": interface.interface_name, + "interface_description": interface.interface_description, + } + for interface in subscription.edge_port.edge_port_ae_members + ], + }, + ] + + +@pytest.mark.workflow() +@patch("gso.workflows.edge_port.modify_edge_port.execute_playbook") +@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces") +@patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag") +@patch("gso.services.netbox_client.NetboxClient.reserve_interface") +@patch("gso.services.netbox_client.NetboxClient.allocate_interface") +@patch("gso.services.netbox_client.NetboxClient.free_interface") +@patch("gso.services.netbox_client.NetboxClient.detach_interfaces_from_lag") +@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device") +def test_modify_edge_port_without_changing_capacity( + mocked_get_interface_by_name_and_device, + mocked_detach_interfaces_from_lag, + mocked_free_interface, + mocked_allocate_interface, + mocked_reserve_interface, + mocked_attach_interface_to_lag, + mocked_get_available_interfaces, + mocked_execute_playbook, + input_form_wizard_without_changing_capacity, + faker, + data_config_filename, +): + # Set up mock return values + mocked_netbox = MockedNetboxClient() + mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces() + mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag() + mocked_reserve_interface.return_value = mocked_netbox.reserve_interface() + mocked_allocate_interface.return_value = mocked_netbox.allocate_interface() + mocked_free_interface.return_value = mocked_netbox.free_interface() + mocked_detach_interfaces_from_lag.return_value = mocked_netbox.detach_interfaces_from_lag() + mocked_get_interface_by_name_and_device.side_effect = mocked_netbox.get_interface_by_name_and_device + + # Run workflow + result, _, _ = run_workflow("modify_edge_port", input_form_wizard_without_changing_capacity) + assert_complete(result) + + state = extract_state(result) + subscription_id = state["subscription_id"] + subscription = EdgePort.from_subscription(subscription_id) + + assert subscription.status == "active" + + # The capacity has not been changed so the following methods should not be called + assert mocked_execute_playbook.call_count == 0 + assert mocked_reserve_interface.call_count == 0 + assert mocked_attach_interface_to_lag.call_count == 0 + assert mocked_free_interface.call_count == 0 + assert mocked_detach_interfaces_from_lag.call_count == 0 + + assert subscription.edge_port.edge_port_geant_ga_id == input_form_wizard_without_changing_capacity[1]["geant_ga_id"] + assert len(subscription.edge_port.edge_port_ae_members) == 2 + assert subscription.edge_port.edge_port_description == input_form_wizard_without_changing_capacity[2]["description"] diff --git a/test/workflows/edge_port/test_terminate_edge_port.py b/test/workflows/edge_port/test_terminate_edge_port.py new file mode 100644 index 0000000000000000000000000000000000000000..cc7610c31ec23e9f53ae794d80f9bc699c51f64b --- /dev/null +++ b/test/workflows/edge_port/test_terminate_edge_port.py @@ -0,0 +1,56 @@ +from unittest.mock import patch + +import pytest + +from gso.products import EdgePort +from test.services.conftest import MockedNetboxClient +from test.workflows import ( + assert_complete, + assert_lso_interaction_success, + extract_state, + run_workflow, +) + + +@pytest.mark.workflow() +@patch("gso.workflows.edge_port.terminate_edge_port.execute_playbook") +@patch("gso.services.netbox_client.NetboxClient.delete_interface") +@patch("gso.services.netbox_client.NetboxClient.free_interface") +def test_successful_edge_port_termination( + mocked_free_interface, + mocked_delete_interface, + mock_execute_playbook, + edge_port_subscription_factory, + faker, + data_config_filename, +): + # Set up mock return values + subscription_id = edge_port_subscription_factory() + mocked_netbox = MockedNetboxClient() + mocked_delete_interface.return_value = mocked_netbox.delete_interface() + mocked_free_interface.return_value = mocked_netbox.free_interface() + + # Run workflow + initial_data = [ + {"subscription_id": subscription_id}, + { + "tt_number": faker.tt_number(), + }, + ] + result, process_stat, step_log = run_workflow("terminate_edge_port", initial_data) + + for _ in range(2): + result, step_log = assert_lso_interaction_success(result, process_stat, step_log) + + assert_complete(result) + + # Check NetboxClient calls + assert mocked_delete_interface.call_count == 1 # Delete the lag + assert mocked_free_interface.call_count == 2 # Free interfaces attached to the lag which is 2 + + state = extract_state(result) + subscription_id = state["subscription_id"] + subscription = EdgePort.from_subscription(subscription_id) + + assert subscription.status == "terminated" + assert mock_execute_playbook.call_count == 2 diff --git a/test/workflows/edge_port/test_validate_edge_port.py b/test/workflows/edge_port/test_validate_edge_port.py new file mode 100644 index 0000000000000000000000000000000000000000..261d5b24fbe7deadac6deb8d287b64200a83fce8 --- /dev/null +++ b/test/workflows/edge_port/test_validate_edge_port.py @@ -0,0 +1,62 @@ +from unittest.mock import patch + +import pytest + +from gso.products import EdgePort +from test.services.conftest import MockedNetboxClient +from test.workflows import ( + assert_complete, + assert_lso_success, + extract_state, + run_workflow, +) + + +@pytest.mark.workflow() +@patch("gso.workflows.edge_port.validate_edge_port.execute_playbook") +@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device") +def test_validate_edge_port_success( + mock_get_interface_by_name_and_device, + mock_execute_playbook, + edge_port_subscription_factory, + faker, + data_config_filename, +): + subscription_id = edge_port_subscription_factory() + mock_get_interface_by_name_and_device.side_effect = [ + MockedNetboxClient.BaseMockObject( + name="iFace1", + module=MockedNetboxClient.BaseMockObject(display="display1"), + description=subscription_id, + enabled=True, + ), + MockedNetboxClient.BaseMockObject( + name="iFace2", + module=MockedNetboxClient.BaseMockObject(display="display2"), + description=subscription_id, + enabled=True, + ), + MockedNetboxClient.BaseMockObject( + name="Iface3", + module=MockedNetboxClient.BaseMockObject(display="display3"), + description=subscription_id, + enabled=True, + ), + ] + + # Run workflow + initial_data = [{"subscription_id": subscription_id}] + result, process_stat, step_log = run_workflow("validate_edge_port", initial_data) + + state = extract_state(result) + subscription_id = state["subscription_id"] + + for _ in range(1): + result, step_log = assert_lso_success(result, process_stat, step_log) + + assert_complete(result) + subscription = EdgePort.from_subscription(subscription_id) + assert subscription.status == "active" + assert mock_execute_playbook.call_count == 1 + # One time for getting the LAG and two times for getting the interfaces + assert mock_get_interface_by_name_and_device.call_count == 3 diff --git a/test/workflows/tasks/test_delete_partners.py b/test/workflows/tasks/test_delete_partners.py index b0964bdfc93be71624a8f947876cae956a4810b2..3fb36c8415016939ac22cad68f2e3590d5c9399f 100644 --- a/test/workflows/tasks/test_delete_partners.py +++ b/test/workflows/tasks/test_delete_partners.py @@ -6,7 +6,7 @@ from pydantic_forms.exceptions import FormValidationError from sqlalchemy import select from gso.services.partners import filter_partners_by_name -from test.fixtures import create_subscription_for_mapping +from test.fixtures.common_fixtures import create_subscription_for_mapping from test.workflows import assert_complete, run_workflow CORRECT_SUBSCRIPTION = str(uuid4())