import contextlib import ipaddress import json import logging import os import socket import tempfile from pathlib import Path import orchestrator import pytest from alembic import command from alembic.config import Config from faker import Faker from faker.providers import BaseProvider from oauth2_lib.settings import oauth2lib_settings from orchestrator import app_settings from orchestrator.db import Database, db from orchestrator.db.database import ENGINE_ARGUMENTS, SESSION_ARGUMENTS, BaseModel from orchestrator.types import strEnum from sqlalchemy import create_engine, text from sqlalchemy.engine import make_url from sqlalchemy.orm import scoped_session, sessionmaker from starlette.testclient import TestClient from gso.main import init_gso_app from gso.utils.helpers import LAGMember logging.getLogger("faker.factory").setLevel(logging.WARNING) def pytest_collection_modifyitems(config, items): if bool(os.environ.get("SKIP_ALL_TESTS")): for item in items: item.add_marker(pytest.mark.skip(reason="Skipped due to SKIP_ALL_TESTS env variable")) class UseJuniperSide(strEnum): """Define on tests on which side to use Juniper router""" NONE = "none" SIDE_A = "side_a" SIDE_B = "side_b" SIDE_BOTH = "side_both" class FakerProvider(BaseProvider): def ipv4_network(self): ipv4 = self.generator.ipv4() interface = ipaddress.IPv4Interface(ipv4 + "/24") network = interface.network.network_address return ipaddress.IPv4Network(str(network) + "/24") def ipv6_network(self): ipv6 = self.generator.ipv6() interface = ipaddress.IPv6Interface(ipv6 + "/64") network = interface.network.network_address return ipaddress.IPv6Network(str(network) + "/64") def tt_number(self) -> str: random_date = self.generator.date(pattern="%Y%m%d") random_int = self.generator.random_int(min=10000000, max=99999999) return f"TT#{random_date}{random_int}" def geant_gid(self) -> str: return self.generator.numerify("GID-#####") def geant_sid(self) -> str: return self.generator.numerify("SID-#####") def site_name(self) -> str: site_name = "".join(self.generator.random_letter().upper() for _ in range(3)) if self.generator.boolean(): digit = self.generator.random_int(min=1, max=9) site_name += str(digit) return site_name def network_interface(self) -> str: return self.generator.numerify("ge-@#/@#/@#") def generate_junniper_members_list(self) -> list[LAGMember]: iface_amount = self.generator.random_int(min=2, max=5) interface_names = [f"{prefix}{i}" for prefix in ["xe-1/0/", "ge-3/0/", "xe-2/1/"] for i in range(iface_amount)] return [ LAGMember(interface_name=interface_name, interface_description=self.generator.sentence()) for interface_name in interface_names] def generate_nokia_members_list(self) -> list[LAGMember]: iface_amount = self.generator.random_int(min=2, max=5) return [ LAGMember(interface_name=f"Interface{i}", interface_description=self.generator.sentence()) for i in range(iface_amount)] @pytest.fixture(scope="session") def faker() -> Faker: fake = Faker() fake.add_provider(FakerProvider) return fake @pytest.fixture(scope="session") def configuration_data() -> dict: with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(("", 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) yield { "GENERAL": {"public_hostname": "https://gap.geant.org"}, "NETBOX": {"api": "https://127.0.0.1:8000", "token": "TOKEN"}, "IPAM": { "INFOBLOX": { "scheme": "https", "wapi_version": "2.12", "host": "10.0.0.1", "username": "robot-user", "password": "robot-user-password", }, "LO": { "V4": { "containers": [], "networks": ["10.255.255.0/26"], "mask": 32, }, "V6": { "containers": [], "networks": ["dead:beef::/80"], "mask": 128, }, "domain_name": ".geant.net", "dns_view": "default", }, "TRUNK": { "V4": { "containers": ["10.255.255.0/24", "10.255.254.0/24"], "networks": [], "mask": 31, }, "V6": { "containers": ["dead:beef::/64", "dead:beee::/64"], "networks": [], "mask": 126, }, "domain_name": ".trunk", "dns_view": "default", }, "GEANT_IP": { "V4": { "containers": ["10.255.255.0/24", "10.255.254.0/24"], "networks": [], "mask": 31, }, "V6": { "containers": ["dead:beef::/64", "dead:beee::/64"], "networks": [], "mask": 126, }, "domain_name": ".geantip", "dns_view": "default", }, "SI": { "V4": { "containers": ["10.255.253.128/25"], "networks": [], "mask": 31, }, "V6": {"containers": [], "networks": [], "mask": 126}, "domain_name": ".geantip", "dns_view": "default", }, "LT_IAS": { "V4": { "containers": ["10.255.255.0/24"], "networks": [], "mask": 31, }, "V6": { "containers": ["dead:beef:cc::/48"], "networks": [], "mask": 126, }, "domain_name": ".geantip", "dns_view": "default", }, }, "PROVISIONING_PROXY": { "scheme": "https", "api_base": "localhost:44444", "auth": "Bearer <token>", "api_version": 1123, }, "CELERY": { "broker_url": "redis://localhost:6379", "result_backend": "rpc://localhost:6379/0", "result_expires": 3600, }, } @pytest.fixture(scope="session", autouse=True) def data_config_filename(configuration_data) -> str: """Create a temporary file with configuration data and set an environment variable to its path.""" with tempfile.NamedTemporaryFile(mode="w+", delete=False) as f: json.dump(configuration_data, f, ensure_ascii=False) os.environ["OSS_PARAMS_FILENAME"] = f.name yield f.name del os.environ["OSS_PARAMS_FILENAME"] Path(f.name).unlink() @pytest.fixture(scope="session") def db_uri(): """Provide the database uri configuration to run the migration on.""" return os.environ.get("DATABASE_URI_TEST", "postgresql://nwa:nwa@localhost/gso-test-db") def run_migrations(db_uri: str) -> None: """Configure the alembic migration and run the migration on the database. Args: ---- db_uri: The database uri configuration to run the migration on. Returns: ------- None """ path = Path(__file__).resolve().parent app_settings.DATABASE_URI = db_uri alembic_cfg = Config(file_=path / "../gso/alembic.ini") alembic_cfg.set_main_option("sqlalchemy.url", db_uri) alembic_cfg.set_main_option("script_location", str(path / "../gso/migrations")) version_locations = alembic_cfg.get_main_option("version_locations") alembic_cfg.set_main_option( "version_locations", f"{version_locations} {Path(orchestrator.__file__).parent}/migrations/versions/schema", ) command.upgrade(alembic_cfg, "heads") @pytest.fixture(scope="session") def _database(db_uri): """Create database and run migrations and cleanup after wards. Args: ---- db_uri: The database uri configuration to run the migration on. """ db.update(Database(db_uri)) url = make_url(db_uri) db_to_create = url.database url = url.set(database="postgres") engine = create_engine(url) with engine.connect() as conn: conn.execute(text("COMMIT;")) conn.execute( text("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname=:db_name").bindparams( db_name=db_to_create, ), ) conn.execute(text(f'DROP DATABASE IF EXISTS "{db_to_create}";')) conn.execute(text("COMMIT;")) conn.execute(text(f'CREATE DATABASE "{db_to_create}";')) run_migrations(db_uri) db.wrapped_database.engine = create_engine(db_uri, **ENGINE_ARGUMENTS) try: yield finally: db.wrapped_database.engine.dispose() with engine.connect() as conn: conn.execute(text("COMMIT;")) # Terminate all connections to the database conn.execute( text(f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='{db_to_create}';"), # noqa: S608 ) conn.execute(text(f'DROP DATABASE IF EXISTS "{db_to_create}";')) @pytest.fixture(autouse=True) def _db_session(_database): """Ensure that tests are executed within a transactional scope that automatically rolls back after completion. This fixture facilitates a pattern known as 'transactional tests'. At the start, it establishes a connection and begins an overarching transaction. Any database operations performed within the test function—whether they commit or not happen within the context of this master transaction. From the perspective of the test function, it seems as though changes are getting committed to the database, enabling the tests to query and assert the persistence of data. Yet, once the test completes, this fixture intervenes to roll back the master transaction. This ensures a clean slate after each test, preventing tests from polluting the database state for subsequent tests. Benefits: - Each test runs in isolation with a pristine database state. - Avoids the overhead of recreating the database schema or re-seeding data between tests. Args: ---- database: A fixture reference that initializes the database. """ with contextlib.closing(db.wrapped_database.engine.connect()) as test_connection: # Create a new session factory for this context. session_factory = sessionmaker(bind=test_connection, **SESSION_ARGUMENTS) scoped_session_instance = scoped_session( session_factory, scopefunc=db.wrapped_database._scopefunc, # noqa: SLF001 ) # Point the database session to this new scoped session. db.wrapped_database.session_factory = session_factory db.wrapped_database.scoped_session = scoped_session_instance # Set the query for the base model. BaseModel.set_query(scoped_session_instance.query_property()) transaction = test_connection.begin() try: yield finally: transaction.rollback() scoped_session_instance.remove() @pytest.fixture(scope="session", autouse=True) def fastapi_app(_database, db_uri): """Load the GSO FastAPI app for testing purposes. This implementation is as close as possible to the one present in orchestrator-core. """ oauth2lib_settings.OAUTH2_ACTIVE = False oauth2lib_settings.ENVIRONMENT_IGNORE_MUTATION_DISABLED = ["local", "TESTING"] app_settings.DATABASE_URI = db_uri return init_gso_app() @pytest.fixture(scope="session") def test_client(fastapi_app): return TestClient(fastapi_app)