Skip to content
Snippets Groups Projects
Select Git revision
  • 53363f40b963cf64e6c33f5f2850100d2dccc031
  • develop default
  • master protected
  • feature/frontend-tests
  • 0.106
  • 0.105
  • 0.104
  • 0.103
  • 0.102
  • 0.101
  • 0.100
  • 0.99
  • 0.98
  • 0.97
  • 0.96
  • 0.95
  • 0.94
  • 0.93
  • 0.92
  • 0.91
  • 0.90
  • 0.89
  • 0.88
  • 0.87
24 results

SurveyManagementComponent.tsx

Blame
  • conftest.py 18.96 KiB
    import contextlib
    import datetime
    import ipaddress
    import logging
    import os
    from pathlib import Path
    
    import orchestrator
    import pytest
    from alembic import command
    from alembic.config import Config
    from faker import Faker
    from faker.providers import BaseProvider
    from oauth2_lib.settings import oauth2lib_settings
    from orchestrator import app_settings
    from orchestrator.db import (
        Database,
        ProductBlockTable,
        ProductTable,
        ResourceTypeTable,
        SubscriptionMetadataTable,
        WorkflowTable,
        db,
    )
    from orchestrator.db.database import ENGINE_ARGUMENTS, SESSION_ARGUMENTS, BaseModel
    from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY, SubscriptionModel
    from orchestrator.domain.base import ProductBlockModel
    from orchestrator.types import SubscriptionLifecycle, strEnum
    from sqlalchemy import create_engine, select, text
    from sqlalchemy.engine import make_url
    from sqlalchemy.orm import scoped_session, sessionmaker
    from starlette.testclient import TestClient
    from urllib3_mock import Responses
    
    from gso.main import init_gso_app
    from gso.services.partners import PartnerSchema, create_partner
    from gso.utils.types.interfaces import LAGMember, LAGMemberList
    from test.fixtures import (  # noqa: F401
        iptrunk_side_subscription_factory,
        iptrunk_subscription_factory,
        juniper_router_subscription_factory,
        nokia_router_subscription_factory,
        office_router_subscription_factory,
        opengear_subscription_factory,
        site_subscription_factory,
        super_pop_switch_subscription_factory,
        test_workflow,
    )
    
    logging.getLogger("faker.factory").setLevel(logging.WARNING)
    
    
    def pytest_collection_modifyitems(config, items):
        if bool(os.environ.get("SKIP_ALL_TESTS")):
            for item in items:
                item.add_marker(pytest.mark.skip(reason="Skipped due to SKIP_ALL_TESTS env variable"))
    
    
    class UseJuniperSide(strEnum):
        """Define on tests on which side to use Juniper router."""
    
        NONE = "none"
        SIDE_A = "side_a"
        SIDE_B = "side_b"
        SIDE_BOTH = "side_both"
    
    
    class FakerProvider(BaseProvider):
        def ipv4_network(self, *, min_subnet=1, max_subnet=32) -> ipaddress.IPv4Network:
            subnet = str(self.generator.random_int(min=min_subnet, max=max_subnet))
            ipv4 = self.generator.ipv4()
            interface = ipaddress.IPv4Interface(ipv4 + "/" + subnet)
            # Extra step for converting ``10.53.92.39/24`` to ``10.53.92.0/24``
            network = interface.network.network_address
    
            return ipaddress.IPv4Network(str(network) + "/" + subnet)
    
        def ipv6_network(self, *, min_subnet=1, max_subnet=128) -> ipaddress.IPv6Network:
            subnet = str(self.generator.random_int(min=min_subnet, max=max_subnet))
            ipv6 = self.generator.ipv6()
            interface = ipaddress.IPv6Interface(ipv6 + "/" + subnet)
            network = interface.network.network_address
    
            return ipaddress.IPv6Network(str(network) + "/" + subnet)
    
        def tt_number(self) -> str:
            random_date = self.generator.date(pattern="%Y%m%d")
            random_int = self.generator.random_int(min=10000000, max=99999999)
    
            return f"TT#{random_date}{random_int}"
    
        def geant_gid(self) -> str:
            return self.generator.numerify("GID-#####")
    
        def geant_sid(self) -> str:
            return self.generator.numerify("SID-#####")
    
        def site_name(self) -> str:
            site_name = "".join(self.generator.random_letter().upper() for _ in range(3))
    
            if self.generator.boolean():
                digit = self.generator.random_int(min=1, max=9)
                site_name += str(digit)
    
            return site_name
    
        def network_interface(self) -> str:
            return self.generator.numerify("ge-@#/@#/@#")
    
        def link_members_juniper(self) -> LAGMemberList[LAGMember]:
            iface_amount = self.generator.random_int(min=2, max=5)
            interface_names = [f"{prefix}{i}" for prefix in ["xe-1/0/", "ge-3/0/", "xe-2/1/"] for i in range(iface_amount)]
            return [
                LAGMember(interface_name=interface_name, interface_description=self.generator.sentence())
                for interface_name in interface_names
            ]
    
        def link_members_nokia(self) -> LAGMemberList[LAGMember]:
            iface_amount = self.generator.random_int(min=2, max=5)
            return [
                LAGMember(interface_name=f"Interface{i}", interface_description=self.generator.sentence())
                for i in range(iface_amount)
            ]
    
    
    @pytest.fixture(scope="session")
    def faker() -> Faker:
        fake = Faker()
        fake.add_provider(FakerProvider)
        return fake
    
    
    @pytest.fixture(scope="session", autouse=True)
    def data_config_filename() -> str:
        """Set an environment variable to the path of the example OSS parameters file."""
        config_filename = "gso/oss-params-example.json"
    
        os.environ["OSS_PARAMS_FILENAME"] = config_filename
        yield config_filename
        del os.environ["OSS_PARAMS_FILENAME"]
    
    
    @pytest.fixture(scope="session")
    def db_uri():
        """Provide a unique database URI for each pytest-xdist worker, or a default URI if running without xdist."""
        worker_id = os.getenv("PYTEST_XDIST_WORKER")
        database_host = os.getenv("DATABASE_HOST", "localhost")
    
        if worker_id:
            return f"postgresql://nwa:nwa@{database_host}/gso-test-db_{worker_id}"
    
        return os.environ.get("DATABASE_URI_TEST", f"postgresql://nwa:nwa@{database_host}/gso-test-db")
    
    
    def run_migrations(db_uri: str) -> None:
        """Configure the alembic migration and run the migration on the database.
    
        :param str db_uri: The database uri configuration to run the migration on.
        :return: None
        """
        path = Path(__file__).resolve().parent
        app_settings.DATABASE_URI = db_uri
        alembic_cfg = Config(file_=path / "../gso/alembic.ini")
        alembic_cfg.set_main_option("sqlalchemy.url", db_uri)
    
        alembic_cfg.set_main_option("script_location", str(path / "../gso/migrations"))
        version_locations = alembic_cfg.get_main_option("version_locations")
        alembic_cfg.set_main_option(
            "version_locations",
            f"{version_locations} {Path(orchestrator.__file__).parent}/migrations/versions/schema",
        )
    
        command.upgrade(alembic_cfg, "heads")
    
    
    @pytest.fixture(scope="session")
    def _database(db_uri):
        """Create database and run migrations and cleanup after wards.
    
        :param db_uri: The database uri configuration to run the migration on.
        """
        db.update(Database(db_uri))
        url = make_url(db_uri)
        db_to_create = url.database
        url = url.set(database="postgres")
    
        engine = create_engine(url)
        with engine.connect() as conn:
            conn.execute(text("COMMIT;"))
            conn.execute(
                text("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname=:db_name").bindparams(
                    db_name=db_to_create,
                ),
            )
    
            conn.execute(text(f'DROP DATABASE IF EXISTS "{db_to_create}";'))
            conn.execute(text("COMMIT;"))
            conn.execute(text(f'CREATE DATABASE "{db_to_create}";'))
    
        run_migrations(db_uri)
        db.wrapped_database.engine = create_engine(db_uri, **ENGINE_ARGUMENTS)
    
        try:
            yield
        finally:
            db.wrapped_database.engine.dispose()
            with engine.connect() as conn:
                conn.execute(text("COMMIT;"))
                # Terminate all connections to the database
                conn.execute(
                    text(f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='{db_to_create}';"),  # noqa: S608
                )
                conn.execute(text(f'DROP DATABASE IF EXISTS "{db_to_create}";'))
    
    
    @pytest.fixture(autouse=True)
    def _db_session(_database):
        """Ensure that tests are executed within a transactional scope that automatically rolls back after completion.
    
        This fixture facilitates a pattern known as 'transactional tests'. At the start, it establishes a connection and
        begins an overarching transaction. Any database operations performed within the test function—whether they commit
        or not happen within the context of this master transaction.
    
        From the perspective of the test function, it seems as though changes are getting committed to the database,
        enabling the tests to query and assert the persistence of data. Yet, once the test completes, this fixture
        intervenes to roll back the master transaction. This ensures a clean slate after each test, preventing tests from
        polluting the database state for subsequent tests.
    
        Benefits:
        - Each test runs in isolation with a pristine database state.
        - Avoids the overhead of recreating the database schema or re-seeding data between tests.
    
        :param _database: A fixture reference that initializes the database.
        """
        with contextlib.closing(db.wrapped_database.engine.connect()) as test_connection:
            # Create a new session factory for this context.
            session_factory = sessionmaker(bind=test_connection, **SESSION_ARGUMENTS)
            scoped_session_instance = scoped_session(
                session_factory,
                scopefunc=db.wrapped_database._scopefunc,  # noqa: SLF001
            )
    
            # Point the database session to this new scoped session.
            db.wrapped_database.session_factory = session_factory
            db.wrapped_database.scoped_session = scoped_session_instance
    
            # Set the query for the base model.
            BaseModel.set_query(scoped_session_instance.query_property())
            transaction = test_connection.begin()
            try:
                yield
            finally:
                transaction.rollback()
                scoped_session_instance.remove()
    
    
    @pytest.fixture(scope="session", autouse=True)
    def fastapi_app(_database, db_uri):
        """Load the GSO FastAPI app for testing purposes.
    
        This implementation is as close as possible to the one present in orchestrator-core.
        """
        oauth2lib_settings.OAUTH2_ACTIVE = False
        oauth2lib_settings.ENVIRONMENT_IGNORE_MUTATION_DISABLED = ["local", "TESTING"]
        app_settings.DATABASE_URI = db_uri
        return init_gso_app()
    
    
    @pytest.fixture(scope="session")
    def test_client(fastapi_app):
        return TestClient(fastapi_app)
    
    
    @pytest.fixture(scope="session")
    def partner_factory():
        def _create_partner(
            name: str,
            email: str,
        ) -> dict:
            return create_partner(
                PartnerSchema(
                    name=name,
                    email=email,
                )
            )
    
        return _create_partner
    
    
    @pytest.fixture()
    def geant_partner(partner_factory):
        return partner_factory(name="GEANT-TEST", email="goat-test@geant.org")
    
    
    @pytest.fixture()
    def generic_resource_type_1():
        rt = ResourceTypeTable(description="Resource Type one", resource_type="rt_1")
        db.session.add(rt)
        db.session.commit()
    
        return rt
    
    
    @pytest.fixture()
    def generic_resource_type_2():
        rt = ResourceTypeTable(description="Resource Type two", resource_type="rt_2")
        db.session.add(rt)
        db.session.commit()
        return rt
    
    
    @pytest.fixture()
    def generic_resource_type_3():
        rt = ResourceTypeTable(description="Resource Type three", resource_type="rt_3")
        db.session.add(rt)
        db.session.commit()
    
        return rt
    
    
    @pytest.fixture()
    def generic_product_block_1(generic_resource_type_1):
        pb = ProductBlockTable(
            name="PB_1",
            description="Generic Product Block 1",
            tag="PB1",
            status="active",
            resource_types=[generic_resource_type_1],
            created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"),
        )
        db.session.add(pb)
        db.session.commit()
        return pb
    
    
    @pytest.fixture()
    def generic_product_block_2(generic_resource_type_2, generic_resource_type_3):
        pb = ProductBlockTable(
            name="PB_2",
            description="Generic Product Block 2",
            tag="PB2",
            status="active",
            resource_types=[generic_resource_type_2, generic_resource_type_3],
            created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"),
        )
        db.session.add(pb)
        db.session.commit()
        return pb
    
    
    @pytest.fixture()
    def generic_product_block_3(generic_resource_type_2):
        pb = ProductBlockTable(
            name="PB_3",
            description="Generic Product Block 3",
            tag="PB3",
            status="active",
            resource_types=[generic_resource_type_2],
            created_at=datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00"),
        )
        db.session.add(pb)
        db.session.commit()
        return pb
    
    
    @pytest.fixture()
    def generic_product_1(generic_product_block_1, generic_product_block_2):
        workflow = db.session.scalar(select(WorkflowTable).where(WorkflowTable.name == "modify_note"))
        p = ProductTable(
            name="Product 1",
            description="Generic Product One",
            product_type="Generic",
            status="active",
            tag="GEN1",
            product_blocks=[generic_product_block_1, generic_product_block_2],
            workflows=[workflow],
        )
        db.session.add(p)
        db.session.commit()
        return p
    
    
    @pytest.fixture()
    def generic_product_2(generic_product_block_3):
        workflow = db.session.scalar(select(WorkflowTable).where(WorkflowTable.name == "modify_note"))
    
        p = ProductTable(
            name="Product 2",
            description="Generic Product Two",
            product_type="Generic",
            status="active",
            tag="GEN2",
            product_blocks=[generic_product_block_3],
            workflows=[workflow],
        )
        db.session.add(p)
        db.session.commit()
        return p
    
    
    @pytest.fixture()
    def generic_product_3(generic_product_block_2):
        p = ProductTable(
            name="Product 3",
            description="Generic Product Three",
            product_type="Generic",
            status="active",
            tag="GEN3",
            product_blocks=[generic_product_block_2],
        )
        db.session.add(p)
        db.session.commit()
        return p
    
    
    @pytest.fixture()
    def generic_product_block_type_1(generic_product_block_1):
        class GenericProductBlockOneInactive(ProductBlockModel, product_block_name="PB_1"):
            rt_1: str | None = None
    
        class GenericProductBlockOne(GenericProductBlockOneInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]):
            rt_1: str
    
        return GenericProductBlockOneInactive, GenericProductBlockOne
    
    
    @pytest.fixture()
    def generic_product_block_type_2(generic_product_block_2):
        class GenericProductBlockTwoInactive(ProductBlockModel, product_block_name="PB_2"):
            rt_2: int | None = None
            rt_3: str | None = None
    
        class GenericProductBlockTwo(GenericProductBlockTwoInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]):
            rt_2: int
            rt_3: str
    
        return GenericProductBlockTwoInactive, GenericProductBlockTwo
    
    
    @pytest.fixture()
    def generic_product_block_type_3(generic_product_block_3):
        class GenericProductBlockThreeInactive(ProductBlockModel, product_block_name="PB_3"):
            rt_2: int | None = None
    
        class GenericProductBlockThree(GenericProductBlockThreeInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]):
            rt_2: int
    
        return GenericProductBlockThreeInactive, GenericProductBlockThree
    
    
    @pytest.fixture()
    def generic_product_type_1(generic_product_1, generic_product_block_type_1, generic_product_block_type_2):
        generic_product_block_one_inactive, generic_product_block_one = generic_product_block_type_1
        generic_product_block_two_inactive, generic_product_block_two = generic_product_block_type_2
    
        # Test Product domain models
    
        class GenericProductOneInactive(SubscriptionModel, is_base=True):
            pb_1: generic_product_block_one_inactive
            pb_2: generic_product_block_two_inactive
    
        class GenericProductOne(GenericProductOneInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]):
            pb_1: generic_product_block_one
            pb_2: generic_product_block_two
    
        SUBSCRIPTION_MODEL_REGISTRY["Product 1"] = GenericProductOne
    
        yield GenericProductOneInactive, GenericProductOne
    
        del SUBSCRIPTION_MODEL_REGISTRY["Product 1"]
    
    
    @pytest.fixture()
    def generic_product_type_2(generic_product_2, generic_product_block_type_3):
        generic_product_block_three_inactive, generic_product_block_three = generic_product_block_type_3
    
        class GenericProductTwoInactive(SubscriptionModel, is_base=True):
            pb_3: generic_product_block_three_inactive
    
        class GenericProductTwo(GenericProductTwoInactive, lifecycle=[SubscriptionLifecycle.ACTIVE]):
            pb_3: generic_product_block_three
    
        SUBSCRIPTION_MODEL_REGISTRY["Product 2"] = GenericProductTwo
    
        yield GenericProductTwoInactive, GenericProductTwo
    
        del SUBSCRIPTION_MODEL_REGISTRY["Product 2"]
    
    
    @pytest.fixture()
    def product_type_1_subscription_factory(generic_product_1, generic_product_type_1, geant_partner):
        def subscription_create(
            description="Generic Subscription One",
            start_date="2023-05-24T00:00:00+00:00",
            rt_1="Value1",
            rt_2=42,
            rt_3="Value2",
        ):
            generic_product_one_inactive, _ = generic_product_type_1
            gen_subscription = generic_product_one_inactive.from_product_id(
                generic_product_1.product_id, customer_id=geant_partner["partner_id"], insync=True
            )
            gen_subscription.pb_1.rt_1 = rt_1
            gen_subscription.pb_2.rt_2 = rt_2
            gen_subscription.pb_2.rt_3 = rt_3
            gen_subscription = SubscriptionModel.from_other_lifecycle(gen_subscription, SubscriptionLifecycle.ACTIVE)
            gen_subscription.description = description
            gen_subscription.start_date = start_date
            gen_subscription.save()
    
            gen_subscription_metadata = SubscriptionMetadataTable()
            gen_subscription_metadata.subscription_id = gen_subscription.subscription_id
            gen_subscription_metadata.metadata_ = {"description": "Some metadata description"}
            db.session.add(gen_subscription_metadata)
            db.session.commit()
            return str(gen_subscription.subscription_id)
    
        return subscription_create
    
    
    @pytest.fixture()
    def product_type_1_subscriptions_factory(product_type_1_subscription_factory):
        def subscriptions_create(amount=1):
            return [
                product_type_1_subscription_factory(
                    description=f"Subscription {i}",
                    start_date=(
                        datetime.datetime.fromisoformat("2023-05-24T00:00:00+00:00") + datetime.timedelta(days=i)
                    ).replace(tzinfo=datetime.UTC),
                )
                for i in range(amount)
            ]
    
        return subscriptions_create
    
    
    @pytest.fixture()
    def generic_subscription_1(product_type_1_subscription_factory):
        return product_type_1_subscription_factory()
    
    
    @pytest.fixture(autouse=True)
    def responses():
        responses_mock = Responses("requests.packages.urllib3")
    
        def _find_request(call):
            mock_url = responses_mock._find_match(call.request)  # noqa: SLF001
            if not mock_url:
                pytest.fail(f"Call not mocked: {call.request}")
            return mock_url
    
        def _to_tuple(url_mock):
            return url_mock["url"], url_mock["method"], url_mock["match_querystring"]
    
        with responses_mock:
            yield responses_mock
    
            mocked_urls = map(_to_tuple, responses_mock._urls)  # noqa: SLF001
            used_urls = map(_to_tuple, map(_find_request, responses_mock.calls))
            not_used = set(mocked_urls) - set(used_urls)
            if not_used:
                pytest.fail(f"Found unused responses mocks: {not_used}", pytrace=False)
    
    
    @pytest.fixture(autouse=True)
    def _no_mail(monkeypatch):
        """Remove sending mails from all tests."""
        monkeypatch.delattr("smtplib.SMTP")