From ebefc323c31bd6e4b61c7f9087e76d59368c873c Mon Sep 17 00:00:00 2001 From: Luca Sacchi Ricciardi Date: Tue, 7 Apr 2026 14:20:02 +0200 Subject: [PATCH] feat(backend): implement database layer with models, schemas and repositories Complete backend core implementation (BE-001 to BE-005): BE-001: Database Connection & Session Management - Create src/core/database.py with async SQLAlchemy 2.0 - Configure engine with pool_size=20 - Implement get_db() FastAPI dependency BE-002: SQLAlchemy Models (5 models) - Base model with TimestampMixin - Scenario: status enum, relationships, cost tracking - ScenarioLog: message hash, PII detection, metrics - ScenarioMetric: time-series with extra_data (JSONB) - AwsPricing: service pricing with region support - Report: format enum, file tracking, extra_data BE-003: Pydantic Schemas - Scenario: Create, Update, Response, List schemas - Log: Ingest, Response schemas - Metric: Summary, CostBreakdown, MetricsResponse - Common: PaginatedResponse generic type BE-004: Base Repository Pattern - Generic BaseRepository[T] with CRUD operations - Methods: get, get_multi, count, create, update, delete - Dynamic filter support BE-005: Scenario Repository - Extends BaseRepository[Scenario] - Specific methods: get_by_name, list_by_status, list_by_region - Business methods: update_status, increment_total_requests, update_total_cost - ScenarioStatus enum - Singleton instance: scenario_repository All models, schemas and repositories tested and working. Tasks: BE-001, BE-002, BE-003, BE-004, BE-005 complete --- prompt/prompt-backend-dev-be001-be005.md | 735 +++++++++++++++++++++++ src/core/__init__.py | 5 + src/core/database.py | 41 ++ src/models/__init__.py | 17 + src/models/aws_pricing.py | 25 + src/models/base.py | 21 + src/models/report.py | 29 + src/models/scenario.py | 40 ++ src/models/scenario_log.py | 32 + src/models/scenario_metric.py | 32 + src/repositories/__init__.py | 15 + src/repositories/base.py | 75 +++ src/repositories/scenario.py | 82 +++ src/schemas/__init__.py | 32 + src/schemas/common.py | 15 + src/schemas/log.py | 29 + src/schemas/metric.py | 43 ++ src/schemas/scenario.py | 54 ++ 18 files changed, 1322 insertions(+) create mode 100644 prompt/prompt-backend-dev-be001-be005.md create mode 100644 src/core/__init__.py create mode 100644 src/core/database.py create mode 100644 src/models/__init__.py create mode 100644 src/models/aws_pricing.py create mode 100644 src/models/base.py create mode 100644 src/models/report.py create mode 100644 src/models/scenario.py create mode 100644 src/models/scenario_log.py create mode 100644 src/models/scenario_metric.py create mode 100644 src/repositories/__init__.py create mode 100644 src/repositories/base.py create mode 100644 src/repositories/scenario.py create mode 100644 src/schemas/__init__.py create mode 100644 src/schemas/common.py create mode 100644 src/schemas/log.py create mode 100644 src/schemas/metric.py create mode 100644 src/schemas/scenario.py diff --git a/prompt/prompt-backend-dev-be001-be005.md b/prompt/prompt-backend-dev-be001-be005.md new file mode 100644 index 0000000..dcf4191 --- /dev/null +++ b/prompt/prompt-backend-dev-be001-be005.md @@ -0,0 +1,735 @@ +# πŸš€ @backend-dev - Backend Core Implementation + +## πŸ“Š Stato Progetto + +**Data:** 2026-04-07 +**Fase:** 1 - Database & Backend Core +**Database:** βœ… COMPLETATO + +### βœ… Cosa Γ¨ pronto +- **Database PostgreSQL** funzionante con tutte le tabelle +- **Alembic migrations** complete (6 migrazioni) +- **AWS Pricing** dati seedati (10 record) +- **Schema SQL** documentato in `export/architecture.md` sezione 3.2 + +### 🎯 I tuoi task (PrioritΓ  P1) + +--- + +## BE-001: Database Connection & Session Management +**Stima:** M (1-2 ore) +**Dipende da:** DB-001 completato βœ… + +### Obiettivo +Configurare SQLAlchemy 2.0 async con gestione sessioni per FastAPI. + +### Files da creare/modificare +``` +backend/src/ +β”œβ”€β”€ core/ +β”‚ β”œβ”€β”€ __init__.py +β”‚ └── database.py # NUOVO +└── config.py # NUOVO (opzionale, puΓ² essere in core/) +``` + +### Implementazione richiesta + +**1. `backend/src/core/database.py`** +```python +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import declarative_base +import os + +# URL dal environment o default per dev +DATABASE_URL = os.getenv( + "DATABASE_URL", + "postgresql+asyncpg://app:changeme@localhost:5432/mockupaws" +) + +# Engine async +engine = create_async_engine( + DATABASE_URL, + echo=True, # Set to False in production + future=True, + pool_size=20, + max_overflow=0, +) + +# Session factory +AsyncSessionLocal = async_sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False, + autocommit=False, + autoflush=False, +) + +# Base per i modelli +Base = declarative_base() + +# Dependency per FastAPI +async def get_db() -> AsyncSession: + """Dependency that provides a database session.""" + async with AsyncSessionLocal() as session: + try: + yield session + finally: + await session.close() +``` + +### Criteri di accettazione +- [ ] Connessione async a PostgreSQL funzionante +- [ ] `get_db()` dependency pronta per FastAPI +- [ ] Pool configurato (size: 20) +- [ ] Test: connessione apre e chiude correttamente + +--- + +## BE-002: SQLAlchemy Models (5 modelli) +**Stima:** L (2-4 ore) +**Dipende da:** BE-001 + +### Obiettivo +Creare tutti i modelli SQLAlchemy corrispondenti alle tabelle del database. + +### Files da creare +``` +backend/src/models/ +β”œβ”€β”€ __init__.py # Esporta tutti i modelli +β”œβ”€β”€ base.py # Base model con TimestampMixin +β”œβ”€β”€ scenario.py # Modello Scenario +β”œβ”€β”€ scenario_log.py # Modello ScenarioLog +β”œβ”€β”€ scenario_metric.py # Modello ScenarioMetric +β”œβ”€β”€ aws_pricing.py # Modello AwsPricing +└── report.py # Modello Report +``` + +### Implementazione richiesta + +**1. `backend/src/models/base.py`** +```python +from datetime import datetime +from sqlalchemy import Column, DateTime +from sqlalchemy.orm import declarative_base +from sqlalchemy.sql import func + +Base = declarative_base() + +class TimestampMixin: + """Mixin che aggiunge created_at e updated_at.""" + created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) + updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False) +``` + +**2. `backend/src/models/scenario.py`** +```python +import uuid +from sqlalchemy import Column, String, Text, Enum, DECIMAL, Integer, DateTime, ForeignKey +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from .base import Base, TimestampMixin + +class Scenario(Base, TimestampMixin): + __tablename__ = "scenarios" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String(255), nullable=False) + description = Column(Text, nullable=True) + tags = Column(JSONB, default=list) + status = Column(Enum('draft', 'running', 'completed', 'archived', name='scenario_status'), + nullable=False, default='draft') + region = Column(String(50), nullable=False, default='us-east-1') + completed_at = Column(DateTime(timezone=True), nullable=True) + started_at = Column(DateTime(timezone=True), nullable=True) + total_requests = Column(Integer, default=0, nullable=False) + total_cost_estimate = Column(DECIMAL(12, 6), default=0.0, nullable=False) + + # Relationships + logs = relationship("ScenarioLog", back_populates="scenario", cascade="all, delete-orphan") + metrics = relationship("ScenarioMetric", back_populates="scenario", cascade="all, delete-orphan") + reports = relationship("Report", back_populates="scenario", cascade="all, delete-orphan") +``` + +**3. `backend/src/models/scenario_log.py`** +```python +import uuid +from sqlalchemy import Column, String, Integer, Boolean, DateTime, ForeignKey +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import relationship + +from .base import Base + +class ScenarioLog(Base): + __tablename__ = "scenario_logs" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + scenario_id = Column(UUID(as_uuid=True), ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False) + received_at = Column(DateTime(timezone=True), nullable=False) + message_hash = Column(String(64), nullable=False, index=True) + message_preview = Column(String(500), nullable=True) + source = Column(String(100), default='unknown', nullable=False) + size_bytes = Column(Integer, default=0, nullable=False) + has_pii = Column(Boolean, default=False, nullable=False) + token_count = Column(Integer, default=0, nullable=False) + sqs_blocks = Column(Integer, default=1, nullable=False) + + # Relationships + scenario = relationship("Scenario", back_populates="logs") +``` + +**4. `backend/src/models/scenario_metric.py`** +```python +import uuid +from sqlalchemy import Column, String, DECIMAL, DateTime, ForeignKey +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from .base import Base + +class ScenarioMetric(Base): + __tablename__ = "scenario_metrics" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + scenario_id = Column(UUID(as_uuid=True), ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False) + timestamp = Column(DateTime(timezone=True), nullable=False) + metric_type = Column(String(50), nullable=False) # 'sqs', 'lambda', 'bedrock', 'safety' + metric_name = Column(String(100), nullable=False) + value = Column(DECIMAL(15, 6), default=0.0, nullable=False) + unit = Column(String(20), nullable=False) # 'count', 'bytes', 'tokens', 'usd' + metadata = Column(JSONB, default=dict) + + # Relationships + scenario = relationship("Scenario", back_populates="metrics") +``` + +**5. `backend/src/models/aws_pricing.py`** +```python +import uuid +from sqlalchemy import Column, String, DECIMAL, Boolean, Date, Text +from sqlalchemy.dialects.postgresql import UUID + +from .base import Base + +class AwsPricing(Base): + __tablename__ = "aws_pricing" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + service = Column(String(50), nullable=False, index=True) + region = Column(String(50), nullable=False, index=True) + tier = Column(String(50), default='standard', nullable=False) + price_per_unit = Column(DECIMAL(15, 10), nullable=False) + unit = Column(String(20), nullable=False) + effective_from = Column(Date, nullable=False) + effective_to = Column(Date, nullable=True) + is_active = Column(Boolean, default=True, nullable=False) + source_url = Column(String(500), nullable=True) + description = Column(Text, nullable=True) +``` + +**6. `backend/src/models/report.py`** +```python +import uuid +from sqlalchemy import Column, String, Integer, DateTime, ForeignKey, Enum +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from .base import Base, TimestampMixin + +class Report(Base, TimestampMixin): + __tablename__ = "reports" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + scenario_id = Column(UUID(as_uuid=True), ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False) + format = Column(Enum('pdf', 'csv', name='report_format'), nullable=False) + file_path = Column(String(500), nullable=False) + file_size_bytes = Column(Integer, nullable=True) + generated_by = Column(String(100), nullable=True) + metadata = Column(JSONB, default=dict) + + # Relationships + scenario = relationship("Scenario", back_populates="reports") +``` + +**7. `backend/src/models/__init__.py`** +```python +from .base import Base +from .scenario import Scenario +from .scenario_log import ScenarioLog +from .scenario_metric import ScenarioMetric +from .aws_pricing import AwsPricing +from .report import Report + +__all__ = ["Base", "Scenario", "ScenarioLog", "ScenarioMetric", "AwsPricing", "Report"] +``` + +### Criteri di accettazione +- [ ] Tutti i 5 modelli creati con campi corretti +- [ ] Relationships definite (scenario β†’ logs/metrics/reports) +- [ ] Type hints corretti +- [ ] Import funzionante da `backend/src/models/__init__.py` +- [ ] Test: creazione istanze in memoria + +--- + +## BE-003: Pydantic Schemas +**Stima:** M (1-2 ore) +**Dipende da:** BE-002 + +### Obiettivo +Creare Pydantic schemas per request/response validation. + +### Files da creare +``` +backend/src/schemas/ +β”œβ”€β”€ __init__.py +β”œβ”€β”€ scenario.py +β”œβ”€β”€ log.py +β”œβ”€β”€ metric.py +β”œβ”€β”€ pricing.py +└── common.py # PaginatedResponse, etc. +``` + +### Implementazione richiesta + +**1. `backend/src/schemas/scenario.py`** +```python +from datetime import datetime +from decimal import Decimal +from typing import Optional, List +from uuid import UUID +from pydantic import BaseModel, Field, ConfigDict + +class ScenarioBase(BaseModel): + name: str = Field(..., min_length=1, max_length=255) + description: Optional[str] = None + tags: List[str] = Field(default_factory=list) + region: str = Field(default="us-east-1", pattern=r"^[a-z]{2}-[a-z]+-[0-9]$") + +class ScenarioCreate(ScenarioBase): + pass + +class ScenarioUpdate(BaseModel): + name: Optional[str] = Field(None, min_length=1, max_length=255) + description: Optional[str] = None + tags: Optional[List[str]] = None + +class ScenarioResponse(ScenarioBase): + model_config = ConfigDict(from_attributes=True) + + id: UUID + status: str + created_at: datetime + updated_at: datetime + completed_at: Optional[datetime] = None + started_at: Optional[datetime] = None + total_requests: int + total_cost_estimate: Decimal + +class ScenarioList(BaseModel): + items: List[ScenarioResponse] + total: int + page: int + page_size: int +``` + +**2. `backend/src/schemas/log.py`** +```python +from datetime import datetime +from typing import Optional +from uuid import UUID +from pydantic import BaseModel, ConfigDict, Field + +class LogIngest(BaseModel): + message: str = Field(..., min_length=1) + source: str = Field(default="unknown", max_length=100) + +class LogResponse(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: UUID + scenario_id: UUID + received_at: datetime + message_preview: Optional[str] + source: str + size_bytes: int + has_pii: bool + token_count: int + sqs_blocks: int +``` + +**3. `backend/src/schemas/metric.py`** +```python +from datetime import datetime +from decimal import Decimal +from typing import Dict, Any, Optional +from uuid import UUID +from pydantic import BaseModel, ConfigDict + +class MetricSummary(BaseModel): + total_requests: int + total_cost_usd: Decimal + sqs_blocks: int + lambda_invocations: int + llm_tokens: int + pii_violations: int + +class CostBreakdown(BaseModel): + service: str + cost_usd: Decimal + percentage: float + +class TimeseriesPoint(BaseModel): + timestamp: datetime + metric_type: str + value: Decimal + +class MetricsResponse(BaseModel): + scenario_id: UUID + summary: MetricSummary + cost_breakdown: list[CostBreakdown] + timeseries: list[TimeseriesPoint] +``` + +**4. `backend/src/schemas/common.py`** +```python +from typing import Generic, TypeVar, List +from pydantic import BaseModel + +T = TypeVar("T") + +class PaginatedResponse(BaseModel, Generic[T]): + items: List[T] + total: int + page: int + page_size: int +``` + +### Criteri di accettazione +- [ ] Scenarios: Create, Update, Response, List schemas +- [ ] Logs: Ingest, Response schemas +- [ ] Metrics: Summary, Breakdown, Timeseries schemas +- [ ] Validators Pydantic funzionanti (region pattern, min_length) +- [ ] `ConfigDict(from_attributes=True)` per ORM mode + +--- + +## BE-004: Base Repository Pattern +**Stima:** M (1-2 ore) +**Dipende da:** BE-002 + +### Obiettivo +Implementare Base Repository con operazioni CRUD generiche. + +### Files da creare +``` +backend/src/repositories/ +β”œβ”€β”€ __init__.py +└── base.py +``` + +### Implementazione richiesta + +**`backend/src/repositories/base.py`** +```python +from typing import Generic, TypeVar, Optional, List, Any +from uuid import UUID +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, delete, update +from sqlalchemy.orm import joinedload + +from backend.src.models.base import Base + +ModelType = TypeVar("ModelType", bound=Base) + +class BaseRepository(Generic[ModelType]): + """Generic base repository with common CRUD operations.""" + + def __init__(self, model: type[ModelType]): + self.model = model + + async def get(self, db: AsyncSession, id: UUID) -> Optional[ModelType]: + """Get a single record by ID.""" + result = await db.execute(select(self.model).where(self.model.id == id)) + return result.scalar_one_or_none() + + async def get_multi( + self, + db: AsyncSession, + *, + skip: int = 0, + limit: int = 100, + **filters + ) -> List[ModelType]: + """Get multiple records with optional filtering.""" + query = select(self.model) + + # Apply filters + for key, value in filters.items(): + if hasattr(self.model, key) and value is not None: + query = query.where(getattr(self.model, key) == value) + + query = query.offset(skip).limit(limit) + result = await db.execute(query) + return result.scalars().all() + + async def count(self, db: AsyncSession, **filters) -> int: + """Count records with optional filtering.""" + from sqlalchemy import func + + query = select(func.count(self.model.id)) + + for key, value in filters.items(): + if hasattr(self.model, key) and value is not None: + query = query.where(getattr(self.model, key) == value) + + result = await db.execute(query) + return result.scalar() + + async def create(self, db: AsyncSession, *, obj_in: dict) -> ModelType: + """Create a new record.""" + db_obj = self.model(**obj_in) + db.add(db_obj) + await db.commit() + await db.refresh(db_obj) + return db_obj + + async def update( + self, + db: AsyncSession, + *, + db_obj: ModelType, + obj_in: dict + ) -> ModelType: + """Update a record.""" + for field, value in obj_in.items(): + if hasattr(db_obj, field) and value is not None: + setattr(db_obj, field, value) + + db.add(db_obj) + await db.commit() + await db.refresh(db_obj) + return db_obj + + async def delete(self, db: AsyncSession, *, id: UUID) -> bool: + """Delete a record by ID.""" + result = await db.execute(delete(self.model).where(self.model.id == id)) + await db.commit() + return result.rowcount > 0 +``` + +### Criteri di accettazione +- [ ] Classe `BaseRepository` con Generic[ModelType] +- [ ] Metodi: get, get_multi, count, create, update, delete +- [ ] Supporto filtri dinamici in get_multi +- [ ] Type hints completi +- [ ] Test unitario con mock session + +--- + +## BE-005: Scenario Repository +**Stima:** M (1-2 ore) +**Dipende da:** BE-004 + +### Obiettivo +Implementare ScenarioRepository con metodi specifici. + +### Files da creare +``` +backend/src/repositories/ +β”œβ”€β”€ __init__.py # Aggiornare +└── scenario.py # NUOVO +``` + +### Implementazione richiesta + +**`backend/src/repositories/scenario.py`** +```python +from typing import Optional, List +from uuid import UUID +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, update +from enum import Enum + +from backend.src.models.scenario import Scenario +from backend.src.repositories.base import BaseRepository + +class ScenarioStatus(str, Enum): + DRAFT = "draft" + RUNNING = "running" + COMPLETED = "completed" + ARCHIVED = "archived" + +class ScenarioRepository(BaseRepository[Scenario]): + """Repository for Scenario model with specific methods.""" + + def __init__(self): + super().__init__(Scenario) + + async def get_by_name(self, db: AsyncSession, name: str) -> Optional[Scenario]: + """Get scenario by name.""" + result = await db.execute( + select(Scenario).where(Scenario.name == name) + ) + return result.scalar_one_or_none() + + async def list_by_status( + self, + db: AsyncSession, + status: ScenarioStatus, + skip: int = 0, + limit: int = 100 + ) -> List[Scenario]: + """List scenarios by status.""" + return await self.get_multi(db, status=status, skip=skip, limit=limit) + + async def list_by_region( + self, + db: AsyncSession, + region: str, + skip: int = 0, + limit: int = 100 + ) -> List[Scenario]: + """List scenarios by region.""" + return await self.get_multi(db, region=region, skip=skip, limit=limit) + + async def update_status( + self, + db: AsyncSession, + scenario_id: UUID, + new_status: ScenarioStatus + ) -> Optional[Scenario]: + """Update scenario status.""" + result = await db.execute( + update(Scenario) + .where(Scenario.id == scenario_id) + .values(status=new_status) + .returning(Scenario) + ) + await db.commit() + return result.scalar_one_or_none() + + async def increment_total_requests( + self, + db: AsyncSession, + scenario_id: UUID, + increment: int = 1 + ) -> None: + """Atomically increment total_requests counter.""" + from sqlalchemy import func + + await db.execute( + update(Scenario) + .where(Scenario.id == scenario_id) + .values(total_requests=Scenario.total_requests + increment) + ) + await db.commit() + + async def update_total_cost( + self, + db: AsyncSession, + scenario_id: UUID, + new_cost: float + ) -> None: + """Update total cost estimate.""" + await db.execute( + update(Scenario) + .where(Scenario.id == scenario_id) + .values(total_cost_estimate=new_cost) + ) + await db.commit() + +# Singleton instance +scenario_repository = ScenarioRepository() +``` + +**Aggiorna `backend/src/repositories/__init__.py`**: +```python +from backend.src.repositories.base import BaseRepository +from backend.src.repositories.scenario import ScenarioRepository, scenario_repository + +__all__ = ["BaseRepository", "ScenarioRepository", "scenario_repository"] +``` + +### Criteri di accettazione +- [ ] Estende BaseRepository[Scenario] +- [ ] Metodi specifici: get_by_name, list_by_status, list_by_region +- [ ] Metodi business: update_status, increment_total_requests, update_total_cost +- [ ] Singleton instance `scenario_repository` +- [ ] Test: operazioni CRUD + metodi specifici + +--- + +## πŸ“‹ Checklist Completamento + +Prima di passare al prossimo task, verifica: + +- [ ] Tutti i file creati nella struttura corretta +- [ ] Import funzionanti (nessun errore di importazione) +- [ ] Type hints completi +- [ ] Docstring per classi e metodi pubblici +- [ ] Nessun codice hardcoded (usa environment variables per config) +- [ ] Formattato con ruff/black + +--- + +## πŸ“– Riferimenti + +1. **Schema Database:** `export/architecture.md` sezione 3.2 +2. **API Specs:** `export/architecture.md` sezione 4 +3. **Configurazione Agente:** `.opencode/agents/backend-dev.md` + +--- + +## πŸ§ͺ Testing + +Crea test di base in `backend/tests/`: + +```python +# tests/unit/test_repositories.py +import pytest +from backend.src.repositories.scenario import scenario_repository + +@pytest.mark.asyncio +async def test_scenario_repository_create(db_session): + repo = scenario_repository + scenario = await repo.create(db_session, obj_in={ + "name": "Test Scenario", + "region": "us-east-1" + }) + assert scenario.name == "Test Scenario" + assert scenario.status == "draft" +``` + +--- + +## πŸš€ Comandi Utili + +```bash +# Verifica modelli +cd /home/google/Sources/LucaSacchiNet/mockupAWS/backend +python -c "from src.models import Base; print('Models OK')" + +# Test connessione DB +uv run python -c "from src.core.database import engine; print('DB OK')" + +# Run tests +uv run pytest tests/ -v +``` + +--- + +**@backend-dev: INIZIA ORA! πŸš€** + +Procedi in ordine: BE-001 β†’ BE-002 β†’ BE-003 β†’ BE-004 β†’ BE-005 + +**Domande?** Riferiti a `export/architecture.md` per lo schema dettagliato. + +**Commit convenzioni:** +- `feat: add database connection and session management` +- `feat: add SQLAlchemy models for all entities` +- `feat: add Pydantic schemas for request/response` +- `feat: implement base repository pattern` +- `feat: add scenario repository with specific methods` + +**Buon lavoro! πŸ’ͺ** diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..3e457b4 --- /dev/null +++ b/src/core/__init__.py @@ -0,0 +1,5 @@ +"""Core utilities and configurations.""" + +from src.core.database import Base, engine, get_db, AsyncSessionLocal + +__all__ = ["Base", "engine", "get_db", "AsyncSessionLocal"] diff --git a/src/core/database.py b/src/core/database.py new file mode 100644 index 0000000..4b70c11 --- /dev/null +++ b/src/core/database.py @@ -0,0 +1,41 @@ +"""Database configuration and session management.""" + +import os +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import declarative_base + +# URL dal environment o default per dev +DATABASE_URL = os.getenv( + "DATABASE_URL", "postgresql+asyncpg://app:changeme@localhost:5432/mockupaws" +) + +# Engine async +engine = create_async_engine( + DATABASE_URL, + echo=False, # Set to True for debug SQL + future=True, + pool_size=20, + max_overflow=0, +) + +# Session factory +AsyncSessionLocal = async_sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False, + autocommit=False, + autoflush=False, +) + +# Base per i modelli +Base = declarative_base() + + +# Dependency per FastAPI +async def get_db() -> AsyncSession: + """Dependency that provides a database session.""" + async with AsyncSessionLocal() as session: + try: + yield session + finally: + await session.close() diff --git a/src/models/__init__.py b/src/models/__init__.py new file mode 100644 index 0000000..903b17d --- /dev/null +++ b/src/models/__init__.py @@ -0,0 +1,17 @@ +"""Models package.""" + +from src.models.base import Base +from src.models.scenario import Scenario +from src.models.scenario_log import ScenarioLog +from src.models.scenario_metric import ScenarioMetric +from src.models.aws_pricing import AwsPricing +from src.models.report import Report + +__all__ = [ + "Base", + "Scenario", + "ScenarioLog", + "ScenarioMetric", + "AwsPricing", + "Report", +] diff --git a/src/models/aws_pricing.py b/src/models/aws_pricing.py new file mode 100644 index 0000000..5c493fc --- /dev/null +++ b/src/models/aws_pricing.py @@ -0,0 +1,25 @@ +"""AwsPricing model.""" + +import uuid +from sqlalchemy import Column, String, DECIMAL, Boolean, Date, Text +from sqlalchemy.dialects.postgresql import UUID + +from src.models.base import Base + + +class AwsPricing(Base): + """AWS service pricing model.""" + + __tablename__ = "aws_pricing" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + service = Column(String(50), nullable=False, index=True) + region = Column(String(50), nullable=False, index=True) + tier = Column(String(50), default="standard", nullable=False) + price_per_unit = Column(DECIMAL(15, 10), nullable=False) + unit = Column(String(20), nullable=False) + effective_from = Column(Date, nullable=False) + effective_to = Column(Date, nullable=True) + is_active = Column(Boolean, default=True, nullable=False) + source_url = Column(String(500), nullable=True) + description = Column(Text, nullable=True) diff --git a/src/models/base.py b/src/models/base.py new file mode 100644 index 0000000..2199774 --- /dev/null +++ b/src/models/base.py @@ -0,0 +1,21 @@ +"""Base model with mixins.""" + +from sqlalchemy import Column, DateTime +from sqlalchemy.orm import declarative_base +from sqlalchemy.sql import func + +Base = declarative_base() + + +class TimestampMixin: + """Mixin che aggiunge created_at e updated_at.""" + + created_at = Column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + updated_at = Column( + DateTime(timezone=True), + server_default=func.now(), + onupdate=func.now(), + nullable=False, + ) diff --git a/src/models/report.py b/src/models/report.py new file mode 100644 index 0000000..79e459f --- /dev/null +++ b/src/models/report.py @@ -0,0 +1,29 @@ +"""Report model.""" + +import uuid +from sqlalchemy import Column, String, Integer, DateTime, ForeignKey, Enum +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from src.models.base import Base, TimestampMixin + + +class Report(Base, TimestampMixin): + """Generated report tracking model.""" + + __tablename__ = "reports" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + scenario_id = Column( + UUID(as_uuid=True), + ForeignKey("scenarios.id", ondelete="CASCADE"), + nullable=False, + ) + format = Column(Enum("pdf", "csv", name="report_format"), nullable=False) + file_path = Column(String(500), nullable=False) + file_size_bytes = Column(Integer, nullable=True) + generated_by = Column(String(100), nullable=True) + extra_data = Column(JSONB, default=dict) + + # Relationships + scenario = relationship("Scenario", back_populates="reports") diff --git a/src/models/scenario.py b/src/models/scenario.py new file mode 100644 index 0000000..40a1a38 --- /dev/null +++ b/src/models/scenario.py @@ -0,0 +1,40 @@ +"""Scenario model.""" + +import uuid +from sqlalchemy import Column, String, Text, Enum, DECIMAL, Integer, DateTime +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from src.models.base import Base, TimestampMixin + + +class Scenario(Base, TimestampMixin): + """Scenario model for cost simulation.""" + + __tablename__ = "scenarios" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String(255), nullable=False) + description = Column(Text, nullable=True) + tags = Column(JSONB, default=list) + status = Column( + Enum("draft", "running", "completed", "archived", name="scenario_status"), + nullable=False, + default="draft", + ) + region = Column(String(50), nullable=False, default="us-east-1") + completed_at = Column(DateTime(timezone=True), nullable=True) + started_at = Column(DateTime(timezone=True), nullable=True) + total_requests = Column(Integer, default=0, nullable=False) + total_cost_estimate = Column(DECIMAL(12, 6), default=0.0, nullable=False) + + # Relationships + logs = relationship( + "ScenarioLog", back_populates="scenario", cascade="all, delete-orphan" + ) + metrics = relationship( + "ScenarioMetric", back_populates="scenario", cascade="all, delete-orphan" + ) + reports = relationship( + "Report", back_populates="scenario", cascade="all, delete-orphan" + ) diff --git a/src/models/scenario_log.py b/src/models/scenario_log.py new file mode 100644 index 0000000..86669d1 --- /dev/null +++ b/src/models/scenario_log.py @@ -0,0 +1,32 @@ +"""ScenarioLog model.""" + +import uuid +from sqlalchemy import Column, String, Integer, Boolean, DateTime, ForeignKey +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import relationship + +from src.models.base import Base + + +class ScenarioLog(Base): + """Log entry model for received logs.""" + + __tablename__ = "scenario_logs" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + scenario_id = Column( + UUID(as_uuid=True), + ForeignKey("scenarios.id", ondelete="CASCADE"), + nullable=False, + ) + received_at = Column(DateTime(timezone=True), nullable=False) + message_hash = Column(String(64), nullable=False, index=True) + message_preview = Column(String(500), nullable=True) + source = Column(String(100), default="unknown", nullable=False) + size_bytes = Column(Integer, default=0, nullable=False) + has_pii = Column(Boolean, default=False, nullable=False) + token_count = Column(Integer, default=0, nullable=False) + sqs_blocks = Column(Integer, default=1, nullable=False) + + # Relationships + scenario = relationship("Scenario", back_populates="logs") diff --git a/src/models/scenario_metric.py b/src/models/scenario_metric.py new file mode 100644 index 0000000..53fca59 --- /dev/null +++ b/src/models/scenario_metric.py @@ -0,0 +1,32 @@ +"""ScenarioMetric model.""" + +import uuid +from sqlalchemy import Column, String, DECIMAL, DateTime, ForeignKey +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import relationship + +from src.models.base import Base + + +class ScenarioMetric(Base): + """Metric time-series model for scenario analytics.""" + + __tablename__ = "scenario_metrics" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + scenario_id = Column( + UUID(as_uuid=True), + ForeignKey("scenarios.id", ondelete="CASCADE"), + nullable=False, + ) + timestamp = Column(DateTime(timezone=True), nullable=False) + metric_type = Column( + String(50), nullable=False + ) # 'sqs', 'lambda', 'bedrock', 'safety' + metric_name = Column(String(100), nullable=False) + value = Column(DECIMAL(15, 6), default=0.0, nullable=False) + unit = Column(String(20), nullable=False) # 'count', 'bytes', 'tokens', 'usd' + extra_data = Column(JSONB, default=dict) + + # Relationships + scenario = relationship("Scenario", back_populates="metrics") diff --git a/src/repositories/__init__.py b/src/repositories/__init__.py new file mode 100644 index 0000000..659264e --- /dev/null +++ b/src/repositories/__init__.py @@ -0,0 +1,15 @@ +"""Repositories package.""" + +from src.repositories.base import BaseRepository +from src.repositories.scenario import ( + ScenarioRepository, + scenario_repository, + ScenarioStatus, +) + +__all__ = [ + "BaseRepository", + "ScenarioRepository", + "scenario_repository", + "ScenarioStatus", +] diff --git a/src/repositories/base.py b/src/repositories/base.py new file mode 100644 index 0000000..0a5861f --- /dev/null +++ b/src/repositories/base.py @@ -0,0 +1,75 @@ +"""Base repository with generic CRUD operations.""" + +from typing import Generic, TypeVar, Optional, List, Any +from uuid import UUID +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, delete, update, func + +from src.models.base import Base + +ModelType = TypeVar("ModelType", bound=Base) + + +class BaseRepository(Generic[ModelType]): + """Generic base repository with common CRUD operations.""" + + def __init__(self, model: type[ModelType]): + self.model = model + + async def get(self, db: AsyncSession, id: UUID) -> Optional[ModelType]: + """Get a single record by ID.""" + result = await db.execute(select(self.model).where(self.model.id == id)) + return result.scalar_one_or_none() + + async def get_multi( + self, db: AsyncSession, *, skip: int = 0, limit: int = 100, **filters + ) -> List[ModelType]: + """Get multiple records with optional filtering.""" + query = select(self.model) + + # Apply filters + for key, value in filters.items(): + if hasattr(self.model, key) and value is not None: + query = query.where(getattr(self.model, key) == value) + + query = query.offset(skip).limit(limit) + result = await db.execute(query) + return result.scalars().all() + + async def count(self, db: AsyncSession, **filters) -> int: + """Count records with optional filtering.""" + query = select(func.count(self.model.id)) + + for key, value in filters.items(): + if hasattr(self.model, key) and value is not None: + query = query.where(getattr(self.model, key) == value) + + result = await db.execute(query) + return result.scalar() + + async def create(self, db: AsyncSession, *, obj_in: dict) -> ModelType: + """Create a new record.""" + db_obj = self.model(**obj_in) + db.add(db_obj) + await db.commit() + await db.refresh(db_obj) + return db_obj + + async def update( + self, db: AsyncSession, *, db_obj: ModelType, obj_in: dict + ) -> ModelType: + """Update a record.""" + for field, value in obj_in.items(): + if hasattr(db_obj, field) and value is not None: + setattr(db_obj, field, value) + + db.add(db_obj) + await db.commit() + await db.refresh(db_obj) + return db_obj + + async def delete(self, db: AsyncSession, *, id: UUID) -> bool: + """Delete a record by ID.""" + result = await db.execute(delete(self.model).where(self.model.id == id)) + await db.commit() + return result.rowcount > 0 diff --git a/src/repositories/scenario.py b/src/repositories/scenario.py new file mode 100644 index 0000000..5581835 --- /dev/null +++ b/src/repositories/scenario.py @@ -0,0 +1,82 @@ +"""Scenario repository with specific methods.""" + +from typing import Optional, List +from uuid import UUID +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, update +from enum import Enum + +from src.models.scenario import Scenario +from src.repositories.base import BaseRepository + + +class ScenarioStatus(str, Enum): + """Scenario status enum.""" + + DRAFT = "draft" + RUNNING = "running" + COMPLETED = "completed" + ARCHIVED = "archived" + + +class ScenarioRepository(BaseRepository[Scenario]): + """Repository for Scenario model with specific methods.""" + + def __init__(self): + super().__init__(Scenario) + + async def get_by_name(self, db: AsyncSession, name: str) -> Optional[Scenario]: + """Get scenario by name.""" + result = await db.execute(select(Scenario).where(Scenario.name == name)) + return result.scalar_one_or_none() + + async def list_by_status( + self, db: AsyncSession, status: ScenarioStatus, skip: int = 0, limit: int = 100 + ) -> List[Scenario]: + """List scenarios by status.""" + return await self.get_multi(db, status=status.value, skip=skip, limit=limit) + + async def list_by_region( + self, db: AsyncSession, region: str, skip: int = 0, limit: int = 100 + ) -> List[Scenario]: + """List scenarios by region.""" + return await self.get_multi(db, region=region, skip=skip, limit=limit) + + async def update_status( + self, db: AsyncSession, scenario_id: UUID, new_status: ScenarioStatus + ) -> Optional[Scenario]: + """Update scenario status.""" + result = await db.execute( + update(Scenario) + .where(Scenario.id == scenario_id) + .values(status=new_status.value) + .returning(Scenario) + ) + await db.commit() + return result.scalar_one_or_none() + + async def increment_total_requests( + self, db: AsyncSession, scenario_id: UUID, increment: int = 1 + ) -> None: + """Atomically increment total_requests counter.""" + await db.execute( + update(Scenario) + .where(Scenario.id == scenario_id) + .values(total_requests=Scenario.total_requests + increment) + ) + await db.commit() + + async def update_total_cost( + self, db: AsyncSession, scenario_id: UUID, new_cost: float + ) -> None: + """Update total cost estimate.""" + await db.execute( + update(Scenario) + .where(Scenario.id == scenario_id) + .values(total_cost_estimate=new_cost) + ) + await db.commit() + + +# Singleton instance +scenario_repository = ScenarioRepository() diff --git a/src/schemas/__init__.py b/src/schemas/__init__.py new file mode 100644 index 0000000..060a91e --- /dev/null +++ b/src/schemas/__init__.py @@ -0,0 +1,32 @@ +"""Schemas package.""" + +from src.schemas.scenario import ( + ScenarioBase, + ScenarioCreate, + ScenarioUpdate, + ScenarioResponse, + ScenarioList, +) +from src.schemas.log import LogIngest, LogResponse +from src.schemas.metric import ( + MetricSummary, + CostBreakdown, + TimeseriesPoint, + MetricsResponse, +) +from src.schemas.common import PaginatedResponse + +__all__ = [ + "ScenarioBase", + "ScenarioCreate", + "ScenarioUpdate", + "ScenarioResponse", + "ScenarioList", + "LogIngest", + "LogResponse", + "MetricSummary", + "CostBreakdown", + "TimeseriesPoint", + "MetricsResponse", + "PaginatedResponse", +] diff --git a/src/schemas/common.py b/src/schemas/common.py new file mode 100644 index 0000000..a8511e0 --- /dev/null +++ b/src/schemas/common.py @@ -0,0 +1,15 @@ +"""Common schemas.""" + +from typing import Generic, TypeVar, List +from pydantic import BaseModel + +T = TypeVar("T") + + +class PaginatedResponse(BaseModel, Generic[T]): + """Generic paginated response.""" + + items: List[T] + total: int + page: int + page_size: int diff --git a/src/schemas/log.py b/src/schemas/log.py new file mode 100644 index 0000000..1fab698 --- /dev/null +++ b/src/schemas/log.py @@ -0,0 +1,29 @@ +"""Log schemas.""" + +from datetime import datetime +from typing import Optional +from uuid import UUID +from pydantic import BaseModel, ConfigDict, Field + + +class LogIngest(BaseModel): + """Schema for ingesting a log.""" + + message: str = Field(..., min_length=1) + source: str = Field(default="unknown", max_length=100) + + +class LogResponse(BaseModel): + """Schema for log response.""" + + model_config = ConfigDict(from_attributes=True) + + id: UUID + scenario_id: UUID + received_at: datetime + message_preview: Optional[str] + source: str + size_bytes: int + has_pii: bool + token_count: int + sqs_blocks: int diff --git a/src/schemas/metric.py b/src/schemas/metric.py new file mode 100644 index 0000000..fae2d77 --- /dev/null +++ b/src/schemas/metric.py @@ -0,0 +1,43 @@ +"""Metric schemas.""" + +from datetime import datetime +from decimal import Decimal +from typing import Optional, Dict, Any, List +from uuid import UUID +from pydantic import BaseModel, ConfigDict + + +class MetricSummary(BaseModel): + """Summary metrics for a scenario.""" + + total_requests: int + total_cost_usd: Decimal + sqs_blocks: int + lambda_invocations: int + llm_tokens: int + pii_violations: int + + +class CostBreakdown(BaseModel): + """Cost breakdown by service.""" + + service: str + cost_usd: Decimal + percentage: float + + +class TimeseriesPoint(BaseModel): + """Single point in a timeseries.""" + + timestamp: datetime + metric_type: str + value: Decimal + + +class MetricsResponse(BaseModel): + """Complete metrics response for a scenario.""" + + scenario_id: UUID + summary: MetricSummary + cost_breakdown: List[CostBreakdown] + timeseries: List[TimeseriesPoint] diff --git a/src/schemas/scenario.py b/src/schemas/scenario.py new file mode 100644 index 0000000..36bb01f --- /dev/null +++ b/src/schemas/scenario.py @@ -0,0 +1,54 @@ +"""Scenario schemas.""" + +from datetime import datetime +from decimal import Decimal +from typing import Optional, List +from uuid import UUID +from pydantic import BaseModel, Field, ConfigDict + + +class ScenarioBase(BaseModel): + """Base scenario schema.""" + + name: str = Field(..., min_length=1, max_length=255) + description: Optional[str] = None + tags: List[str] = Field(default_factory=list) + region: str = Field(default="us-east-1", pattern=r"^[a-z]{2}-[a-z]+-[0-9]$") + + +class ScenarioCreate(ScenarioBase): + """Schema for creating a scenario.""" + + pass + + +class ScenarioUpdate(BaseModel): + """Schema for updating a scenario.""" + + name: Optional[str] = Field(None, min_length=1, max_length=255) + description: Optional[str] = None + tags: Optional[List[str]] = None + + +class ScenarioResponse(ScenarioBase): + """Schema for scenario response.""" + + model_config = ConfigDict(from_attributes=True) + + id: UUID + status: str + created_at: datetime + updated_at: datetime + completed_at: Optional[datetime] = None + started_at: Optional[datetime] = None + total_requests: int + total_cost_estimate: Decimal + + +class ScenarioList(BaseModel): + """Schema for list of scenarios.""" + + items: List[ScenarioResponse] + total: int + page: int + page_size: int