mockupAWS/prompt/prompt-backend-dev-be001-be005.md
Luca Sacchi Ricciardi ebefc323c3 feat(backend): implement database layer with models, schemas and repositories
Complete backend core implementation (BE-001 to BE-005):

BE-001: Database Connection & Session Management
- Create src/core/database.py with async SQLAlchemy 2.0
- Configure engine with pool_size=20
- Implement get_db() FastAPI dependency

BE-002: SQLAlchemy Models (5 models)
- Base model with TimestampMixin
- Scenario: status enum, relationships, cost tracking
- ScenarioLog: message hash, PII detection, metrics
- ScenarioMetric: time-series with extra_data (JSONB)
- AwsPricing: service pricing with region support
- Report: format enum, file tracking, extra_data

BE-003: Pydantic Schemas
- Scenario: Create, Update, Response, List schemas
- Log: Ingest, Response schemas
- Metric: Summary, CostBreakdown, MetricsResponse
- Common: PaginatedResponse generic type

BE-004: Base Repository Pattern
- Generic BaseRepository[T] with CRUD operations
- Methods: get, get_multi, count, create, update, delete
- Dynamic filter support

BE-005: Scenario Repository
- Extends BaseRepository[Scenario]
- Specific methods: get_by_name, list_by_status, list_by_region
- Business methods: update_status, increment_total_requests, update_total_cost
- ScenarioStatus enum
- Singleton instance: scenario_repository

All models, schemas and repositories tested and working.

Tasks: BE-001, BE-002, BE-003, BE-004, BE-005 complete
2026-04-07 14:20:02 +02:00


🚀 @backend-dev - Backend Core Implementation

📊 Project Status

Date: 2026-04-07
Phase: 1 - Database & Backend Core
Database: COMPLETED

What is ready

  • PostgreSQL database up and running with all tables
  • Alembic migrations complete (6 migrations)
  • AWS pricing data seeded (10 records)
  • SQL schema documented in export/architecture.md, section 3.2

🎯 Your tasks (Priority P1)


BE-001: Database Connection & Session Management

Estimate: M (1-2 hours)
Depends on: DB-001 completed

Goal

Configure async SQLAlchemy 2.0 with session management for FastAPI.

Files to create/modify

backend/src/
├── core/
│   ├── __init__.py
│   └── database.py          # NEW
└── config.py                # NEW (optional, can live in core/)

Required implementation

1. backend/src/core/database.py

import os
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base

# URL from the environment, with a default for local development
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql+asyncpg://app:changeme@localhost:5432/mockupaws",
)

# Async engine
engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # Set to False in production
    future=True,
    pool_size=20,
    max_overflow=0,  # no extra connections beyond pool_size
)

# Session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)

# Base for the models
# NOTE: models/base.py (BE-002) also defines a Base, and the models import that one.
Base = declarative_base()

# FastAPI dependency
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that provides a database session."""
    # The context manager closes the session on exit, even if the request fails.
    async with AsyncSessionLocal() as session:
        yield session

Acceptance criteria

  • Async connection to PostgreSQL working
  • get_db() dependency ready for FastAPI
  • Pool configured (size: 20)
  • Test: the connection opens and closes correctly
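
The last criterion (the session opens and closes correctly) can be explored without a live database. The sketch below is a stand-in built on the standard library only: FakeSession, fake_get_db and run_request are hypothetical names that mimic the shape of get_db() to show that cleanup runs once the consumer is done with the yielded session. A real test would use the actual AsyncSessionLocal against PostgreSQL.

```python
import asyncio
from collections.abc import AsyncGenerator


class FakeSession:
    """Hypothetical stand-in for AsyncSession; it only records its lifecycle."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def __aenter__(self) -> "FakeSession":
        self.events.append("opened")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.events.append("closed")


async def fake_get_db(session: FakeSession) -> AsyncGenerator[FakeSession, None]:
    """Same shape as get_db(): yield inside `async with`, cleanup on exit."""
    async with session as active:
        yield active


async def run_request(session: FakeSession) -> list[str]:
    """Drive the generator roughly as a framework would: enter, use, finalize."""
    gen = fake_get_db(session)
    active = await gen.__anext__()   # the dependency is resolved
    active.events.append("used")     # the endpoint body would run here
    await gen.aclose()               # request finished: the generator is finalized
    return session.events


events = asyncio.run(run_request(FakeSession()))
print(events)
```

The recorded order shows that the session is closed only after the consumer has finished with it.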

BE-002: SQLAlchemy Models (5 models)

Estimate: L (2-4 hours)
Depends on: BE-001

Goal

Create all the SQLAlchemy models corresponding to the database tables.

Files to create

backend/src/models/
├── __init__.py              # Exports all models
├── base.py                  # Base model with TimestampMixin
├── scenario.py              # Scenario model
├── scenario_log.py          # ScenarioLog model
├── scenario_metric.py       # ScenarioMetric model
├── aws_pricing.py           # AwsPricing model
└── report.py                # Report model

Required implementation

1. backend/src/models/base.py

from datetime import datetime
from sqlalchemy import Column, DateTime
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func

Base = declarative_base()

class TimestampMixin:
    """Mixin that adds created_at and updated_at."""
    created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)

2. backend/src/models/scenario.py

import uuid
from sqlalchemy import Column, String, Text, Enum, DECIMAL, Integer, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship

from .base import Base, TimestampMixin

class Scenario(Base, TimestampMixin):
    __tablename__ = "scenarios"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(255), nullable=False)
    description = Column(Text, nullable=True)
    tags = Column(JSONB, default=list)
    status = Column(Enum('draft', 'running', 'completed', 'archived', name='scenario_status'), 
                    nullable=False, default='draft')
    region = Column(String(50), nullable=False, default='us-east-1')
    completed_at = Column(DateTime(timezone=True), nullable=True)
    started_at = Column(DateTime(timezone=True), nullable=True)
    total_requests = Column(Integer, default=0, nullable=False)
    total_cost_estimate = Column(DECIMAL(12, 6), default=0.0, nullable=False)
    
    # Relationships
    logs = relationship("ScenarioLog", back_populates="scenario", cascade="all, delete-orphan")
    metrics = relationship("ScenarioMetric", back_populates="scenario", cascade="all, delete-orphan")
    reports = relationship("Report", back_populates="scenario", cascade="all, delete-orphan")

3. backend/src/models/scenario_log.py

import uuid
from sqlalchemy import Column, String, Integer, Boolean, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship

from .base import Base

class ScenarioLog(Base):
    __tablename__ = "scenario_logs"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    scenario_id = Column(UUID(as_uuid=True), ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False)
    received_at = Column(DateTime(timezone=True), nullable=False)
    message_hash = Column(String(64), nullable=False, index=True)
    message_preview = Column(String(500), nullable=True)
    source = Column(String(100), default='unknown', nullable=False)
    size_bytes = Column(Integer, default=0, nullable=False)
    has_pii = Column(Boolean, default=False, nullable=False)
    token_count = Column(Integer, default=0, nullable=False)
    sqs_blocks = Column(Integer, default=1, nullable=False)
    
    # Relationships
    scenario = relationship("Scenario", back_populates="logs")
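
The column sizes above hint at how a row might be derived from a raw message: String(64) fits a SHA-256 hex digest, and message_preview is capped at 500 characters. The sketch below assumes SHA-256 and UTF-8 sizing, neither of which is stated in the source. The helper name derive_log_fields is hypothetical, and PII detection, token counting and SQS block calculation are left out because the source does not specify them.

```python
import hashlib


def derive_log_fields(message: str) -> dict:
    """Hypothetical helper: compute hash, preview and size for a ScenarioLog row."""
    raw = message.encode("utf-8")
    return {
        "message_hash": hashlib.sha256(raw).hexdigest(),  # 64 hex characters
        "message_preview": message[:500],                 # fits String(500)
        "size_bytes": len(raw),                           # bytes, not characters
    }


fields = derive_log_fields("héllo " * 200)
print(len(fields["message_hash"]), len(fields["message_preview"]), fields["size_bytes"])
```

Measuring size_bytes on the encoded bytes matters for non-ASCII input, where the byte count exceeds the character count.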

4. backend/src/models/scenario_metric.py

import uuid
from sqlalchemy import Column, String, DECIMAL, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship

from .base import Base

class ScenarioMetric(Base):
    __tablename__ = "scenario_metrics"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    scenario_id = Column(UUID(as_uuid=True), ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False)
    timestamp = Column(DateTime(timezone=True), nullable=False)
    metric_type = Column(String(50), nullable=False)  # 'sqs', 'lambda', 'bedrock', 'safety'
    metric_name = Column(String(100), nullable=False)
    value = Column(DECIMAL(15, 6), default=0.0, nullable=False)
    unit = Column(String(20), nullable=False)  # 'count', 'bytes', 'tokens', 'usd'
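    # "metadata" is a reserved attribute name in the Declarative API, so the
    # Python attribute is extra_data while the column keeps the name "metadata".
    extra_data = Column("metadata", JSONB, default=dict)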
    
    # Relationships
    scenario = relationship("Scenario", back_populates="metrics")

5. backend/src/models/aws_pricing.py

import uuid
from sqlalchemy import Column, String, DECIMAL, Boolean, Date, Text
from sqlalchemy.dialects.postgresql import UUID

from .base import Base

class AwsPricing(Base):
    __tablename__ = "aws_pricing"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    service = Column(String(50), nullable=False, index=True)
    region = Column(String(50), nullable=False, index=True)
    tier = Column(String(50), default='standard', nullable=False)
    price_per_unit = Column(DECIMAL(15, 10), nullable=False)
    unit = Column(String(20), nullable=False)
    effective_from = Column(Date, nullable=False)
    effective_to = Column(Date, nullable=True)
    is_active = Column(Boolean, default=True, nullable=False)
    source_url = Column(String(500), nullable=True)
    description = Column(Text, nullable=True)

6. backend/src/models/report.py

import uuid
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey, Enum
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship

from .base import Base, TimestampMixin

class Report(Base, TimestampMixin):
    __tablename__ = "reports"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    scenario_id = Column(UUID(as_uuid=True), ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False)
    format = Column(Enum('pdf', 'csv', name='report_format'), nullable=False)
    file_path = Column(String(500), nullable=False)
    file_size_bytes = Column(Integer, nullable=True)
    generated_by = Column(String(100), nullable=True)
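    # "metadata" is a reserved attribute name in the Declarative API, so the
    # Python attribute is extra_data while the column keeps the name "metadata".
    extra_data = Column("metadata", JSONB, default=dict)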
    
    # Relationships
    scenario = relationship("Scenario", back_populates="reports")

7. backend/src/models/__init__.py

from .base import Base
from .scenario import Scenario
from .scenario_log import ScenarioLog
from .scenario_metric import ScenarioMetric
from .aws_pricing import AwsPricing
from .report import Report

__all__ = ["Base", "Scenario", "ScenarioLog", "ScenarioMetric", "AwsPricing", "Report"]

Acceptance criteria

  • All 5 models created with the correct fields
  • Relationships defined (scenario → logs/metrics/reports)
  • Correct type hints
  • Import works from backend/src/models/__init__.py
  • Test: creating instances in memory
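
For the last criterion, instances can be built in memory without a database connection. The sketch below uses a hypothetical DemoReport model with generic column types so that it stands alone; it is not one of the five models. It relies on two SQLAlchemy behaviours: the Declarative API reserves the attribute name metadata, so a JSON column with that name needs a differently named Python attribute, and column-level default values are applied at INSERT time, not when the object is constructed.

```python
import uuid

from sqlalchemy import JSON, Column, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class DemoReport(Base):
    """Hypothetical model, used only to illustrate in-memory construction."""

    __tablename__ = "demo_reports"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    format = Column(String(10), nullable=False, default="pdf")
    # Python attribute "extra_data", database column "metadata".
    extra_data = Column("metadata", JSON, default=dict)


report = DemoReport(format="csv", extra_data={"pages": 3})
column_names = [column.name for column in DemoReport.__table__.columns]

print(report.format)   # value passed to the constructor
print(report.id)       # not populated yet: defaults run at flush/INSERT
print(column_names)    # the table column keeps its database name
```

A test that asserts on default values therefore needs a flush against a real session, while constructor-supplied values can be checked in memory.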

BE-003: Pydantic Schemas

Estimate: M (1-2 hours)
Depends on: BE-002

Goal

Create Pydantic schemas for request/response validation.

Files to create

backend/src/schemas/
├── __init__.py
├── scenario.py
├── log.py
├── metric.py
├── pricing.py
└── common.py              # PaginatedResponse, etc.

Required implementation

1. backend/src/schemas/scenario.py

from datetime import datetime
from decimal import Decimal
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict

class ScenarioBase(BaseModel):
    name: str = Field(..., min_length=1, max_length=255)
    description: Optional[str] = None
    tags: List[str] = Field(default_factory=list)
    region: str = Field(default="us-east-1", pattern=r"^[a-z]{2}-[a-z]+-[0-9]$")

class ScenarioCreate(ScenarioBase):
    pass

class ScenarioUpdate(BaseModel):
    name: Optional[str] = Field(None, min_length=1, max_length=255)
    description: Optional[str] = None
    tags: Optional[List[str]] = None

class ScenarioResponse(ScenarioBase):
    model_config = ConfigDict(from_attributes=True)
    
    id: UUID
    status: str
    created_at: datetime
    updated_at: datetime
    completed_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    total_requests: int
    total_cost_estimate: Decimal

class ScenarioList(BaseModel):
    items: List[ScenarioResponse]
    total: int
    page: int
    page_size: int

2. backend/src/schemas/log.py

from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field

class LogIngest(BaseModel):
    message: str = Field(..., min_length=1)
    source: str = Field(default="unknown", max_length=100)

class LogResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    
    id: UUID
    scenario_id: UUID
    received_at: datetime
    message_preview: Optional[str]
    source: str
    size_bytes: int
    has_pii: bool
    token_count: int
    sqs_blocks: int

3. backend/src/schemas/metric.py

from datetime import datetime
from decimal import Decimal
from typing import Dict, Any, Optional
from uuid import UUID
from pydantic import BaseModel, ConfigDict

class MetricSummary(BaseModel):
    total_requests: int
    total_cost_usd: Decimal
    sqs_blocks: int
    lambda_invocations: int
    llm_tokens: int
    pii_violations: int

class CostBreakdown(BaseModel):
    service: str
    cost_usd: Decimal
    percentage: float

class TimeseriesPoint(BaseModel):
    timestamp: datetime
    metric_type: str
    value: Decimal

class MetricsResponse(BaseModel):
    scenario_id: UUID
    summary: MetricSummary
    cost_breakdown: list[CostBreakdown]
    timeseries: list[TimeseriesPoint]
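
CostBreakdown pairs each service's cost with its share of the total. A minimal sketch of that arithmetic follows, assuming percentages are computed from per-service Decimal costs and rounded to two places. The function name build_cost_breakdown and the sample figures are illustrative, not taken from the project.

```python
from decimal import Decimal


def build_cost_breakdown(costs: dict[str, Decimal]) -> list[dict]:
    """Hypothetical helper: return service, cost_usd and percentage per service."""
    total = sum(costs.values(), Decimal("0"))
    rows = []
    for service, cost in costs.items():
        # Guard against division by zero when no cost has been recorded yet.
        share = float(cost / total * 100) if total else 0.0
        rows.append({"service": service, "cost_usd": cost, "percentage": round(share, 2)})
    return rows


breakdown = build_cost_breakdown(
    {"sqs": Decimal("0.40"), "lambda": Decimal("0.10"), "bedrock": Decimal("1.50")}
)
for row in breakdown:
    print(row)
```

Each resulting dict has the same keys as CostBreakdown, so it could be passed to the schema as keyword arguments.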

4. backend/src/schemas/common.py

from typing import Generic, TypeVar, List
from pydantic import BaseModel

T = TypeVar("T")

class PaginatedResponse(BaseModel, Generic[T]):
    items: List[T]
    total: int
    page: int
    page_size: int
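
PaginatedResponse carries page and page_size, while the repository methods in BE-004 take skip and limit. A small sketch of the conversion, assuming pages are 1-based (the source does not say); page_to_offset and total_pages are hypothetical helper names.

```python
import math


def page_to_offset(page: int, page_size: int) -> tuple[int, int]:
    """Convert a 1-based page number into (skip, limit) for a query."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    return (page - 1) * page_size, page_size


def total_pages(total: int, page_size: int) -> int:
    """Number of pages needed to show `total` items."""
    return math.ceil(total / page_size)


skip, limit = page_to_offset(page=3, page_size=20)
print(skip, limit, total_pages(total=45, page_size=20))
```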

Acceptance criteria

  • Scenarios: Create, Update, Response, List schemas
  • Logs: Ingest, Response schemas
  • Metrics: Summary, Breakdown, Timeseries schemas
  • Pydantic validators working (region pattern, min_length)
  • ConfigDict(from_attributes=True) for ORM mode
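
The region pattern can be exercised on its own before it is wired into a schema. The sketch below applies the same regular expression with the standard re module instead of Pydantic; is_valid_region is a hypothetical helper. Note that the expression accepts only regions shaped like two letters, one word and a single digit, so a name such as us-gov-west-1 is rejected.

```python
import re

# Same expression as the `pattern` argument on ScenarioBase.region.
REGION_PATTERN = re.compile(r"^[a-z]{2}-[a-z]+-[0-9]$")


def is_valid_region(region: str) -> bool:
    """Return True when the region string matches the schema's pattern."""
    return REGION_PATTERN.fullmatch(region) is not None


for candidate in ("us-east-1", "eu-central-1", "US-EAST-1", "us-gov-west-1", "us-east-10"):
    print(candidate, is_valid_region(candidate))
```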

BE-004: Base Repository Pattern

Estimate: M (1-2 hours)
Depends on: BE-002

Goal

Implement a base repository with generic CRUD operations.

Files to create

backend/src/repositories/
├── __init__.py
└── base.py

Required implementation

backend/src/repositories/base.py

from typing import Generic, TypeVar, Optional, List, Any
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import delete, select

from backend.src.models.base import Base

ModelType = TypeVar("ModelType", bound=Base)

class BaseRepository(Generic[ModelType]):
    """Generic base repository with common CRUD operations."""
    
    def __init__(self, model: type[ModelType]):
        self.model = model
    
    async def get(self, db: AsyncSession, id: UUID) -> Optional[ModelType]:
        """Get a single record by ID."""
        result = await db.execute(select(self.model).where(self.model.id == id))
        return result.scalar_one_or_none()
    
    async def get_multi(
        self, 
        db: AsyncSession, 
        *, 
        skip: int = 0, 
        limit: int = 100,
        **filters
    ) -> List[ModelType]:
        """Get multiple records with optional filtering."""
        query = select(self.model)
        
        # Apply filters
        for key, value in filters.items():
            if hasattr(self.model, key) and value is not None:
                query = query.where(getattr(self.model, key) == value)
        
        query = query.offset(skip).limit(limit)
        result = await db.execute(query)
        return list(result.scalars().all())  # .all() returns a Sequence, not a list
    
    async def count(self, db: AsyncSession, **filters) -> int:
        """Count records with optional filtering."""
        from sqlalchemy import func
        
        query = select(func.count(self.model.id))
        
        for key, value in filters.items():
            if hasattr(self.model, key) and value is not None:
                query = query.where(getattr(self.model, key) == value)
        
        result = await db.execute(query)
        return result.scalar_one()  # COUNT always yields exactly one row
    
    async def create(self, db: AsyncSession, *, obj_in: dict) -> ModelType:
        """Create a new record."""
        db_obj = self.model(**obj_in)
        db.add(db_obj)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj
    
    async def update(
        self, 
        db: AsyncSession, 
        *, 
        db_obj: ModelType, 
        obj_in: dict
    ) -> ModelType:
        """Update a record."""
        for field, value in obj_in.items():
            if hasattr(db_obj, field) and value is not None:
                setattr(db_obj, field, value)
        
        db.add(db_obj)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj
    
    async def delete(self, db: AsyncSession, *, id: UUID) -> bool:
        """Delete a record by ID."""
        result = await db.execute(delete(self.model).where(self.model.id == id))
        await db.commit()
        return result.rowcount > 0

Acceptance criteria

  • BaseRepository class with Generic[ModelType]
  • Methods: get, get_multi, count, create, update, delete
  • Dynamic filter support in get_multi
  • Complete type hints
  • Unit test with a mock session
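
The mock-session test in the last criterion does not need a database. A minimal sketch using unittest.mock follows: TinyRepository is a hypothetical stand-in that repeats the add, commit, refresh sequence of BaseRepository.create so the example runs with the standard library alone. In the project, the real repository would be imported instead.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class TinyRepository:
    """Hypothetical stand-in mirroring the call order of BaseRepository.create."""

    def __init__(self, model):
        self.model = model

    async def create(self, db, *, obj_in: dict):
        db_obj = self.model(**obj_in)
        db.add(db_obj)            # add() is synchronous on AsyncSession
        await db.commit()
        await db.refresh(db_obj)
        return db_obj


def make_mock_session() -> MagicMock:
    """Build a session double: add() is sync, commit() and refresh() are awaitable."""
    session = MagicMock()
    session.commit = AsyncMock()
    session.refresh = AsyncMock()
    return session


session = make_mock_session()
repo = TinyRepository(dict)  # dict plays the role of the model class
created = asyncio.run(repo.create(session, obj_in={"name": "Test Scenario"}))

# Verify the interaction with the session rather than any database state.
session.add.assert_called_once_with(created)
session.commit.assert_awaited_once()
session.refresh.assert_awaited_once_with(created)
print(created)
```

Tests written this way check the call sequence only; query correctness still needs an integration test against PostgreSQL.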

BE-005: Scenario Repository

Estimate: M (1-2 hours)
Depends on: BE-004

Goal

Implement ScenarioRepository with scenario-specific methods.

Files to create

backend/src/repositories/
├── __init__.py              # Update
└── scenario.py              # NEW

Required implementation

backend/src/repositories/scenario.py

from typing import Optional, List
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from enum import Enum

from backend.src.models.scenario import Scenario
from backend.src.repositories.base import BaseRepository

class ScenarioStatus(str, Enum):
    DRAFT = "draft"
    RUNNING = "running"
    COMPLETED = "completed"
    ARCHIVED = "archived"

class ScenarioRepository(BaseRepository[Scenario]):
    """Repository for Scenario model with specific methods."""
    
    def __init__(self):
        super().__init__(Scenario)
    
    async def get_by_name(self, db: AsyncSession, name: str) -> Optional[Scenario]:
        """Get scenario by name."""
        result = await db.execute(
            select(Scenario).where(Scenario.name == name)
        )
        return result.scalar_one_or_none()
    
    async def list_by_status(
        self, 
        db: AsyncSession, 
        status: ScenarioStatus,
        skip: int = 0,
        limit: int = 100
    ) -> List[Scenario]:
        """List scenarios by status."""
        # Pass the plain string so it matches the values of the status column's Enum.
        return await self.get_multi(db, status=status.value, skip=skip, limit=limit)
    
    async def list_by_region(
        self,
        db: AsyncSession,
        region: str,
        skip: int = 0,
        limit: int = 100
    ) -> List[Scenario]:
        """List scenarios by region."""
        return await self.get_multi(db, region=region, skip=skip, limit=limit)
    
    async def update_status(
        self,
        db: AsyncSession,
        scenario_id: UUID,
        new_status: ScenarioStatus
    ) -> Optional[Scenario]:
        """Update scenario status."""
        result = await db.execute(
            update(Scenario)
            .where(Scenario.id == scenario_id)
            .values(status=new_status.value)
            .returning(Scenario)
        )
        # Fetch the returned row first, then commit.
        scenario = result.scalar_one_or_none()
        await db.commit()
        return scenario
    
    async def increment_total_requests(
        self,
        db: AsyncSession,
        scenario_id: UUID,
        increment: int = 1
    ) -> None:
        """Atomically increment total_requests counter."""
        await db.execute(
            update(Scenario)
            .where(Scenario.id == scenario_id)
            .values(total_requests=Scenario.total_requests + increment)
        )
        await db.commit()
    
    async def update_total_cost(
        self,
        db: AsyncSession,
        scenario_id: UUID,
        new_cost: float
    ) -> None:
        """Update total cost estimate."""
        await db.execute(
            update(Scenario)
            .where(Scenario.id == scenario_id)
            .values(total_cost_estimate=new_cost)
        )
        await db.commit()

# Singleton instance
scenario_repository = ScenarioRepository()
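
ScenarioStatus mixes in str, so its members compare equal to the plain strings stored in the status column, and unknown values are rejected at construction. The snippet below repeats the enum so that it runs on its own.

```python
from enum import Enum


class ScenarioStatus(str, Enum):
    DRAFT = "draft"
    RUNNING = "running"
    COMPLETED = "completed"
    ARCHIVED = "archived"


# Members compare equal to their string values.
print(ScenarioStatus.DRAFT == "draft")
# Lookup by value returns the existing member.
print(ScenarioStatus("running") is ScenarioStatus.RUNNING)
# .value gives the plain string to hand to a query or a JSON payload.
print(ScenarioStatus.COMPLETED.value)

try:
    ScenarioStatus("deleted")
except ValueError as exc:
    print(f"rejected: {exc}")
```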

Update backend/src/repositories/__init__.py:

from backend.src.repositories.base import BaseRepository
from backend.src.repositories.scenario import ScenarioRepository, scenario_repository

__all__ = ["BaseRepository", "ScenarioRepository", "scenario_repository"]

Acceptance criteria

  • Extends BaseRepository[Scenario]
  • Specific methods: get_by_name, list_by_status, list_by_region
  • Business methods: update_status, increment_total_requests, update_total_cost
  • Singleton instance scenario_repository
  • Test: CRUD operations + specific methods
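
increment_total_requests is atomic because the addition happens inside a single UPDATE statement rather than in Python. The sketch below shows the equivalent SQL against an in-memory SQLite table using the standard sqlite3 module. The table layout is a simplified assumption, and the project itself targets PostgreSQL through SQLAlchemy.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE scenarios (id TEXT PRIMARY KEY, total_requests INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO scenarios (id) VALUES ('demo')")


def increment_total_requests(scenario_id: str, increment: int = 1) -> None:
    """The database computes the new value, so no stale read is written back."""
    conn.execute(
        "UPDATE scenarios SET total_requests = total_requests + ? WHERE id = ?",
        (increment, scenario_id),
    )
    conn.commit()


increment_total_requests("demo")
increment_total_requests("demo", increment=5)
total = conn.execute("SELECT total_requests FROM scenarios WHERE id = 'demo'").fetchone()[0]
print(total)
```

A read-modify-write done in application code (load the row, add one, save) could lose updates under concurrent requests, which is the case this pattern avoids.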

📋 Completion Checklist

Before moving on to the next task, verify:

  • All files created in the correct structure
  • Imports work (no import errors)
  • Complete type hints
  • Docstrings for public classes and methods
  • No hardcoded values (use environment variables for configuration)
  • Formatted with ruff/black
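
For the configuration item, one option is to read every tunable from the environment in a single place. The sketch below is an assumption: only DATABASE_URL appears in the source, while DB_POOL_SIZE, DB_ECHO, Settings and load_settings are hypothetical names chosen for illustration.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """Hypothetical settings object populated from environment variables."""

    database_url: str
    pool_size: int
    echo: bool


def load_settings(env=None) -> Settings:
    """Read settings from `env` (defaults to os.environ), with dev defaults."""
    env = os.environ if env is None else env
    return Settings(
        database_url=env.get(
            "DATABASE_URL",
            "postgresql+asyncpg://app:changeme@localhost:5432/mockupaws",
        ),
        pool_size=int(env.get("DB_POOL_SIZE", "20")),
        echo=env.get("DB_ECHO", "false").lower() in ("1", "true", "yes"),
    )


settings = load_settings({"DB_POOL_SIZE": "5", "DB_ECHO": "true"})
print(settings.pool_size, settings.echo)
```

Passing the mapping as an argument keeps the loader easy to test without touching the real process environment.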

📖 References

  1. Database schema: export/architecture.md, section 3.2
  2. API specs: export/architecture.md, section 4
  3. Agent configuration: .opencode/agents/backend-dev.md

🧪 Testing

Create basic tests in backend/tests/:

# tests/unit/test_repositories.py
import pytest
from backend.src.repositories.scenario import scenario_repository

@pytest.mark.asyncio
async def test_scenario_repository_create(db_session):
    repo = scenario_repository
    scenario = await repo.create(db_session, obj_in={
        "name": "Test Scenario",
        "region": "us-east-1"
    })
    assert scenario.name == "Test Scenario"
    assert scenario.status == "draft"

🚀 Useful Commands

# Verify models
cd /home/google/Sources/LucaSacchiNet/mockupAWS/backend
python -c "from src.models import Base; print('Models OK')"

# Check that the DB module imports (the engine connects lazily, so this opens no connection)
uv run python -c "from src.core.database import engine; print('DB OK')"

# Run tests
uv run pytest tests/ -v

@backend-dev: START NOW! 🚀

Proceed in order: BE-001 → BE-002 → BE-003 → BE-004 → BE-005

Questions? Refer to export/architecture.md for the detailed schema.

Commit conventions:

  • feat: add database connection and session management
  • feat: add SQLAlchemy models for all entities
  • feat: add Pydantic schemas for request/response
  • feat: implement base repository pattern
  • feat: add scenario repository with specific methods

Good luck! 💪