import contextlib import datetime import ipaddress import logging import os from pathlib import Path import orchestrator import pytest from alembic import command from alembic.config import Config from faker import Faker from faker.providers import BaseProvider from oauth2_lib.settings import oauth2lib_settings from orchestrator import app_settings from orchestrator.db import ( Database, ProductBlockTable, ProductTable, ResourceTypeTable, SubscriptionMetadataTable, WorkflowTable, db, ) from orchestrator.db.database import ENGINE_ARGUMENTS, SESSION_ARGUMENTS, BaseModel from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY, SubscriptionModel from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle, strEnum from sqlalchemy import create_engine, select, text from sqlalchemy.engine import make_url from sqlalchemy.orm import scoped_session, sessionmaker from starlette.testclient import TestClient from urllib3_mock import Responses from gso.main import init_gso_app from gso.services.partners import PartnerSchema, create_partner from gso.utils.types.interfaces import LAGMember, LAGMemberList from test.fixtures import ( # noqa: F401 bgp_session_subscription_factory, edge_port_subscription_factory, geant_ip_subscription_factory, iptrunk_side_subscription_factory, iptrunk_subscription_factory, nren_access_port_factory, office_router_subscription_factory, opengear_subscription_factory, router_subscription_factory, site_subscription_factory, service_binding_port_factory, super_pop_switch_subscription_factory, ) logging.getLogger("faker.factory").setLevel(logging.WARNING) class UseJuniperSide(strEnum): """Define on tests on which side to use Juniper router.""" NONE = "none" SIDE_A = "side_a" SIDE_B = "side_b" SIDE_BOTH = "side_both" class FakerProvider(BaseProvider): def ipv4_network(self, *, min_subnet=1, max_subnet=32) -> ipaddress.IPv4Network: subnet = str(self.generator.random_int(min=min_subnet, max=max_subnet)) ipv4 = self.generator.ipv4() interface = ipaddress.IPv4Interface(ipv4 + "/" + subnet) # Extra step for converting ``10.53.92.39/24`` to ``10.53.92.0/24`` network = interface.network.network_address return ipaddress.IPv4Network(str(network) + "/" + subnet) def ipv6_network(self, *, min_subnet=1, max_subnet=128) -> ipaddress.IPv6Network: subnet = str(self.generator.random_int(min=min_subnet, max=max_subnet)) ipv6 = self.generator.ipv6() interface = ipaddress.IPv6Interface(ipv6 + "/" + subnet) network = interface.network.network_address return ipaddress.IPv6Network(str(network) + "/" + subnet) def tt_number(self) -> str: random_date = self.generator.date(pattern="%Y%m%d") random_int = self.generator.random_int(min=10000000, max=99999999) return f"TT#{random_date}{random_int}" def geant_gid(self) -> str: return self.generator.numerify("GID-#####") def geant_sid(self) -> str: return self.generator.numerify("SID-#####") def site_name(self) -> str: site_name = "".join(self.generator.random_letter().upper() for _ in range(3)) if self.generator.boolean(): digit = self.generator.random_int(min=1, max=9) site_name += str(digit) return site_name def network_interface(self) -> str: return self.generator.numerify("ge-@#/@#/@#") def link_members_juniper(self) -> LAGMemberList[LAGMember]: iface_amount = self.generator.random_int(min=2, max=5) interface_names = [f"{prefix}{i}" for prefix in ["xe-1/0/", "ge-3/0/", "xe-2/1/"] for i in range(iface_amount)] return [ LAGMember(interface_name=interface_name, interface_description=self.generator.sentence()) for interface_name in interface_names ] def link_members_nokia(self) -> LAGMemberList[LAGMember]: iface_amount = self.generator.random_int(min=2, max=5) return [ LAGMember(interface_name=f"Interface{i}", interface_description=self.generator.sentence()) for i in range(iface_amount) ] def vlan_id(self) -> int: return self.generator.random_int(min=1, max=4095) @pytest.fixture(scope="session") def faker() -> Faker: fake = Faker() fake.add_provider(FakerProvider) return fake @pytest.fixture(scope="session", autouse=True) def data_config_filename() -> str: """Set an environment variable to the path of the example OSS parameters file.""" config_filename = "gso/oss-params-example.json" os.environ["OSS_PARAMS_FILENAME"] = config_filename yield config_filename del os.environ["OSS_PARAMS_FILENAME"] @pytest.fixture(scope="session") def db_uri(): """Provide a unique database URI for each pytest-xdist worker, or a default URI if running without xdist.""" worker_id = os.getenv("PYTEST_XDIST_WORKER") database_host = os.getenv("DATABASE_HOST", "localhost") if worker_id: return f"postgresql://nwa:nwa@{database_host}/gso-test-db_{worker_id}" return os.environ.get("DATABASE_URI_TEST", f"postgresql://nwa:nwa@{database_host}/gso-test-db") def run_migrations(db_uri: str) -> None: """Configure the alembic migration and run the migration on the database. :param str db_uri: The database uri configuration to run the migration on. :return: None """ path = Path(__file__).resolve().parent app_settings.DATABASE_URI = db_uri alembic_cfg = Config(file_=path / "../gso/alembic.ini") alembic_cfg.set_main_option("sqlalchemy.url", db_uri) alembic_cfg.set_main_option("script_location", str(path / "../gso/migrations")) version_locations = alembic_cfg.get_main_option("version_locations") alembic_cfg.set_main_option( "version_locations", f"{version_locations} {Path(orchestrator.__file__).parent}/migrations/versions/schema", ) command.upgrade(alembic_cfg, "heads") @pytest.fixture(scope="session") def _database(db_uri): """Create database and run migrations and cleanup after wards. :param db_uri: The database uri configuration to run the migration on. """ db.update(Database(db_uri)) url = make_url(db_uri) db_to_create = url.database url = url.set(database="postgres") engine = create_engine(url) with engine.connect() as conn: conn.execute(text("COMMIT;")) conn.execute( text("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname=:db_name").bindparams( db_name=db_to_create, ), ) conn.execute(text(f'DROP DATABASE IF EXISTS "{db_to_create}";')) conn.execute(text("COMMIT;")) conn.execute(text(f'CREATE DATABASE "{db_to_create}";')) run_migrations(db_uri) db.wrapped_database.engine = create_engine(db_uri, **ENGINE_ARGUMENTS) try: yield finally: db.wrapped_database.engine.dispose() with engine.connect() as conn: conn.execute(text("COMMIT;")) # Terminate all connections to the database conn.execute( text(f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='{db_to_create}';"), # noqa: S608 ) conn.execute(text(f'DROP DATABASE IF EXISTS "{db_to_create}";')) @pytest.fixture(autouse=True) def _db_session(_database): """Ensure that tests are executed within a transactional scope that automatically rolls back after completion. This fixture facilitates a pattern known as 'transactional tests'. At the start, it establishes a connection and begins an overarching transaction. Any database operations performed within the test function—whether they commit or not happen within the context of this master transaction. From the perspective of the test function, it seems as though changes are getting committed to the database, enabling the tests to query and assert the persistence of data. Yet, once the test completes, this fixture intervenes to roll back the master transaction. This ensures a clean slate after each test, preventing tests from polluting the database state for subsequent tests. Benefits: - Each test runs in isolation with a pristine database state. - Avoids the overhead of recreating the database schema or re-seeding data between tests. :param _database: A fixture reference that initializes the database. """ with contextlib.closing(db.wrapped_database.engine.connect()) as test_connection: # Create a new session factory for this context. session_factory = sessionmaker(bind=test_connection, **SESSION_ARGUMENTS) scoped_session_instance = scoped_session( session_factory, scopefunc=db.wrapped_database._scopefunc, # noqa: SLF001 ) # Point the database session to this new scoped session. db.wrapped_database.session_factory = session_factory db.wrapped_database.scoped_session = scoped_session_instance # Set the query for the base model. BaseModel.set_query(scoped_session_instance.query_property()) transaction = test_connection.begin() try: yield finally: transaction.rollback() scoped_session_instance.remove() @pytest.fixture(scope="session", autouse=True) def fastapi_app(_database, db_uri): """Load the GSO FastAPI app for testing purposes. This implementation is as close as possible to the one present in orchestrator-core. """ oauth2lib_settings.OAUTH2_ACTIVE = False oauth2lib_settings.ENVIRONMENT_IGNORE_MUTATION_DISABLED = ["local", "TESTING"] app_settings.DATABASE_URI = db_uri return init_gso_app() @pytest.fixture(scope="session") def test_client(fastapi_app): return TestClient(fastapi_app) @pytest.fixture(scope="session") def partner_factory(faker): def _create_partner( name: str | None = None, email: str | None = None, ) -> dict: return create_partner( PartnerSchema( name=name or faker.company(), email=email or faker.email(), ) ) return _create_partner @pytest.fixture() def geant_partner(partner_factory): return partner_factory(name="GEANT-TEST", email="goat-test@geant.org") @pytest.fixture() def generic_resource_type_1(): rt = ResourceTypeTable(description="Resource Type one", resource_type="rt_1") db.session.add(rt) db.session.commit() return rt @pytest.fixture() def generic_resource_type_2(): rt = ResourceTypeTable(description="Resource Type two", resource_type="rt_2") db.session.add(rt) db.session.commit() return rt @pytest.fixture() def generic_resource_type_3(): rt = ResourceTypeTable(description="Resource Type three", resource_type="rt_3") db.session.add(rt) db.session.commit() return rt @pytest.fixture() def generic_product_block_1(generic_resource_type_1): pb = ProductBlockTable( name="PB_1", description="Generic Product Block 1", tag="PB1", status="active", resource_types=[generic_resource_type_1], created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"), ) db.session.add(pb) db.session.commit() return pb @pytest.fixture() def generic_product_block_2(generic_resource_type_2, generic_resource_type_3): pb = ProductBlockTable( name="PB_2", description="Generic Product Block 2", tag="PB2", status="active", resource_types=[generic_resource_type_2, generic_resource_type_3], created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"), ) db.session.add(pb) db.session.commit() return pb @pytest.fixture() def generic_product_block_3(generic_resource_type_2): pb = ProductBlockTable( name="PB_3", description="Generic Product Block 3", tag="PB3", status="active", resource_types=[generic_resource_type_2], created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"), ) db.session.add(pb) db.session.commit() return pb @pytest.fixture() def generic_product_1(generic_product_block_1, generic_product_block_2): workflow = db.session.scalar(select(WorkflowTable).where(WorkflowTable.name == "modify_note")) p = ProductTable( name="Product 1", description="Generic Product One", product_type="Generic", status="active", tag="GEN1", product_blocks=[generic_product_block_1, generic_product_block_2], workflows=[workflow], ) db.session.add(p) db.session.commit() return p @pytest.fixture() def generic_product_2(generic_product_block_3): workflow = db.session.scalar(select(WorkflowTable).where(WorkflowTable.name == "modify_note")) p = ProductTable( name="Product 2", description="Generic Product Two", product_type="Generic", status="active", tag="GEN2", product_blocks=[generic_product_block_3], workflows=[workflow], ) db.session.add(p) db.session.commit() return p @pytest.fixture() def generic_product_3(generic_product_block_2): p = ProductTable( name="Product 3", description="Generic Product Three", product_type="Generic", status="active", tag="GEN3", product_blocks=[generic_product_block_2], ) db.session.add(p) db.session.commit() return p @pytest.fixture() def generic_product_block_type_1(generic_product_block_1): class GenericProductBlockOneInactive(ProductBlockModel, product_block_name="PB_1"): rt_1: str | None = None class GenericProductBlockOne(GenericProductBlockOneInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): rt_1: str return GenericProductBlockOneInactive, GenericProductBlockOne @pytest.fixture() def generic_product_block_type_2(generic_product_block_2): class GenericProductBlockTwoInactive(ProductBlockModel, product_block_name="PB_2"): rt_2: int | None = None rt_3: str | None = None class GenericProductBlockTwo(GenericProductBlockTwoInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): rt_2: int rt_3: str return GenericProductBlockTwoInactive, GenericProductBlockTwo @pytest.fixture() def generic_product_block_type_3(generic_product_block_3): class GenericProductBlockThreeInactive(ProductBlockModel, product_block_name="PB_3"): rt_2: int | None = None class GenericProductBlockThree(GenericProductBlockThreeInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): rt_2: int return GenericProductBlockThreeInactive, GenericProductBlockThree @pytest.fixture() def generic_product_type_1(generic_product_1, generic_product_block_type_1, generic_product_block_type_2): generic_product_block_one_inactive, generic_product_block_one = generic_product_block_type_1 generic_product_block_two_inactive, generic_product_block_two = generic_product_block_type_2 # Test Product domain models class GenericProductOneInactive(SubscriptionModel, is_base=True): pb_1: generic_product_block_one_inactive pb_2: generic_product_block_two_inactive class GenericProductOne(GenericProductOneInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): pb_1: generic_product_block_one pb_2: generic_product_block_two SUBSCRIPTION_MODEL_REGISTRY["Product 1"] = GenericProductOne yield GenericProductOneInactive, GenericProductOne del SUBSCRIPTION_MODEL_REGISTRY["Product 1"] @pytest.fixture() def generic_product_type_2(generic_product_2, generic_product_block_type_3): generic_product_block_three_inactive, generic_product_block_three = generic_product_block_type_3 class GenericProductTwoInactive(SubscriptionModel, is_base=True): pb_3: generic_product_block_three_inactive class GenericProductTwo(GenericProductTwoInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]): pb_3: generic_product_block_three SUBSCRIPTION_MODEL_REGISTRY["Product 2"] = GenericProductTwo yield GenericProductTwoInactive, GenericProductTwo del SUBSCRIPTION_MODEL_REGISTRY["Product 2"] @pytest.fixture() def product_type_1_subscription_factory(generic_product_1, generic_product_type_1, geant_partner): def subscription_create( description="Generic Subscription One", start_date="2023-05-24T00:00:00+00:00", rt_1="Value1", rt_2=42, rt_3="Value2", ): generic_product_one_inactive, _ = generic_product_type_1 gen_subscription = generic_product_one_inactive.from_product_id( generic_product_1.product_id, customer_id=geant_partner["partner_id"], insync=True ) gen_subscription.pb_1.rt_1 = rt_1 gen_subscription.pb_2.rt_2 = rt_2 gen_subscription.pb_2.rt_3 = rt_3 gen_subscription = SubscriptionModel.from_other_lifecycle(gen_subscription, SubscriptionLifecycle.ACTIVE) gen_subscription.description = description gen_subscription.start_date = start_date gen_subscription.save() gen_subscription_metadata = SubscriptionMetadataTable() gen_subscription_metadata.subscription_id = gen_subscription.subscription_id gen_subscription_metadata.metadata_ = {"description": "Some metadata description"} db.session.add(gen_subscription_metadata) db.session.commit() return str(gen_subscription.subscription_id) return subscription_create @pytest.fixture() def product_type_1_subscriptions_factory(product_type_1_subscription_factory): def subscriptions_create(amount=1): return [ product_type_1_subscription_factory( description=f"Subscription {i}", start_date=( datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00") + datetime.timedelta(days=i) ).replace(tzinfo=datetime.UTC), ) for i in range(amount) ] return subscriptions_create @pytest.fixture() def generic_subscription_1(product_type_1_subscription_factory): return product_type_1_subscription_factory() @pytest.fixture(autouse=True) def responses(): responses_mock = Responses("requests.packages.urllib3") def _find_request(call): mock_url = responses_mock._find_match(call.request) # noqa: SLF001 if not mock_url: pytest.fail(f"Call not mocked: {call.request}") return mock_url def _to_tuple(url_mock): return url_mock["url"], url_mock["method"], url_mock["match_querystring"] with responses_mock: yield responses_mock mocked_urls = map(_to_tuple, responses_mock._urls) # noqa: SLF001 used_urls = map(_to_tuple, map(_find_request, responses_mock.calls)) not_used = set(mocked_urls) - set(used_urls) if not_used: pytest.fail(f"Found unused responses mocks: {not_used}", pytrace=False) @pytest.fixture(autouse=True) def _no_mail(monkeypatch): """Remove sending mails from all tests.""" monkeypatch.delattr("smtplib.SMTP")