"""create_archive_tables_v1_0_0 Data archiving strategy migration for mockupAWS v1.0.0 - Archive tables for old data - Partitioning by date - Archive tracking and statistics Revision ID: b2c3d4e5f6a7 Revises: a1b2c3d4e5f6 Create Date: 2026-04-07 21:00:00.000000 """ from typing import Sequence, Union from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. revision: str = "b2c3d4e5f6a7" down_revision: Union[str, Sequence[str], None] = "a1b2c3d4e5f6" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: """Upgrade schema with archive tables.""" # ========================================================================= # 1. CREATE ARCHIVE TABLES # ========================================================================= # Scenario logs archive (> 1 year) op.create_table( "scenario_logs_archive", sa.Column( "id", postgresql.UUID(as_uuid=True), primary_key=True, ), sa.Column( "scenario_id", postgresql.UUID(as_uuid=True), nullable=False, ), sa.Column( "received_at", sa.TIMESTAMP(timezone=True), nullable=False, ), sa.Column("message_hash", sa.String(64), nullable=False), sa.Column("message_preview", sa.String(500), nullable=True), sa.Column("source", sa.String(100), nullable=False), sa.Column("size_bytes", sa.Integer(), nullable=False), sa.Column("has_pii", sa.Boolean(), nullable=False), sa.Column("token_count", sa.Integer(), nullable=False), sa.Column("sqs_blocks", sa.Integer(), nullable=False), sa.Column( "archived_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()"), nullable=False, ), sa.Column( "archive_batch_id", postgresql.UUID(as_uuid=True), nullable=True, ), # Note: Partitioning removed - DATE_TRUNC is not IMMUTABLE # For large datasets, consider adding a computed 'month' column ) # Create indexes for archive table op.create_index( "idx_logs_archive_scenario", "scenario_logs_archive", ["scenario_id", "received_at"], postgresql_using="btree", ) op.create_index( "idx_logs_archive_received", "scenario_logs_archive", ["received_at"], postgresql_using="brin", ) op.create_index( "idx_logs_archive_batch", "scenario_logs_archive", ["archive_batch_id"], postgresql_using="btree", ) # Scenario metrics archive (> 2 years) op.create_table( "scenario_metrics_archive", sa.Column( "id", postgresql.UUID(as_uuid=True), primary_key=True, ), sa.Column( "scenario_id", postgresql.UUID(as_uuid=True), nullable=False, ), sa.Column( "timestamp", sa.TIMESTAMP(timezone=True), nullable=False, ), sa.Column("metric_type", sa.String(50), nullable=False), sa.Column("metric_name", sa.String(100), nullable=False), sa.Column("value", sa.DECIMAL(15, 6), nullable=False), sa.Column("unit", sa.String(20), nullable=False), sa.Column("extra_data", postgresql.JSONB(), server_default="{}"), sa.Column( "archived_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()"), nullable=False, ), sa.Column( "archive_batch_id", postgresql.UUID(as_uuid=True), nullable=True, ), # Pre-aggregated data for archived metrics sa.Column( "is_aggregated", sa.Boolean(), server_default="false", nullable=False, ), sa.Column( "aggregation_period", sa.String(20), nullable=True, # 'day', 'week', 'month' ), sa.Column( "sample_count", sa.Integer(), nullable=True, ), # Note: Partitioning removed - DATE_TRUNC is not IMMUTABLE ) # Create indexes for metrics archive op.create_index( "idx_metrics_archive_scenario", "scenario_metrics_archive", ["scenario_id", "timestamp"], postgresql_using="btree", ) op.create_index( "idx_metrics_archive_timestamp", "scenario_metrics_archive", ["timestamp"], postgresql_using="brin", ) op.create_index( "idx_metrics_archive_type", "scenario_metrics_archive", ["scenario_id", "metric_type", "timestamp"], postgresql_using="btree", ) # Reports archive (> 6 months - compressed metadata only) op.create_table( "reports_archive", sa.Column( "id", postgresql.UUID(as_uuid=True), primary_key=True, ), sa.Column( "scenario_id", postgresql.UUID(as_uuid=True), nullable=False, ), sa.Column("format", sa.String(10), nullable=False), sa.Column("file_path", sa.String(500), nullable=False), sa.Column("file_size_bytes", sa.Integer(), nullable=True), sa.Column("generated_by", sa.String(100), nullable=True), sa.Column("extra_data", postgresql.JSONB(), server_default="{}"), sa.Column( "created_at", sa.TIMESTAMP(timezone=True), nullable=False, ), sa.Column( "archived_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()"), nullable=False, ), sa.Column( "s3_location", sa.String(500), nullable=True, ), sa.Column( "deleted_locally", sa.Boolean(), server_default="false", nullable=False, ), sa.Column( "archive_batch_id", postgresql.UUID(as_uuid=True), nullable=True, ), ) op.create_index( "idx_reports_archive_scenario", "reports_archive", ["scenario_id", "created_at"], postgresql_using="btree", ) op.create_index( "idx_reports_archive_created", "reports_archive", ["created_at"], postgresql_using="brin", ) # ========================================================================= # 2. CREATE ARCHIVE TRACKING TABLE # ========================================================================= op.create_table( "archive_jobs", sa.Column( "id", postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text("uuid_generate_v4()"), ), sa.Column( "job_type", sa.Enum( "logs", "metrics", "reports", "cleanup", name="archive_job_type", ), nullable=False, ), sa.Column( "status", sa.Enum( "pending", "running", "completed", "failed", "partial", name="archive_job_status", ), server_default="pending", nullable=False, ), sa.Column( "started_at", sa.TIMESTAMP(timezone=True), nullable=True, ), sa.Column( "completed_at", sa.TIMESTAMP(timezone=True), nullable=True, ), sa.Column( "records_processed", sa.Integer(), server_default="0", nullable=False, ), sa.Column( "records_archived", sa.Integer(), server_default="0", nullable=False, ), sa.Column( "records_deleted", sa.Integer(), server_default="0", nullable=False, ), sa.Column( "bytes_archived", sa.BigInteger(), server_default="0", nullable=False, ), sa.Column( "error_message", sa.Text(), nullable=True, ), sa.Column( "created_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()"), nullable=False, ), ) op.create_index( "idx_archive_jobs_status", "archive_jobs", ["status", "created_at"], postgresql_using="btree", ) op.create_index( "idx_archive_jobs_type", "archive_jobs", ["job_type", "created_at"], postgresql_using="btree", ) # ========================================================================= # 3. CREATE ARCHIVE STATISTICS VIEW # ========================================================================= op.execute(""" CREATE OR REPLACE VIEW v_archive_statistics AS SELECT 'logs' as archive_type, COUNT(*) as total_records, MIN(received_at) as oldest_record, MAX(received_at) as newest_record, MIN(archived_at) as oldest_archive, MAX(archived_at) as newest_archive, SUM(size_bytes) as total_bytes FROM scenario_logs_archive UNION ALL SELECT 'metrics' as archive_type, COUNT(*) as total_records, MIN(timestamp) as oldest_record, MAX(timestamp) as newest_record, MIN(archived_at) as oldest_archive, MAX(archived_at) as newest_archive, 0 as total_bytes -- metrics don't have size FROM scenario_metrics_archive UNION ALL SELECT 'reports' as archive_type, COUNT(*) as total_records, MIN(created_at) as oldest_record, MAX(created_at) as newest_record, MIN(archived_at) as oldest_archive, MAX(archived_at) as newest_archive, SUM(file_size_bytes) as total_bytes FROM reports_archive """) # ========================================================================= # 4. CREATE ARCHIVE POLICY CONFIGURATION TABLE # ========================================================================= op.create_table( "archive_policies", sa.Column( "id", sa.Integer(), primary_key=True, ), sa.Column( "table_name", sa.String(100), nullable=False, unique=True, ), sa.Column( "archive_after_days", sa.Integer(), nullable=False, ), sa.Column( "aggregate_before_archive", sa.Boolean(), server_default="false", nullable=False, ), sa.Column( "aggregation_period", sa.String(20), nullable=True, ), sa.Column( "compress_files", sa.Boolean(), server_default="false", nullable=False, ), sa.Column( "s3_bucket", sa.String(255), nullable=True, ), sa.Column( "s3_prefix", sa.String(255), nullable=True, ), sa.Column( "enabled", sa.Boolean(), server_default="true", nullable=False, ), sa.Column( "created_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()"), nullable=False, ), sa.Column( "updated_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()"), nullable=False, ), ) # Insert default policies op.execute(""" INSERT INTO archive_policies (id, table_name, archive_after_days, aggregate_before_archive, aggregation_period, compress_files, s3_bucket, s3_prefix, enabled) VALUES (1, 'scenario_logs', 365, false, null, false, null, null, true), (2, 'scenario_metrics', 730, true, 'day', false, null, null, true), (3, 'reports', 180, false, null, true, 'mockupaws-reports-archive', 'archived-reports/', true) """) # Create trigger for updated_at op.execute(""" CREATE OR REPLACE FUNCTION update_archive_policies_updated_at() RETURNS TRIGGER AS $$ BEGIN NEW.updated_at = NOW(); RETURN NEW; END; $$ LANGUAGE plpgsql """) op.execute(""" CREATE TRIGGER update_archive_policies_updated_at BEFORE UPDATE ON archive_policies FOR EACH ROW EXECUTE FUNCTION update_archive_policies_updated_at() """) # ========================================================================= # 5. CREATE UNION VIEW FOR TRANSPARENT ARCHIVE ACCESS # ========================================================================= # This view allows querying both live and archived logs transparently op.execute(""" CREATE OR REPLACE VIEW v_scenario_logs_all AS SELECT id, scenario_id, received_at, message_hash, message_preview, source, size_bytes, has_pii, token_count, sqs_blocks, NULL::timestamp with time zone as archived_at, false as is_archived FROM scenario_logs UNION ALL SELECT id, scenario_id, received_at, message_hash, message_preview, source, size_bytes, has_pii, token_count, sqs_blocks, archived_at, true as is_archived FROM scenario_logs_archive """) op.execute(""" CREATE OR REPLACE VIEW v_scenario_metrics_all AS SELECT id, scenario_id, timestamp, metric_type, metric_name, value, unit, extra_data, NULL::timestamp with time zone as archived_at, false as is_aggregated, false as is_archived FROM scenario_metrics UNION ALL SELECT id, scenario_id, timestamp, metric_type, metric_name, value, unit, extra_data, archived_at, is_aggregated, true as is_archived FROM scenario_metrics_archive """) def downgrade() -> None: """Downgrade schema.""" # Drop union views op.execute("DROP VIEW IF EXISTS v_scenario_metrics_all") op.execute("DROP VIEW IF EXISTS v_scenario_logs_all") # Drop trigger and function op.execute( "DROP TRIGGER IF EXISTS update_archive_policies_updated_at ON archive_policies" ) op.execute("DROP FUNCTION IF EXISTS update_archive_policies_updated_at()") # Drop statistics view op.execute("DROP VIEW IF EXISTS v_archive_statistics") # Drop archive tracking table op.drop_index("idx_archive_jobs_type", table_name="archive_jobs") op.drop_index("idx_archive_jobs_status", table_name="archive_jobs") op.drop_table("archive_jobs") # Drop enum types op.execute("DROP TYPE IF EXISTS archive_job_status") op.execute("DROP TYPE IF EXISTS archive_job_type") # Drop archive tables op.drop_index("idx_reports_archive_created", table_name="reports_archive") op.drop_index("idx_reports_archive_scenario", table_name="reports_archive") op.drop_table("reports_archive") op.drop_index("idx_metrics_archive_type", table_name="scenario_metrics_archive") op.drop_index( "idx_metrics_archive_timestamp", table_name="scenario_metrics_archive" ) op.drop_index("idx_metrics_archive_scenario", table_name="scenario_metrics_archive") op.drop_table("scenario_metrics_archive") op.drop_index("idx_logs_archive_batch", table_name="scenario_logs_archive") op.drop_index("idx_logs_archive_received", table_name="scenario_logs_archive") op.drop_index("idx_logs_archive_scenario", table_name="scenario_logs_archive") op.drop_table("scenario_logs_archive") # Drop policies table op.drop_table("archive_policies")