Complete API implementation (BE-006 to BE-010):
BE-006: API Dependencies & Configuration
- Add core/config.py with Settings and environment variables
- Add core/exceptions.py with AppException hierarchy
- Add api/deps.py with get_db() and get_running_scenario() dependencies
- Add pydantic-settings dependency
BE-007: Services Layer
- Add services/pii_detector.py: PIIDetector with email/SSN/credit card patterns
- Add services/cost_calculator.py: AWS cost calculation (SQS, Lambda, Bedrock)
- Add services/ingest_service.py: Log processing with hash, PII detection, metrics
BE-008: Scenarios API Endpoints
- POST /api/v1/scenarios - Create scenario
- GET /api/v1/scenarios - List with filters and pagination
- GET /api/v1/scenarios/{id} - Get single scenario
- PUT /api/v1/scenarios/{id} - Update scenario
- DELETE /api/v1/scenarios/{id} - Delete scenario
- POST /api/v1/scenarios/{id}/start - Start (draft->running)
- POST /api/v1/scenarios/{id}/stop - Stop (running->completed)
- POST /api/v1/scenarios/{id}/archive - Archive (completed->archived)
BE-009: Ingest API
- POST /ingest with X-Scenario-ID header validation
- Depends on get_running_scenario() for status check
- Returns LogResponse with processed metrics
- POST /flush for backward compatibility
BE-010: Metrics API
- GET /api/v1/scenarios/{id}/metrics - Full metrics endpoint
- Aggregates data from scenario_logs
- Calculates costs using CostCalculator
- Returns cost breakdown (SQS/Lambda/Bedrock)
- Returns timeseries data grouped by hour
Refactored main.py:
- Simplified to use api_router
- Added exception handlers
- Added health check endpoint
All endpoints tested and working.
Tasks: BE-006, BE-007, BE-008, BE-009, BE-010 complete
1015 lines
28 KiB
Markdown
# 🚀 @backend-dev - API Implementation

## 📊 Project Status

**Date:** 2026-04-07
**Phase:** 1 - Database & Backend Core
**Database:** ✅ COMPLETE
**Models/Schemas:** ✅ COMPLETE
**Repositories:** ✅ COMPLETE

### ✅ What is ready

- PostgreSQL database with all tables
- SQLAlchemy models (5 models)
- Pydantic schemas
- Repository pattern (Base + Scenario)
- Alembic migrations

### 🎯 Your tasks (Priority P1)

---

## BE-006: API Dependencies & Configuration

**Estimate:** S (30-60 min)
**Depends on:** BE-005 completed ✅

### Objective

Configure the FastAPI dependencies for database access and set up the API structure.

### Files to create/modify
```
src/
├── api/
│   ├── __init__.py
│   └── deps.py          # NEW
└── core/
    ├── config.py        # NEW
    └── exceptions.py    # NEW
```

### Required implementation

**1. `src/core/config.py`**
```python
"""Application configuration."""

from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings loaded from environment variables."""

    # Read overrides from .env; variable names are matched case-insensitively
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    # Database
    database_url: str = "postgresql+asyncpg://app:changeme@localhost:5432/mockupaws"

    # Application
    app_name: str = "mockupAWS"
    debug: bool = False

    # Pagination
    default_page_size: int = 20
    max_page_size: int = 100


@lru_cache()
def get_settings() -> Settings:
    """Get cached settings instance."""
    return Settings()


settings = get_settings()
```

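Because `get_settings()` is wrapped in `lru_cache`, the environment is read once per process. The sketch below imitates that pattern with the standard library only (it is not pydantic-settings; `DemoSettings`, `get_demo_settings` and the `DEMO_PAGE_SIZE` variable are hypothetical names) to show why a test that changes an environment variable may need to clear the cache first.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for Settings: one value read from the environment."""
    default_page_size: int


@lru_cache()
def get_demo_settings() -> DemoSettings:
    # The environment is read on the first call only; later calls hit the cache
    return DemoSettings(default_page_size=int(os.environ.get("DEMO_PAGE_SIZE", "20")))


os.environ.pop("DEMO_PAGE_SIZE", None)
first = get_demo_settings()            # built from the default

os.environ["DEMO_PAGE_SIZE"] = "50"
cached = get_demo_settings()           # same cached object, change not seen

get_demo_settings.cache_clear()        # force a re-read of the environment
refreshed = get_demo_settings()

print(first.default_page_size, cached.default_page_size, refreshed.default_page_size)
```

The same applies to the module-level `settings` object above: it is created at import time, so overrides have to be in place before `src.core.config` is first imported.
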
**2. `src/core/exceptions.py`**
```python
"""Custom exceptions for the application."""

from typing import Optional

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse


class AppException(Exception):
    """Base application exception."""

    status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR
    code: str = "internal_error"

    def __init__(self, message: Optional[str] = None):
        self.message = message or "An internal error occurred"
        super().__init__(self.message)


class NotFoundException(AppException):
    """Resource not found exception."""

    status_code = status.HTTP_404_NOT_FOUND
    code = "not_found"

    def __init__(self, resource: str = "Resource"):
        super().__init__(f"{resource} not found")


class ValidationException(AppException):
    """Validation error exception."""

    status_code = status.HTTP_400_BAD_REQUEST
    code = "validation_error"


class ConflictException(AppException):
    """Conflict error exception."""

    status_code = status.HTTP_409_CONFLICT
    code = "conflict"


class ScenarioNotRunningException(AppException):
    """Scenario is not in running state."""

    status_code = status.HTTP_400_BAD_REQUEST
    code = "scenario_not_running"

    def __init__(self):
        super().__init__("Scenario is not in 'running' state. Cannot ingest logs.")


def setup_exception_handlers(app: FastAPI) -> None:
    """Setup exception handlers for FastAPI app."""

    # One handler covers the whole hierarchy: subclasses only override
    # status_code and code
    @app.exception_handler(AppException)
    async def app_exception_handler(request: Request, exc: AppException):
        return JSONResponse(
            status_code=exc.status_code,
            content={
                "error": exc.code,
                "message": exc.message,
                "status_code": exc.status_code
            }
        )
```

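To make the error contract concrete, here is a framework-free sketch of the JSON body a client should expect. It re-declares two of the classes with plain integer status codes so that it runs without FastAPI; `to_payload` is a hypothetical helper that mirrors the `content` dict built in the handler, not part of the task.

```python
class AppException(Exception):
    """Simplified copy of the base exception (plain ints instead of fastapi.status)."""
    status_code = 500
    code = "internal_error"

    def __init__(self, message=None):
        self.message = message or "An internal error occurred"
        super().__init__(self.message)


class NotFoundException(AppException):
    status_code = 404
    code = "not_found"

    def __init__(self, resource="Resource"):
        super().__init__(f"{resource} not found")


def to_payload(exc: AppException) -> dict:
    """Hypothetical helper: the JSON body the handler would send for exc."""
    return {"error": exc.code, "message": exc.message, "status_code": exc.status_code}


payload = to_payload(NotFoundException("Scenario"))
fallback = to_payload(AppException())
print(payload)
print(fallback)
```

Every subclass produces the same three keys, so API clients can branch on `error` without parsing the human-readable `message`.
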
**3. `src/api/deps.py`**
```python
"""API dependencies."""

from typing import AsyncGenerator
from uuid import UUID

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.database import AsyncSessionLocal
from src.core.exceptions import NotFoundException, ScenarioNotRunningException
from src.repositories.scenario import scenario_repository, ScenarioStatus


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that provides a database session."""
    # The context manager closes the session once the request is done
    async with AsyncSessionLocal() as session:
        yield session


async def get_running_scenario(
    scenario_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Dependency that validates scenario exists and is running."""
    try:
        scenario_uuid = UUID(scenario_id)
    except ValueError:
        # A malformed ID cannot match any scenario, so report it as not found
        raise NotFoundException("Scenario")

    scenario = await scenario_repository.get(db, scenario_uuid)
    if not scenario:
        raise NotFoundException("Scenario")

    if scenario.status != ScenarioStatus.RUNNING.value:
        raise ScenarioNotRunningException()

    return scenario
```

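`get_running_scenario()` receives the ID as a plain string (it comes from a header) and turns a malformed value into a 404 rather than a validation error. The parsing step can be tried in isolation; `parse_scenario_id` below is a hypothetical helper written only for illustration.

```python
from typing import Optional
from uuid import UUID


def parse_scenario_id(raw: str) -> Optional[UUID]:
    """Hypothetical helper: return a UUID, or None when raw is malformed."""
    try:
        return UUID(raw)
    except ValueError:
        return None


valid = parse_scenario_id("12345678-1234-5678-1234-567812345678")
invalid = parse_scenario_id("not-a-uuid")
print(valid, invalid)
```
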
### Acceptance criteria

- [ ] Configuration from environment variables
- [ ] Complete exception hierarchy
- [ ] Global exception handlers
- [ ] Working get_db() dependency
- [ ] get_running_scenario() dependency

---

## BE-007: Services Layer

**Estimate:** L (2-4 hours)
**Depends on:** BE-006

### Objective

Implement the services that hold the business logic (Cost Calculator, Ingest Processor, PII Detector).

### Files to create
```
src/services/
├── __init__.py
├── cost_calculator.py   # AWS cost calculation
├── pii_detector.py      # PII detection
└── ingest_service.py    # Log ingestion logic
```

### Required implementation

**1. `src/services/pii_detector.py`**
```python
"""PII detection service."""

import re
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class PIIDetectionResult:
    """Result of PII detection."""

    has_pii: bool
    pii_types: List[str]
    total_matches: int
    details: Dict[str, List[str]]


class PIIDetector:
    """Service for detecting PII in messages."""

    # Regex patterns for common PII
    PATTERNS = {
        # Note: "|" inside a character class is a literal pipe, so the TLD
        # class is [A-Za-z], not [A-Z|a-z]
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
        'credit_card': r'\b(?:\d[ -]*?){13,16}\b',
        'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
    }

    def detect(self, message: str) -> PIIDetectionResult:
        """Detect PII in a message."""
        results = {}

        for pii_type, pattern in self.PATTERNS.items():
            matches = re.findall(pattern, message)
            if matches:
                results[pii_type] = matches

        return PIIDetectionResult(
            has_pii=len(results) > 0,
            pii_types=list(results.keys()),
            total_matches=sum(len(matches) for matches in results.values()),
            details=results
        )

    def detect_simple(self, message: str) -> bool:
        """Simple PII detection - returns True if an email address is found."""
        # Quick check for email only (most common); unlike a plain
        # "'@' and '.com'" test this also catches non-.com addresses
        return re.search(self.PATTERNS['email'], message) is not None


# Singleton instance
pii_detector = PIIDetector()
```

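The `credit_card` pattern matches any run of 13 to 16 digits, so order numbers or numeric IDs in a log line can be flagged as card numbers. One possible refinement, which is not part of this task, is to keep only matches that pass the Luhn checksum. The sketch below reuses the same pattern; `luhn_valid` and `find_card_numbers` are hypothetical helper names.

```python
import re

CREDIT_CARD = re.compile(r'\b(?:\d[ -]*?){13,16}\b')


def luhn_valid(candidate: str) -> bool:
    """Return True if the digits in candidate pass the Luhn checksum."""
    digits = [int(ch) for ch in candidate if ch.isdigit()]
    total = 0
    # Walk from the rightmost digit, doubling every second one
    for position, digit in enumerate(reversed(digits)):
        if position % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return len(digits) >= 13 and total % 10 == 0


def find_card_numbers(message: str) -> list:
    """Keep only the regex matches that also pass the Luhn check."""
    return [match for match in CREDIT_CARD.findall(message) if luhn_valid(match)]


card_hits = find_card_numbers("paid with 4111 1111 1111 1111")
id_hits = find_card_numbers("order id 1234567890123456")
print(card_hits, id_hits)
```

The well-known test number `4111 1111 1111 1111` passes the checksum, while the sequential 16-digit order ID does not and is dropped.
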
**2. `src/services/cost_calculator.py`**
```python
"""Cost calculation service."""

from decimal import Decimal
from typing import Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from src.models.aws_pricing import AwsPricing
from src.repositories.base import BaseRepository


class CostCalculator:
    """Service for calculating AWS costs."""

    def __init__(self):
        self.pricing_repo = BaseRepository(AwsPricing)

    async def get_pricing(
        self,
        db: AsyncSession,
        service: str,
        region: str,
        tier: str
    ) -> Optional[Decimal]:
        """Get active pricing for a service/region/tier."""
        result = await db.execute(
            select(AwsPricing).where(
                AwsPricing.service == service,
                AwsPricing.region == region,
                AwsPricing.tier == tier,
                AwsPricing.is_active.is_(True)
            )
        )
        pricing = result.scalar_one_or_none()
        return pricing.price_per_unit if pricing else None

    async def calculate_sqs_cost(
        self,
        db: AsyncSession,
        blocks: int,
        region: str
    ) -> Decimal:
        """Calculate SQS cost."""
        price = await self.get_pricing(db, 'sqs', region, 'standard')
        if price is None:
            # Default price (USD per million blocks)
            price = Decimal('0.40')

        # Formula: blocks * price_per_million / 1,000,000
        return Decimal(blocks) * price / Decimal('1000000')

    async def calculate_lambda_cost(
        self,
        db: AsyncSession,
        invocations: int,
        gb_seconds: float,
        region: str
    ) -> Decimal:
        """Calculate Lambda cost (requests + compute)."""
        request_price = await self.get_pricing(db, 'lambda', region, 'x86_request')
        compute_price = await self.get_pricing(db, 'lambda', region, 'x86_compute')

        # Defaults: USD per million requests and USD per GB-second
        if request_price is None:
            request_price = Decimal('0.20')
        if compute_price is None:
            compute_price = Decimal('0.0000166667')

        # Formula: (invocations * price_per_million / 1M) + (gb_seconds * price_per_gb_second)
        request_cost = Decimal(invocations) * request_price / Decimal('1000000')
        # Go through str() so the float's binary noise does not leak into the Decimal
        compute_cost = Decimal(str(gb_seconds)) * compute_price
        return request_cost + compute_cost

    async def calculate_bedrock_cost(
        self,
        db: AsyncSession,
        input_tokens: int,
        output_tokens: int,
        region: str,
        model: str = 'claude_3_sonnet'
    ) -> Decimal:
        """Calculate Bedrock LLM cost."""
        input_price = await self.get_pricing(db, 'bedrock', region, f'{model}_input')
        output_price = await self.get_pricing(db, 'bedrock', region, f'{model}_output')

        # Defaults: USD per 1,000 tokens
        if input_price is None:
            input_price = Decimal('0.003')
        if output_price is None:
            output_price = Decimal('0.015')

        # Formula: (tokens * price_per_1k / 1000)
        input_cost = Decimal(input_tokens) * input_price / Decimal('1000')
        output_cost = Decimal(output_tokens) * output_price / Decimal('1000')
        return input_cost + output_cost


# Singleton instance
cost_calculator = CostCalculator()
```

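A worked example helps when writing the unit tests for these formulas. The sketch below applies the three formulas with the fallback prices hard-coded above (they are the defaults from this brief, not necessarily current AWS list prices); the workload figures are invented for illustration.

```python
from decimal import Decimal

MILLION = Decimal("1000000")
THOUSAND = Decimal("1000")

# SQS: 2,500,000 blocks at 0.40 USD per million
sqs_cost = Decimal(2_500_000) * Decimal("0.40") / MILLION

# Lambda: 10,000 requests at 0.20 USD per million, plus 400 GB-seconds
lambda_cost = (
    Decimal(10_000) * Decimal("0.20") / MILLION
    + Decimal("400") * Decimal("0.0000166667")
)

# Bedrock: 120,000 input and 8,000 output tokens, priced per 1,000 tokens
bedrock_cost = (
    Decimal(120_000) * Decimal("0.003") / THOUSAND
    + Decimal(8_000) * Decimal("0.015") / THOUSAND
)

total_cost = sqs_cost + lambda_cost + bedrock_cost
print(sqs_cost, lambda_cost, bedrock_cost, total_cost)
```

Working in `Decimal` from string literals keeps the results exact (1.00 + 0.00866668 + 0.48 = 1.48866668 USD), which makes equality assertions in tests reliable.
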
**3. `src/services/ingest_service.py`**
```python
"""Log ingestion service."""

import hashlib
from datetime import datetime

from sqlalchemy.ext.asyncio import AsyncSession

from src.models.scenario import Scenario
from src.models.scenario_log import ScenarioLog
from src.profiler import count_tokens, calculate_sqs_blocks
from src.repositories.scenario import scenario_repository
from src.services.pii_detector import pii_detector


class IngestService:
    """Service for processing and ingesting logs."""

    async def ingest_log(
        self,
        db: AsyncSession,
        scenario: Scenario,
        message: str,
        source: str = "unknown"
    ) -> ScenarioLog:
        """Process and save a log entry."""
        # Calculate message hash for deduplication
        message_hash = hashlib.sha256(message.encode()).hexdigest()

        # Truncate message for preview (privacy); slicing is a no-op for short messages
        message_preview = message[:500]

        # Detect PII
        has_pii = pii_detector.detect_simple(message)

        # Calculate metrics
        token_count = count_tokens(message)
        payload_size = len(message.encode('utf-8'))
        sqs_blocks = calculate_sqs_blocks(message)

        # Create log entry
        log_entry = ScenarioLog(
            scenario_id=scenario.id,
            received_at=datetime.utcnow(),
            message_hash=message_hash,
            message_preview=message_preview,
            source=source,
            size_bytes=payload_size,
            has_pii=has_pii,
            token_count=token_count,
            sqs_blocks=sqs_blocks
        )

        db.add(log_entry)

        # Update scenario metrics
        await scenario_repository.increment_total_requests(db, scenario.id)

        # Commit the log entry and the counter update together
        await db.commit()
        await db.refresh(log_entry)

        return log_entry


# Singleton instance
ingest_service = IngestService()
```

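The per-message fields can be prototyped without a database. The sketch below is only an approximation: `count_tokens` and `calculate_sqs_blocks` live in `src.profiler` and are not shown in this brief, so the block rule used here (one block per started 64 KiB of payload, minimum one) is an assumption, and `describe_message` is a hypothetical helper.

```python
import hashlib
import math

SQS_BLOCK_BYTES = 64 * 1024  # assumption: one billable block per 64 KiB of payload


def describe_message(message: str) -> dict:
    """Hypothetical helper mirroring some of the fields stored by ingest_log."""
    payload = message.encode("utf-8")
    return {
        "message_hash": hashlib.sha256(payload).hexdigest(),
        "message_preview": message[:500],
        "size_bytes": len(payload),
        # Assumed rule: round up to whole blocks, never fewer than one
        "sqs_blocks": max(1, math.ceil(len(payload) / SQS_BLOCK_BYTES)),
    }


small = describe_message("user login ok")
large = describe_message("x" * 70_000)
print(small["size_bytes"], small["sqs_blocks"], large["size_bytes"], large["sqs_blocks"])
```

Note that `size_bytes` counts encoded bytes, not characters, so messages with non-ASCII text are larger than `len(message)` suggests.
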
**4. `src/services/__init__.py`**
```python
"""Services package."""

from src.services.pii_detector import PIIDetector, pii_detector, PIIDetectionResult
from src.services.cost_calculator import CostCalculator, cost_calculator
from src.services.ingest_service import IngestService, ingest_service

__all__ = [
    "PIIDetector",
    "pii_detector",
    "PIIDetectionResult",
    "CostCalculator",
    "cost_calculator",
    "IngestService",
    "ingest_service",
]
```

### Acceptance criteria

- [ ] PIIDetector with multiple patterns
- [ ] CostCalculator with all the AWS formulas
- [ ] IngestService with hash, metrics, PII detection
- [ ] Integration with existing repositories
- [ ] Unit tests for services

---

## BE-008: Scenarios API Endpoints

**Estimate:** L (2-4 hours)
**Depends on:** BE-007

### Objective

Implement the CRUD API for scenarios.

### Files to create
```
src/api/
├── __init__.py
└── v1/
    ├── __init__.py
    └── scenarios.py     # CRUD endpoints
```

### Required implementation

**1. `src/api/__init__.py`**
```python
"""API package."""
```

**2. `src/api/v1/__init__.py`**
```python
"""API v1 routes."""

from fastapi import APIRouter

from src.api.v1.scenarios import router as scenarios_router

api_router = APIRouter()
api_router.include_router(scenarios_router, prefix="/scenarios", tags=["scenarios"])
```

**3. `src/api/v1/scenarios.py`**
```python
"""Scenario API endpoints."""

from datetime import datetime
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, Query, status
from sqlalchemy.ext.asyncio import AsyncSession

from src.api.deps import get_db
from src.core.config import settings
from src.core.exceptions import NotFoundException, ValidationException
from src.repositories.scenario import scenario_repository, ScenarioStatus
from src.schemas.scenario import (
    ScenarioCreate,
    ScenarioUpdate,
    ScenarioResponse,
    ScenarioList
)

router = APIRouter()


@router.post("", response_model=ScenarioResponse, status_code=status.HTTP_201_CREATED)
async def create_scenario(
    scenario_in: ScenarioCreate,
    db: AsyncSession = Depends(get_db)
):
    """Create a new scenario."""
    # Check for duplicate name
    existing = await scenario_repository.get_by_name(db, scenario_in.name)
    if existing:
        raise ValidationException(f"Scenario with name '{scenario_in.name}' already exists")

    scenario = await scenario_repository.create(db, obj_in=scenario_in.model_dump())
    return scenario


@router.get("", response_model=ScenarioList)
async def list_scenarios(
    # Exposed as ?status=...; the Python name differs so it does not shadow fastapi.status
    status_filter: Optional[str] = Query(None, alias="status", description="Filter by status"),
    region: Optional[str] = Query(None, description="Filter by region"),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(
        settings.default_page_size,
        ge=1,
        le=settings.max_page_size,
        description="Items per page"
    ),
    db: AsyncSession = Depends(get_db)
):
    """List scenarios with optional filtering."""
    # Pages are 1-based: page 1 starts at offset 0
    skip = (page - 1) * page_size

    filters = {}
    if status_filter:
        filters['status'] = status_filter
    if region:
        filters['region'] = region

    scenarios = await scenario_repository.get_multi(
        db, skip=skip, limit=page_size, **filters
    )
    total = await scenario_repository.count(db, **filters)

    return ScenarioList(
        items=scenarios,
        total=total,
        page=page,
        page_size=page_size
    )


@router.get("/{scenario_id}", response_model=ScenarioResponse)
async def get_scenario(
    scenario_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Get a specific scenario by ID."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")
    return scenario


@router.put("/{scenario_id}", response_model=ScenarioResponse)
async def update_scenario(
    scenario_id: UUID,
    scenario_in: ScenarioUpdate,
    db: AsyncSession = Depends(get_db)
):
    """Update a scenario."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    # Check name uniqueness if updating name
    if scenario_in.name and scenario_in.name != scenario.name:
        existing = await scenario_repository.get_by_name(db, scenario_in.name)
        if existing:
            raise ValidationException(f"Scenario with name '{scenario_in.name}' already exists")

    # exclude_unset keeps fields the client did not send untouched
    updated = await scenario_repository.update(
        db,
        db_obj=scenario,
        obj_in=scenario_in.model_dump(exclude_unset=True)
    )
    return updated


@router.delete("/{scenario_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_scenario(
    scenario_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Delete a scenario."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    await scenario_repository.delete(db, id=scenario_id)
    return None


@router.post("/{scenario_id}/start", response_model=ScenarioResponse)
async def start_scenario(
    scenario_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Start a scenario (draft -> running)."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    if scenario.status != ScenarioStatus.DRAFT.value:
        raise ValidationException(f"Cannot start scenario with status '{scenario.status}'")

    await scenario_repository.update(
        db,
        db_obj=scenario,
        obj_in={
            'status': ScenarioStatus.RUNNING.value,
            'started_at': datetime.utcnow()
        }
    )
    await db.refresh(scenario)
    return scenario


@router.post("/{scenario_id}/stop", response_model=ScenarioResponse)
async def stop_scenario(
    scenario_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Stop a scenario (running -> completed)."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    if scenario.status != ScenarioStatus.RUNNING.value:
        raise ValidationException(f"Cannot stop scenario with status '{scenario.status}'")

    await scenario_repository.update(
        db,
        db_obj=scenario,
        obj_in={
            'status': ScenarioStatus.COMPLETED.value,
            'completed_at': datetime.utcnow()
        }
    )
    await db.refresh(scenario)
    return scenario


@router.post("/{scenario_id}/archive", response_model=ScenarioResponse)
async def archive_scenario(
    scenario_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Archive a scenario (completed -> archived)."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    if scenario.status != ScenarioStatus.COMPLETED.value:
        raise ValidationException(f"Cannot archive scenario with status '{scenario.status}'")

    await scenario_repository.update_status(db, scenario_id, ScenarioStatus.ARCHIVED)
    await db.refresh(scenario)
    return scenario
```

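The three lifecycle endpoints each hard-code one allowed transition. As a sketch of the rule they enforce, the table below makes the state machine explicit. The `ScenarioStatus` enum here is a stand-in for the one in `src.repositories.scenario` (its string values are assumed from the `draft -> running -> completed -> archived` flow), and `can_transition` is a hypothetical helper.

```python
from enum import Enum


class ScenarioStatus(str, Enum):
    """Stand-in for the repository enum; values are assumed."""
    DRAFT = "draft"
    RUNNING = "running"
    COMPLETED = "completed"
    ARCHIVED = "archived"


# Each status maps to the single status it may move to
ALLOWED_TRANSITIONS = {
    ScenarioStatus.DRAFT: ScenarioStatus.RUNNING,        # POST /{id}/start
    ScenarioStatus.RUNNING: ScenarioStatus.COMPLETED,    # POST /{id}/stop
    ScenarioStatus.COMPLETED: ScenarioStatus.ARCHIVED,   # POST /{id}/archive
}


def can_transition(current: str, target: str) -> bool:
    """Return True if moving from current to target is a permitted step."""
    try:
        return ALLOWED_TRANSITIONS.get(ScenarioStatus(current)) == ScenarioStatus(target)
    except ValueError:
        return False  # unknown status string


print(can_transition("draft", "running"), can_transition("draft", "completed"))
```

A table like this is also a convenient source of parametrized test cases: every pair not listed must be rejected with a validation error.
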
### Acceptance criteria

- [ ] POST /api/v1/scenarios - Create
- [ ] GET /api/v1/scenarios - List (with filters and pagination)
- [ ] GET /api/v1/scenarios/{id} - Get
- [ ] PUT /api/v1/scenarios/{id} - Update
- [ ] DELETE /api/v1/scenarios/{id} - Delete
- [ ] POST /api/v1/scenarios/{id}/start - Start
- [ ] POST /api/v1/scenarios/{id}/stop - Stop
- [ ] POST /api/v1/scenarios/{id}/archive - Archive
- [ ] Appropriate error validation
- [ ] Correct response models

---

## BE-009: Ingest API (Updated)

**Estimate:** M (1-2 hours)
**Depends on:** BE-008

### Objective

Update the /ingest endpoint to use the database.

### Files to modify
```
src/api/v1/
├── __init__.py          # Update
└── ingest.py            # NEW
```

### Required implementation

**1. `src/api/v1/ingest.py`**
```python
"""Ingest API endpoints."""

from fastapi import APIRouter, Depends, Header, status
from sqlalchemy.ext.asyncio import AsyncSession

from src.api.deps import get_db, get_running_scenario
from src.schemas.log import LogIngest, LogResponse
from src.services.ingest_service import ingest_service

router = APIRouter()


@router.post("/ingest", response_model=LogResponse, status_code=status.HTTP_202_ACCEPTED)
async def ingest_log(
    log_data: LogIngest,
    x_scenario_id: str = Header(..., alias="X-Scenario-ID"),
    db: AsyncSession = Depends(get_db)
):
    """
    Ingest a log message for processing.

    - **message**: The log message content
    - **source**: Optional source identifier
    - **X-Scenario-ID**: Header with the scenario UUID
    """
    # Validate scenario is running (raises 404 or 400 otherwise)
    scenario = await get_running_scenario(x_scenario_id, db)

    # Process and save log
    log_entry = await ingest_service.ingest_log(
        db=db,
        scenario=scenario,
        message=log_data.message,
        source=log_data.source
    )

    return LogResponse(
        id=log_entry.id,
        scenario_id=log_entry.scenario_id,
        received_at=log_entry.received_at,
        message_preview=log_entry.message_preview,
        source=log_entry.source,
        size_bytes=log_entry.size_bytes,
        has_pii=log_entry.has_pii,
        token_count=log_entry.token_count,
        sqs_blocks=log_entry.sqs_blocks
    )


@router.post("/flush")
async def flush_queue():
    """Force immediate processing of queued messages."""
    # Logs are now written to the database synchronously, so this is a no-op
    # Kept for backward compatibility
    return {"status": "flushed", "message": "Processing is now synchronous"}
```

**2. Update `src/api/v1/__init__.py`**:
```python
"""API v1 routes."""

from fastapi import APIRouter

from src.api.v1.scenarios import router as scenarios_router
from src.api.v1.ingest import router as ingest_router

api_router = APIRouter()
api_router.include_router(scenarios_router, prefix="/scenarios", tags=["scenarios"])
api_router.include_router(ingest_router, tags=["ingest"])
```

### Acceptance criteria

- [ ] The /ingest endpoint uses the X-Scenario-ID header
- [ ] Validates that the scenario exists and has status=running
- [ ] Uses IngestService to process logs
- [ ] Saves logs to the database
- [ ] Returns LogResponse
- [ ] Keeps /flush for backward compatibility

---

## BE-010: Metrics API

**Estimate:** M (1-2 hours)
**Depends on:** BE-009

### Objective

Implement the endpoint that returns the metrics of a scenario.

### Files to create
```
src/api/v1/
└── metrics.py           # Metrics endpoints
```

### Required implementation

**`src/api/v1/metrics.py`**:
```python
"""Metrics API endpoints."""

from decimal import Decimal
from uuid import UUID

from fastapi import APIRouter, Depends
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from src.api.deps import get_db
from src.core.exceptions import NotFoundException
from src.models.scenario_log import ScenarioLog
from src.repositories.scenario import scenario_repository
from src.schemas.metric import MetricsResponse, MetricSummary, CostBreakdown, TimeseriesPoint
from src.services.cost_calculator import cost_calculator

router = APIRouter()


# The router is mounted with prefix "/scenarios", so the path here must not repeat it
@router.get("/{scenario_id}/metrics", response_model=MetricsResponse)
async def get_scenario_metrics(
    scenario_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Get aggregated metrics for a scenario."""
    # Verify scenario exists
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    # Get summary metrics
    summary_result = await db.execute(
        select(
            func.count(ScenarioLog.id).label('total_logs'),
            func.sum(ScenarioLog.sqs_blocks).label('total_sqs_blocks'),
            func.sum(ScenarioLog.token_count).label('total_tokens'),
            func.count(ScenarioLog.id).filter(ScenarioLog.has_pii.is_(True)).label('pii_violations')
        ).where(ScenarioLog.scenario_id == scenario_id)
    )
    summary_row = summary_result.one()
    total_logs = summary_row.total_logs or 0

    # Calculate costs
    region = scenario.region
    sqs_cost = await cost_calculator.calculate_sqs_cost(
        db, summary_row.total_sqs_blocks or 0, region
    )

    # For demo: assume 1 Lambda invocation per 100 logs (rounded up)
    # and 1 GB-second of compute per invocation
    lambda_invocations = (total_logs + 99) // 100
    lambda_cost = await cost_calculator.calculate_lambda_cost(
        db, lambda_invocations, float(lambda_invocations), region
    )

    # Bedrock cost for tokens (no output tokens for now)
    bedrock_cost = await cost_calculator.calculate_bedrock_cost(
        db, summary_row.total_tokens or 0, 0, region
    )

    total_cost = sqs_cost + lambda_cost + bedrock_cost

    def share(cost: Decimal) -> float:
        """Percentage of the total cost, or 0 when nothing has been spent."""
        return float(cost / total_cost * 100) if total_cost > 0 else 0.0

    # Cost breakdown
    cost_breakdown = [
        CostBreakdown(service='SQS', cost_usd=sqs_cost, percentage=share(sqs_cost)),
        CostBreakdown(service='Lambda', cost_usd=lambda_cost, percentage=share(lambda_cost)),
        CostBreakdown(service='Bedrock', cost_usd=bedrock_cost, percentage=share(bedrock_cost)),
    ]

    summary = MetricSummary(
        total_requests=scenario.total_requests,
        total_cost_usd=total_cost,
        sqs_blocks=summary_row.total_sqs_blocks or 0,
        lambda_invocations=lambda_invocations,
        llm_tokens=summary_row.total_tokens or 0,
        pii_violations=summary_row.pii_violations or 0
    )

    # Get timeseries data (grouped by hour)
    hour = func.date_trunc('hour', ScenarioLog.received_at)
    timeseries_result = await db.execute(
        select(
            hour.label('hour'),
            # Not labelled 'count': Row.count is a built-in method on result rows
            func.count(ScenarioLog.id).label('request_count')
        )
        .where(ScenarioLog.scenario_id == scenario_id)
        .group_by(hour)
        .order_by(hour)
    )

    timeseries = [
        TimeseriesPoint(
            timestamp=row.hour,
            metric_type='requests',
            value=Decimal(row.request_count)
        )
        for row in timeseries_result.all()
    ]

    return MetricsResponse(
        scenario_id=scenario_id,
        summary=summary,
        cost_breakdown=cost_breakdown,
        timeseries=timeseries
    )
```

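The timeseries query relies on PostgreSQL's `date_trunc('hour', ...)` to put every log into the hour it was received in. For unit tests that run without a database, the same grouping can be sketched in plain Python; `bucket_by_hour` is a hypothetical helper and the timestamps are made up.

```python
from collections import Counter
from datetime import datetime


def bucket_by_hour(timestamps):
    """Count timestamps per hour, returned as (hour, count) pairs in time order."""
    counts = Counter(
        ts.replace(minute=0, second=0, microsecond=0) for ts in timestamps
    )
    return sorted(counts.items())


received = [
    datetime(2026, 4, 7, 9, 5),
    datetime(2026, 4, 7, 9, 59, 30),
    datetime(2026, 4, 7, 10, 0),
]
series = bucket_by_hour(received)
print(series)
```

As with the SQL version, hours with no logs produce no point at all, so a chart consumer has to fill the gaps itself if it wants a continuous axis.
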
**Update `src/api/v1/__init__.py`**:
```python
from src.api.v1.metrics import router as metrics_router

api_router.include_router(metrics_router, prefix="/scenarios", tags=["metrics"])
```

### Acceptance criteria

- [ ] GET /api/v1/scenarios/{id}/metrics - Full metrics
- [ ] Cost calculation using CostCalculator
- [ ] Aggregations from scenario_logs
- [ ] Timeseries grouped by hour
- [ ] Response conforms to the MetricsResponse schema

---

## 📋 Completion Checklist

Before moving on, verify:

- [ ] All services implemented and tested
- [ ] API endpoints working
- [ ] Appropriate error handling
- [ ] Correct response schemas
- [ ] Logging added where needed
- [ ] OpenAPI documentation visible at /docs

---

## 🧪 Testing

```bash
# Test API
curl -X POST http://localhost:8000/api/v1/scenarios \
  -H "Content-Type: application/json" \
  -d '{"name": "Test", "region": "us-east-1"}'

# Test ingest
curl -X POST http://localhost:8000/ingest \
  -H "Content-Type: application/json" \
  -H "X-Scenario-ID: <uuid>" \
  -d '{"message": "test log", "source": "test"}'

# Check docs
open http://localhost:8000/docs
```

---

## 🚀 Useful Commands

```bash
# Run server
uv run uvicorn src.main:app --reload

# Run tests
uv run pytest tests/integration/test_api_scenarios.py -v

# Check types
uv run mypy src/
```

---

**@backend-dev: LET'S KEEP GOING! 🚀**

Proceed in order: BE-006 → BE-007 → BE-008 → BE-009 → BE-010

**Questions?** Refer to the files already created in `src/`.

**Commit conventions:**
- `feat: add API dependencies and exception handling`
- `feat: implement services (cost calculator, PII detector, ingest)`
- `feat: add scenarios CRUD API endpoints`
- `feat: update ingest endpoint to use database`
- `feat: add metrics API with cost calculations`

**Good luck! 💪**