Skip to content
Snippets Groups Projects
env.py 2.71 KiB
import logging

from alembic import context
from orchestrator.db.database import BaseModel
from orchestrator.settings import app_settings
from sqlalchemy import engine_from_config, pool, text

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
logger = logging.getLogger("alembic.env")

config.set_main_option("sqlalchemy.url", app_settings.DATABASE_URI)

target_metadata = BaseModel.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """

    # this callback is used to prevent an auto-migration from being generated
    # when there are no changes to the schema
    # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
    def process_revision_directives(context, revision, directives):  # type: ignore[no-untyped-def]
        if getattr(config.cmd_opts, "autogenerate", False):
            script = directives[0]
            if script.upgrade_ops.is_empty():
                directives[:] = []
                logger.info("No changes in schema detected.")

    config_section = config.get_section(config.config_ini_section)
    if config_section is None:
        raise ValueError("Config section not found!")

    engine = engine_from_config(
        config_section, prefix="sqlalchemy.", poolclass=pool.NullPool
    )

    connection = engine.connect()
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        process_revision_directives=process_revision_directives,
        compare_type=True,
    )
    try:
        with context.begin_transaction():
            connection.execute(text("SELECT pg_advisory_xact_lock(1000);"))
            context.run_migrations()
    finally:
        connection.execute(text("SELECT pg_advisory_unlock(1000);"))
        connection.close()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()