feat(database): setup alembic and create scenarios table

- Install alembic and asyncpg for database migrations
- Configure alembic for async SQLAlchemy 2.0
- Create initial migration for scenarios table:
  * UUID primary key with auto-generation
  * Status enum (draft, running, completed, archived)
  * JSONB tags with GIN index
  * Timestamps with auto-update trigger
  * Check constraints for name/region validation
  * Indexes on status, region, created_at
- Test database connection and migration

Task: DB-001, DB-002 complete
This commit is contained in:
Luca Sacchi Ricciardi
2026-04-07 13:48:05 +02:00
parent 18ce380a6d
commit 6f03c33ab5
7 changed files with 649 additions and 0 deletions

1
alembic/README Normal file
View File

@@ -0,0 +1 @@
Generic single-database configuration.

89
alembic/env.py Normal file
View File

@@ -0,0 +1,89 @@
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging: this configures the
# loggers declared in alembic.ini (skipped when no ini file is in use).
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# NOTE: None disables autogenerate diffing — migrations are written by hand.
target_metadata = None

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL and not an Engine,
    though an Engine is acceptable here as well. By skipping the Engine
    creation we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
    """Configure the Alembic context with a live connection and run migrations."""
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )

    with context.begin_transaction():
        context.run_migrations()
async def run_async_migrations() -> None:
    """Create an async Engine and run the migrations over one connection.

    The engine is built from the ``sqlalchemy.*`` keys of the active
    config section; NullPool is used because a migration run needs
    exactly one short-lived connection.
    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    try:
        async with connectable.connect() as connection:
            # Alembic's migration machinery is synchronous; run it on the
            # sync facade of this async connection.
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose even when migrations raise, so engine resources are
        # never leaked on a failed run (the original skipped this on error).
        await connectable.dispose()
def run_migrations_online() -> None:
    """Run migrations in 'online' mode by driving the async code path."""
    asyncio.run(run_async_migrations())
# Entry point: Alembic selects offline mode when invoked with --sql,
# otherwise migrations run against a live database connection.
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

28
alembic/script.py.mako Normal file
View File

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,127 @@
"""create scenarios table
Revision ID: 8c29fdcbbf85
Revises:
Create Date: 2026-04-07 13:45:17.403252
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "8c29fdcbbf85"
# down_revision is None: this is the root migration of the chain.
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema: create the scenarios table, its check constraints,
    indexes, and a trigger that keeps updated_at current."""
    # uuid_generate_v4() is provided by the uuid-ossp extension.
    op.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";')

    # Defining the enum inline means the scenario_status type is created
    # automatically together with the table.
    status_enum = sa.Enum(
        "draft", "running", "completed", "archived", name="scenario_status"
    )

    op.create_table(
        "scenarios",
        sa.Column(
            "id",
            postgresql.UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("uuid_generate_v4()"),
        ),
        sa.Column("name", sa.String(255), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        sa.Column("tags", postgresql.JSONB(), server_default="[]"),
        sa.Column("status", status_enum, nullable=False, server_default="draft"),
        sa.Column("region", sa.String(50), nullable=False, server_default="us-east-1"),
        sa.Column(
            "created_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("NOW()"),
            nullable=False,
        ),
        sa.Column(
            "updated_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("NOW()"),
            nullable=False,
        ),
        sa.Column("completed_at", sa.TIMESTAMP(timezone=True), nullable=True),
        sa.Column("started_at", sa.TIMESTAMP(timezone=True), nullable=True),
        sa.Column("total_requests", sa.Integer(), server_default="0", nullable=False),
        sa.Column(
            "total_cost_estimate",
            sa.DECIMAL(12, 6),
            server_default="0.000000",
            nullable=False,
        ),
    )

    # Reject empty or whitespace-only values in name and region.
    for constraint_name, column_name in (
        ("chk_name_not_empty", "name"),
        ("chk_region_not_empty", "region"),
    ):
        op.create_check_constraint(
            constraint_name,
            "scenarios",
            sa.func.char_length(sa.func.trim(sa.column(column_name))) > 0,
        )

    # Lookup indexes: plain b-tree on status/region, BRIN on the
    # append-mostly created_at column, GIN for JSONB containment on tags.
    op.create_index("idx_scenarios_status", "scenarios", ["status"])
    op.create_index("idx_scenarios_region", "scenarios", ["region"])
    op.create_index(
        "idx_scenarios_created_at", "scenarios", ["created_at"], postgresql_using="brin"
    )
    op.create_index("idx_scenarios_tags", "scenarios", ["tags"], postgresql_using="gin")

    # Trigger keeps updated_at current on every row UPDATE.
    op.execute("""
    CREATE OR REPLACE FUNCTION update_updated_at_column()
    RETURNS TRIGGER AS $$
    BEGIN
        NEW.updated_at = NOW();
        RETURN NEW;
    END;
    $$ language 'plpgsql';
    """)
    op.execute("""
    CREATE TRIGGER update_scenarios_updated_at
        BEFORE UPDATE ON scenarios
        FOR EACH ROW
        EXECUTE FUNCTION update_updated_at_column();
    """)
def downgrade() -> None:
    """Downgrade schema: remove trigger, indexes, table, and enum type."""
    # Trigger and its function first — the trigger references the table.
    op.execute("DROP TRIGGER IF EXISTS update_scenarios_updated_at ON scenarios;")
    op.execute("DROP FUNCTION IF EXISTS update_updated_at_column();")

    # Indexes, in reverse order of creation.
    for index_name in (
        "idx_scenarios_tags",
        "idx_scenarios_created_at",
        "idx_scenarios_region",
        "idx_scenarios_status",
    ):
        op.drop_index(index_name, table_name="scenarios")

    op.drop_table("scenarios")

    # drop_table does not remove the enum type, so drop it explicitly.
    op.execute("DROP TYPE IF EXISTS scenario_status;")