# 🚀 @backend-dev - Backend Core Implementation

## 📊 Project Status

**Date:** 2026-04-07
**Phase:** 1 - Database & Backend Core
**Database:** ✅ COMPLETED

### ✅ What is ready

- **PostgreSQL database** up and running with all tables
- **Alembic migrations** complete (6 migrations)
- **AWS Pricing** data seeded (10 records)
- **SQL schema** documented in `export/architecture.md`, section 3.2

### 🎯 Your tasks (Priority P1)

---

## BE-001: Database Connection & Session Management

**Estimate:** M (1-2 hours)
**Depends on:** DB-001 completed ✅

### Objective

Configure SQLAlchemy 2.0 async with session management for FastAPI.

### Files to create/modify

```
backend/src/
├── core/
│   ├── __init__.py
│   └── database.py      # NEW
└── config.py            # NEW (optional, can live in core/)
```

### Required implementation

**1. `backend/src/core/database.py`**

```python
import os
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# URL from the environment, with a default for local development
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql+asyncpg://app:changeme@localhost:5432/mockupaws",
)

# Async engine
engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # Set to False in production
    pool_size=20,
    max_overflow=0,
)

# Session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
)

# The declarative Base for the models is defined once, in models/base.py (BE-002).
# A second Base here would carry separate metadata with no models registered on it.


# Dependency for FastAPI
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that provides a database session."""
    # The context manager closes the session on exit, even if the request fails.
    async with AsyncSessionLocal() as session:
        yield session
```

### Acceptance criteria

- [ ] Async connection to PostgreSQL works
- [ ] `get_db()` dependency ready for FastAPI
- [ ] Pool configured (size: 20)
- [ ] Test: the connection opens and closes correctly

---

## BE-002: SQLAlchemy Models (5 models)

**Estimate:** L
(2-4 hours)
**Depends on:** BE-001

### Objective

Create all SQLAlchemy models matching the database tables.

### Files to create

```
backend/src/models/
├── __init__.py           # Exports all models
├── base.py               # Base model with TimestampMixin
├── scenario.py           # Scenario model
├── scenario_log.py       # ScenarioLog model
├── scenario_metric.py    # ScenarioMetric model
├── aws_pricing.py        # AwsPricing model
└── report.py             # Report model
```

### Required implementation

**1. `backend/src/models/base.py`**

```python
from sqlalchemy import Column, DateTime
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func

Base = declarative_base()


class TimestampMixin:
    """Mixin that adds created_at and updated_at."""

    created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
    updated_at = Column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
    )
```

**2.
`backend/src/models/scenario.py`**

```python
import uuid

from sqlalchemy import DECIMAL, Column, DateTime, Enum, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import relationship

from .base import Base, TimestampMixin


class Scenario(Base, TimestampMixin):
    __tablename__ = "scenarios"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(255), nullable=False)
    description = Column(Text, nullable=True)
    tags = Column(JSONB, default=list)
    status = Column(
        Enum('draft', 'running', 'completed', 'archived', name='scenario_status'),
        nullable=False,
        default='draft',
    )
    region = Column(String(50), nullable=False, default='us-east-1')
    completed_at = Column(DateTime(timezone=True), nullable=True)
    started_at = Column(DateTime(timezone=True), nullable=True)
    total_requests = Column(Integer, default=0, nullable=False)
    total_cost_estimate = Column(DECIMAL(12, 6), default=0.0, nullable=False)

    # Relationships: child rows are deleted together with the scenario
    logs = relationship("ScenarioLog", back_populates="scenario", cascade="all, delete-orphan")
    metrics = relationship("ScenarioMetric", back_populates="scenario", cascade="all, delete-orphan")
    reports = relationship("Report", back_populates="scenario", cascade="all, delete-orphan")
```

**3.
`backend/src/models/scenario_log.py`**

```python
import uuid

from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship

from .base import Base


class ScenarioLog(Base):
    __tablename__ = "scenario_logs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    scenario_id = Column(UUID(as_uuid=True), ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False)
    received_at = Column(DateTime(timezone=True), nullable=False)
    message_hash = Column(String(64), nullable=False, index=True)
    message_preview = Column(String(500), nullable=True)
    source = Column(String(100), default='unknown', nullable=False)
    size_bytes = Column(Integer, default=0, nullable=False)
    has_pii = Column(Boolean, default=False, nullable=False)
    token_count = Column(Integer, default=0, nullable=False)
    sqs_blocks = Column(Integer, default=1, nullable=False)

    # Relationships
    scenario = relationship("Scenario", back_populates="logs")
```

**4. `backend/src/models/scenario_metric.py`**

```python
import uuid

from sqlalchemy import DECIMAL, Column, DateTime, ForeignKey, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import relationship

from .base import Base


class ScenarioMetric(Base):
    __tablename__ = "scenario_metrics"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    scenario_id = Column(UUID(as_uuid=True), ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False)
    timestamp = Column(DateTime(timezone=True), nullable=False)
    metric_type = Column(String(50), nullable=False)  # 'sqs', 'lambda', 'bedrock', 'safety'
    metric_name = Column(String(100), nullable=False)
    value = Column(DECIMAL(15, 6), default=0.0, nullable=False)
    unit = Column(String(20), nullable=False)  # 'count', 'bytes', 'tokens', 'usd'
    # `metadata` is a reserved attribute name in the Declarative API, so the
    # Python attribute is `metadata_` while the DB column stays "metadata".
    metadata_ = Column("metadata", JSONB, default=dict)

    # Relationships
    scenario = relationship("Scenario", back_populates="metrics")
```

**5.
`backend/src/models/aws_pricing.py`**

```python
import uuid

from sqlalchemy import DECIMAL, Boolean, Column, Date, String, Text
from sqlalchemy.dialects.postgresql import UUID

from .base import Base


class AwsPricing(Base):
    __tablename__ = "aws_pricing"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    service = Column(String(50), nullable=False, index=True)
    region = Column(String(50), nullable=False, index=True)
    tier = Column(String(50), default='standard', nullable=False)
    price_per_unit = Column(DECIMAL(15, 10), nullable=False)
    unit = Column(String(20), nullable=False)
    effective_from = Column(Date, nullable=False)
    effective_to = Column(Date, nullable=True)
    is_active = Column(Boolean, default=True, nullable=False)
    source_url = Column(String(500), nullable=True)
    description = Column(Text, nullable=True)
```

**6. `backend/src/models/report.py`**

```python
import uuid

from sqlalchemy import Column, Enum, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import relationship

from .base import Base, TimestampMixin


class Report(Base, TimestampMixin):
    __tablename__ = "reports"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    scenario_id = Column(UUID(as_uuid=True), ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False)
    format = Column(Enum('pdf', 'csv', name='report_format'), nullable=False)
    file_path = Column(String(500), nullable=False)
    file_size_bytes = Column(Integer, nullable=True)
    generated_by = Column(String(100), nullable=True)
    # `metadata` is a reserved attribute name in the Declarative API, so the
    # Python attribute is `metadata_` while the DB column stays "metadata".
    metadata_ = Column("metadata", JSONB, default=dict)

    # Relationships
    scenario = relationship("Scenario", back_populates="reports")
```

**7.
`backend/src/models/__init__.py`**

```python
from .base import Base
from .scenario import Scenario
from .scenario_log import ScenarioLog
from .scenario_metric import ScenarioMetric
from .aws_pricing import AwsPricing
from .report import Report

__all__ = ["Base", "Scenario", "ScenarioLog", "ScenarioMetric", "AwsPricing", "Report"]
```

### Acceptance criteria

- [ ] All 5 models created with the correct fields
- [ ] Relationships defined (scenario → logs/metrics/reports)
- [ ] Correct type hints
- [ ] Imports work from `backend/src/models/__init__.py`
- [ ] Test: in-memory instance creation

---

## BE-003: Pydantic Schemas

**Estimate:** M (1-2 hours)
**Depends on:** BE-002

### Objective

Create Pydantic schemas for request/response validation.

### Files to create

```
backend/src/schemas/
├── __init__.py
├── scenario.py
├── log.py
├── metric.py
├── pricing.py
└── common.py       # PaginatedResponse, etc.
```

### Required implementation

**1. `backend/src/schemas/scenario.py`**

```python
from datetime import datetime
from decimal import Decimal
from typing import List, Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field


class ScenarioBase(BaseModel):
    name: str = Field(..., min_length=1, max_length=255)
    description: Optional[str] = None
    tags: List[str] = Field(default_factory=list)
    region: str = Field(default="us-east-1", pattern=r"^[a-z]{2}-[a-z]+-[0-9]$")


class ScenarioCreate(ScenarioBase):
    pass


class ScenarioUpdate(BaseModel):
    # All fields optional: only the ones provided are updated
    name: Optional[str] = Field(None, min_length=1, max_length=255)
    description: Optional[str] = None
    tags: Optional[List[str]] = None


class ScenarioResponse(ScenarioBase):
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    status: str
    created_at: datetime
    updated_at: datetime
    completed_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    total_requests: int
    total_cost_estimate: Decimal


class ScenarioList(BaseModel):
    items: List[ScenarioResponse]
    total: int
    page: int
    page_size: int
```

**2. `backend/src/schemas/log.py`**

```python
from datetime import datetime
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field


class LogIngest(BaseModel):
    message: str = Field(..., min_length=1)
    source: str = Field(default="unknown", max_length=100)


class LogResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    scenario_id: UUID
    received_at: datetime
    message_preview: Optional[str]
    source: str
    size_bytes: int
    has_pii: bool
    token_count: int
    sqs_blocks: int
```

**3. `backend/src/schemas/metric.py`**

```python
from datetime import datetime
from decimal import Decimal
from uuid import UUID

from pydantic import BaseModel


class MetricSummary(BaseModel):
    total_requests: int
    total_cost_usd: Decimal
    sqs_blocks: int
    lambda_invocations: int
    llm_tokens: int
    pii_violations: int


class CostBreakdown(BaseModel):
    service: str
    cost_usd: Decimal
    percentage: float


class TimeseriesPoint(BaseModel):
    timestamp: datetime
    metric_type: str
    value: Decimal


class MetricsResponse(BaseModel):
    scenario_id: UUID
    summary: MetricSummary
    cost_breakdown: list[CostBreakdown]
    timeseries: list[TimeseriesPoint]
```

**4. `backend/src/schemas/common.py`**

```python
from typing import Generic, List, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


class PaginatedResponse(BaseModel, Generic[T]):
    items: List[T]
    total: int
    page: int
    page_size: int
```

### Acceptance criteria

- [ ] Scenarios: Create, Update, Response, List schemas
- [ ] Logs: Ingest, Response schemas
- [ ] Metrics: Summary, Breakdown, Timeseries schemas
- [ ] Pydantic validators working (region pattern, min_length)
- [ ] `ConfigDict(from_attributes=True)` for ORM mode

---

## BE-004: Base Repository Pattern

**Estimate:** M (1-2 hours)
**Depends on:** BE-002

### Objective

Implement a base repository with generic CRUD operations.
### Files to create

```
backend/src/repositories/
├── __init__.py
└── base.py
```

### Required implementation

**`backend/src/repositories/base.py`**

```python
from typing import Generic, List, Optional, TypeVar
from uuid import UUID

from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from backend.src.models.base import Base

ModelType = TypeVar("ModelType", bound=Base)


class BaseRepository(Generic[ModelType]):
    """Generic base repository with common CRUD operations."""

    def __init__(self, model: type[ModelType]):
        self.model = model

    async def get(self, db: AsyncSession, id: UUID) -> Optional[ModelType]:
        """Get a single record by ID."""
        result = await db.execute(select(self.model).where(self.model.id == id))
        return result.scalar_one_or_none()

    async def get_multi(
        self, db: AsyncSession, *, skip: int = 0, limit: int = 100, **filters
    ) -> List[ModelType]:
        """Get multiple records with optional filtering."""
        query = select(self.model)

        # Apply equality filters; unknown keys and None values are ignored
        for key, value in filters.items():
            if hasattr(self.model, key) and value is not None:
                query = query.where(getattr(self.model, key) == value)

        query = query.offset(skip).limit(limit)
        result = await db.execute(query)
        return list(result.scalars().all())

    async def count(self, db: AsyncSession, **filters) -> int:
        """Count records with optional filtering."""
        query = select(func.count(self.model.id))

        for key, value in filters.items():
            if hasattr(self.model, key) and value is not None:
                query = query.where(getattr(self.model, key) == value)

        result = await db.execute(query)
        return result.scalar_one()

    async def create(self, db: AsyncSession, *, obj_in: dict) -> ModelType:
        """Create a new record."""
        db_obj = self.model(**obj_in)
        db.add(db_obj)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj

    async def update(
        self, db: AsyncSession, *, db_obj: ModelType, obj_in: dict
    ) -> ModelType:
        """Update a record."""
        # None values are skipped, so this method cannot reset a field to NULL
        for field, value in obj_in.items():
            if hasattr(db_obj, field) and value is not None:
                setattr(db_obj, field, value)

        db.add(db_obj)
        await db.commit()
        await db.refresh(db_obj)
        return db_obj

    async def delete(self, db: AsyncSession, *, id: UUID) -> bool:
        """Delete a record by ID. Returns True if a row was removed."""
        result = await db.execute(delete(self.model).where(self.model.id == id))
        await db.commit()
        return result.rowcount > 0
```

### Acceptance criteria

- [ ] `BaseRepository` class with Generic[ModelType]
- [ ] Methods: get, get_multi, count, create, update, delete
- [ ] Dynamic filter support in get_multi
- [ ] Complete type hints
- [ ] Unit test with a mock session

---

## BE-005: Scenario Repository

**Estimate:** M (1-2 hours)
**Depends on:** BE-004

### Objective

Implement ScenarioRepository with scenario-specific methods.

### Files to create

```
backend/src/repositories/
├── __init__.py      # Update
└── scenario.py      # NEW
```

### Required implementation

**`backend/src/repositories/scenario.py`**

```python
from enum import Enum
from typing import List, Optional
from uuid import UUID

from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

from backend.src.models.scenario import Scenario
from backend.src.repositories.base import BaseRepository


class ScenarioStatus(str, Enum):
    DRAFT = "draft"
    RUNNING = "running"
    COMPLETED = "completed"
    ARCHIVED = "archived"


class ScenarioRepository(BaseRepository[Scenario]):
    """Repository for Scenario model with specific methods."""

    def __init__(self):
        super().__init__(Scenario)

    async def get_by_name(self, db: AsyncSession, name: str) -> Optional[Scenario]:
        """Get scenario by name."""
        result = await db.execute(
            select(Scenario).where(Scenario.name == name)
        )
        return result.scalar_one_or_none()

    async def list_by_status(
        self,
        db: AsyncSession,
        status: ScenarioStatus,
        skip: int = 0,
        limit: int = 100
    ) -> List[Scenario]:
        """List scenarios by status."""
        # Pass the plain string value, which is what the Enum column stores
        return await self.get_multi(db, status=status.value, skip=skip, limit=limit)

    async def list_by_region(
        self,
        db: AsyncSession,
        region: str,
        skip: int = 0,
        limit: int = 100
    ) -> List[Scenario]:
        """List scenarios by region."""
        return await self.get_multi(db, region=region, skip=skip, limit=limit)

    async def update_status(
        self,
        db: AsyncSession,
        scenario_id: UUID,
        new_status: ScenarioStatus
    ) -> Optional[Scenario]:
        """Update scenario status."""
        result = await db.execute(
            update(Scenario)
            .where(Scenario.id == scenario_id)
            .values(status=new_status.value)  # store the plain string value
            .returning(Scenario)
        )
        await db.commit()
        return result.scalar_one_or_none()

    async def increment_total_requests(
        self,
        db: AsyncSession,
        scenario_id: UUID,
        increment: int = 1
    ) -> None:
        """Atomically increment total_requests counter."""
        # The addition runs in SQL, so concurrent increments are not lost
        await db.execute(
            update(Scenario)
            .where(Scenario.id == scenario_id)
            .values(total_requests=Scenario.total_requests + increment)
        )
        await db.commit()

    async def update_total_cost(
        self,
        db: AsyncSession,
        scenario_id: UUID,
        new_cost: float
    ) -> None:
        """Update total cost estimate."""
        await db.execute(
            update(Scenario)
            .where(Scenario.id == scenario_id)
            .values(total_cost_estimate=new_cost)
        )
        await db.commit()


# Singleton instance
scenario_repository = ScenarioRepository()
```

**Update `backend/src/repositories/__init__.py`**:

```python
from backend.src.repositories.base import BaseRepository
from backend.src.repositories.scenario import ScenarioRepository, scenario_repository

__all__ = ["BaseRepository", "ScenarioRepository", "scenario_repository"]
```

### Acceptance criteria

- [ ] Extends BaseRepository[Scenario]
- [ ] Specific methods: get_by_name, list_by_status, list_by_region
- [ ] Business methods: update_status, increment_total_requests, update_total_cost
- [ ] Singleton instance `scenario_repository`
- [ ] Test: CRUD operations + specific methods

---

## 📋 Completion Checklist

Before moving on to the next task, verify:

- [ ] All files created in the correct structure
- [ ] Imports work (no import errors)
- [ ] Complete type hints
- [ ] Docstrings for public classes and methods
- [ ] No hardcoded configuration (use environment variables)
- [ ] Formatted with ruff/black

---

## 📖 References

1. **Database schema:** `export/architecture.md`, section 3.2
2. **API specs:** `export/architecture.md`, section 4
3. **Agent configuration:** `.opencode/agents/backend-dev.md`

---

## 🧪 Testing

Create basic tests in `backend/tests/`:

```python
# tests/unit/test_repositories.py
import pytest

from backend.src.repositories.scenario import scenario_repository


@pytest.mark.asyncio
async def test_scenario_repository_create(db_session):
    repo = scenario_repository
    scenario = await repo.create(db_session, obj_in={
        "name": "Test Scenario",
        "region": "us-east-1"
    })
    assert scenario.name == "Test Scenario"
    assert scenario.status == "draft"
```

---

## 🚀 Useful Commands

```bash
# Check the models
cd /home/google/Sources/LucaSacchiNet/mockupAWS/backend
python -c "from src.models import Base; print('Models OK')"

# Test the DB connection
uv run python -c "from src.core.database import engine; print('DB OK')"

# Run tests
uv run pytest tests/ -v
```

---

**@backend-dev: START NOW! 🚀**

Proceed in order: BE-001 → BE-002 → BE-003 → BE-004 → BE-005

**Questions?** Refer to `export/architecture.md` for the detailed schema.

**Commit conventions:**

- `feat: add database connection and session management`
- `feat: add SQLAlchemy models for all entities`
- `feat: add Pydantic schemas for request/response`
- `feat: implement base repository pattern`
- `feat: add scenario repository with specific methods`

**Good luck! 💪**
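
---

For the BE-004 criterion that asks for a unit test with a mock session, one possible shape is sketched below. It is only an illustration under stated assumptions: `FakeScenario`, `TinyRepository`, and `make_mock_session` are hypothetical stand-ins that mirror the add, commit, refresh flow of `BaseRepository.create`, so the snippet runs with the standard library alone. A real test would import `BaseRepository` and the actual model instead.

```python
import asyncio
from dataclasses import dataclass
from unittest.mock import AsyncMock, MagicMock


@dataclass
class FakeScenario:
    """Hypothetical stand-in for the Scenario model (not the real ORM class)."""
    name: str
    region: str = "us-east-1"


class TinyRepository:
    """Hypothetical stand-in mirroring BaseRepository.create (add, commit, refresh)."""

    def __init__(self, model):
        self.model = model

    async def create(self, db, *, obj_in: dict):
        db_obj = self.model(**obj_in)
        db.add(db_obj)            # add() is a regular (sync) call on AsyncSession
        await db.commit()         # commit() is awaited
        await db.refresh(db_obj)  # refresh() is awaited
        return db_obj


def make_mock_session() -> MagicMock:
    """Build a mock shaped like AsyncSession: sync add(), async commit()/refresh()."""
    session = MagicMock()
    session.commit = AsyncMock()
    session.refresh = AsyncMock()
    return session


async def run_create_check() -> FakeScenario:
    session = make_mock_session()
    repo = TinyRepository(FakeScenario)
    created = await repo.create(session, obj_in={"name": "Test Scenario"})
    # Verify that the repository drove the session as expected.
    session.add.assert_called_once_with(created)
    session.commit.assert_awaited_once()
    session.refresh.assert_awaited_once_with(created)
    return created


scenario = asyncio.run(run_create_check())
print(scenario)
```

Using `AsyncMock` only for the awaited methods keeps the mock close to the real session interface: if the repository forgot to await `commit()`, `assert_awaited_once()` would fail.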