Files
mockupAWS/alembic/versions/8c29fdcbbf85_create_scenarios_table.py
Luca Sacchi Ricciardi 6f03c33ab5 feat(database): setup alembic and create scenarios table
- Install alembic and asyncpg for database migrations
- Configure alembic for async SQLAlchemy 2.0
- Create initial migration for scenarios table:
  * UUID primary key with auto-generation
  * Status enum (draft, running, completed, archived)
  * JSONB tags with GIN index
  * Timestamps with auto-update trigger
  * Check constraints for name/region validation
  * Indexes on status, region, created_at
- Test database connection and migration

Task: DB-001, DB-002 complete
2026-04-07 13:48:05 +02:00

128 lines
3.9 KiB
Python

"""create scenarios table
Revision ID: 8c29fdcbbf85
Revises:
Create Date: 2026-04-07 13:45:17.403252
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# Revision identifiers, used by Alembic.
# Identifier of this migration; matches the "Revision ID" in the module docstring.
revision: str = "8c29fdcbbf85"
# None: no parent revision is set (the docstring's "Revises:" field is empty).
down_revision: Union[str, Sequence[str], None] = None
# No branch labels or cross-revision dependencies are declared.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema.

    Creates, in order: the ``uuid-ossp`` extension, the ``scenarios`` table
    (including its ``scenario_status`` enum), two check constraints that
    reject blank ``name``/``region`` values, four indexes, and a
    ``BEFORE UPDATE`` trigger that sets ``updated_at`` to ``NOW()``.
    """
    table_name = "scenarios"

    # The primary-key default below calls uuid_generate_v4().
    op.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";')

    # The enum type is created automatically together with the table.
    status_type = sa.Enum(
        "draft", "running", "completed", "archived", name="scenario_status"
    )
    columns = [
        sa.Column(
            "id",
            postgresql.UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("uuid_generate_v4()"),
        ),
        sa.Column("name", sa.String(255), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        sa.Column("tags", postgresql.JSONB(), server_default="[]"),
        sa.Column(
            "status",
            status_type,
            nullable=False,
            server_default="draft",
        ),
        sa.Column(
            "region",
            sa.String(50),
            nullable=False,
            server_default="us-east-1",
        ),
        sa.Column(
            "created_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("NOW()"),
            nullable=False,
        ),
        sa.Column(
            "updated_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("NOW()"),
            nullable=False,
        ),
        sa.Column("completed_at", sa.TIMESTAMP(timezone=True), nullable=True),
        sa.Column("started_at", sa.TIMESTAMP(timezone=True), nullable=True),
        sa.Column(
            "total_requests",
            sa.Integer(),
            server_default="0",
            nullable=False,
        ),
        sa.Column(
            "total_cost_estimate",
            sa.DECIMAL(12, 6),
            server_default="0.000000",
            nullable=False,
        ),
    ]
    op.create_table(table_name, *columns)

    # Check constraints: the trimmed value of each column must be non-empty.
    non_empty_checks = (
        ("chk_name_not_empty", "name"),
        ("chk_region_not_empty", "region"),
    )
    for constraint_name, column_name in non_empty_checks:
        trimmed_length = sa.func.char_length(sa.func.trim(sa.column(column_name)))
        op.create_check_constraint(constraint_name, table_name, trimmed_length > 0)

    # Indexes: (name, columns, extra keyword options). The first two use the
    # default index method; created_at uses BRIN and tags uses GIN.
    index_specs = (
        ("idx_scenarios_status", ["status"], {}),
        ("idx_scenarios_region", ["region"], {}),
        ("idx_scenarios_created_at", ["created_at"], {"postgresql_using": "brin"}),
        ("idx_scenarios_tags", ["tags"], {"postgresql_using": "gin"}),
    )
    for index_name, indexed_columns, index_options in index_specs:
        op.create_index(index_name, table_name, indexed_columns, **index_options)

    # Trigger function that stamps updated_at on the row being written.
    create_function_sql = (
        "\n"
        "CREATE OR REPLACE FUNCTION update_updated_at_column()\n"
        "RETURNS TRIGGER AS $$\n"
        "BEGIN\n"
        "NEW.updated_at = NOW();\n"
        "RETURN NEW;\n"
        "END;\n"
        "$$ language 'plpgsql';\n"
    )
    # Row-level trigger that runs the function before every UPDATE on scenarios.
    create_trigger_sql = (
        "\n"
        "CREATE TRIGGER update_scenarios_updated_at\n"
        "BEFORE UPDATE ON scenarios\n"
        "FOR EACH ROW\n"
        "EXECUTE FUNCTION update_updated_at_column();\n"
    )
    for statement in (create_function_sql, create_trigger_sql):
        op.execute(statement)
def downgrade() -> None:
    """Downgrade schema.

    Drops, in order: the ``update_scenarios_updated_at`` trigger and the
    ``update_updated_at_column()`` function, the four ``scenarios`` indexes,
    the ``scenarios`` table, and finally the ``scenario_status`` enum type.
    The ``uuid-ossp`` extension is not removed here.
    """
    table_name = "scenarios"

    # The trigger must go before the function it executes.
    trigger_teardown = (
        "DROP TRIGGER IF EXISTS update_scenarios_updated_at ON scenarios;",
        "DROP FUNCTION IF EXISTS update_updated_at_column();",
    )
    for statement in trigger_teardown:
        op.execute(statement)

    # Indexes are dropped explicitly before the table itself.
    index_names = (
        "idx_scenarios_tags",
        "idx_scenarios_created_at",
        "idx_scenarios_region",
        "idx_scenarios_status",
    )
    for index_name in index_names:
        op.drop_index(index_name, table_name=table_name)

    op.drop_table(table_name)

    # The enum type is dropped after the table whose status column uses it.
    op.execute("DROP TYPE IF EXISTS scenario_status;")