# env.py (2.71 KiB)
# Last change authored by Karel van Klink: "This reverts commit 4bdcafb2"
import logging
from alembic import context
from orchestrator.db.database import BaseModel
from orchestrator.settings import app_settings
from sqlalchemy import engine_from_config, pool, text
# Logger used by this environment script.
logger = logging.getLogger("alembic.env")

# The Alembic Config object; it exposes the values of the .ini file in use.
config = context.config

# Replace the .ini file's sqlalchemy.url with the application's database URI.
# NOTE(review): set_main_option applies ConfigParser-style interpolation, so a
# literal "%" in the URI would need to be escaped as "%%" -- confirm whether
# DATABASE_URI can contain one.
config.set_main_option("sqlalchemy.url", app_settings.DATABASE_URI)

# Model metadata passed to context.configure() in both migration modes.
target_metadata = BaseModel.metadata
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    Only the database URL from the Alembic config is handed to the
    context; no Engine is created, so no DBAPI has to be installed.
    Anything sent through context.execute() is written to the script
    output as a string instead of being run against a database.
    """
    context.configure(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        # Render bound parameters inline so the emitted SQL is self-contained.
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    Creates an Engine from the Alembic config section, opens a connection
    and associates it with the migration context. The migrations run inside
    a single transaction that first takes PostgreSQL advisory lock 1000, so
    another session requesting the same lock waits until this transaction
    ends.

    Raises:
        ValueError: If the Alembic config section cannot be found.
    """

    # this callback is used to prevent an auto-migration from being generated
    # when there are no changes to the schema
    # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
    def process_revision_directives(context, revision, directives):  # type: ignore[no-untyped-def]
        if getattr(config.cmd_opts, "autogenerate", False):
            script = directives[0]
            if script.upgrade_ops.is_empty():
                # Emptying the list in place tells Alembic to write no revision file.
                directives[:] = []
                logger.info("No changes in schema detected.")

    config_section = config.get_section(config.config_ini_section)
    if config_section is None:
        raise ValueError("Config section not found!")

    engine = engine_from_config(
        config_section, prefix="sqlalchemy.", poolclass=pool.NullPool
    )

    # The context manager closes the connection on every exit path, including
    # a failure inside context.configure() or inside the migrations themselves.
    with engine.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            process_revision_directives=process_revision_directives,
            compare_type=True,
        )

        with context.begin_transaction():
            # Transaction-level advisory lock: PostgreSQL releases it
            # automatically when the transaction ends (commit or rollback) and
            # it cannot be released explicitly, so no pg_advisory_unlock() call
            # is issued afterwards.
            connection.execute(text("SELECT pg_advisory_xact_lock(1000);"))
            context.run_migrations()
# Dispatch on the mode Alembic was invoked in: run against a live database
# connection unless offline (SQL script output) mode was requested.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()