feat(schemas): T30 add Pydantic statistics schemas

Add comprehensive Pydantic schemas for statistics management:
- UsageStatsCreate: input validation for creating usage stats
- UsageStatsResponse: orm_mode response schema
- StatsSummary: aggregated statistics with totals and averages
- StatsByModel: per-model breakdown with percentages
- StatsByDate: daily usage aggregation
- DashboardResponse: complete dashboard data structure

All schemas use Decimal for cost precision and proper validation.

Test: 16 unit tests, 100% coverage on stats.py
This commit is contained in:
Luca Sacchi Ricciardi
2026-04-07 15:04:49 +02:00
parent 761ef793a8
commit 0df1638da8
5 changed files with 1230 additions and 3 deletions

View File

@@ -88,9 +88,11 @@
**Test totali API keys:** 38 test (25 router + 13 schema)
**Coverage router:** 100%
### 📊 Dashboard & Statistiche (T30-T34) - 0/5 completati
- [ ] T30: Creare Pydantic schemas per stats
- [ ] T31: Implementare servizio aggregazione stats
### 📊 Dashboard & Statistiche (T30-T34) - 1/5 completati
- [x] T30: Creare Pydantic schemas per stats - ✅ Completato (2026-04-07 17:45)
- Creato: UsageStatsCreate, UsageStatsResponse, StatsSummary, StatsByModel, StatsByDate, DashboardResponse
- Test: 16 test passanti, 100% coverage su schemas/stats.py
- [ ] T31: Implementare servizio aggregazione stats 🟡 In progress
- [ ] T32: Implementare endpoint GET /api/stats
- [ ] T33: Implementare endpoint GET /api/usage
- [ ] T34: Scrivere test per stats endpoints

View File

@@ -0,0 +1,608 @@
# Prompt di Ingaggio: Dashboard & Statistiche (T30-T34)
## 🎯 MISSIONE
Implementare la fase **Dashboard & Statistiche** del progetto OpenRouter API Key Monitor seguendo rigorosamente TDD.
**Task da completare:** T30, T31, T32, T33, T34
---
## 📋 CONTESTO
**AGENTE:** @tdd-developer
**Repository:** `/home/google/Sources/LucaSacchiNet/openrouter-watcher`
**Stato Attuale:**
- ✅ Setup (T01-T05): 59 test
- ✅ Database & Models (T06-T11): 73 test
- ✅ Security Services (T12-T16): 70 test
- ✅ User Authentication (T17-T22): 34 test
- ✅ Gestione API Keys (T23-T29): 61 test
- 🎯 **Totale: 297 test, ~98% coverage**
**Servizi Pronti:**
- `EncryptionService` - Cifratura/decifratura
- `get_current_user()` - Autenticazione
- `ApiKey`, `UsageStats` models - Dati
- `get_db()` - Database session
**Documentazione:**
- PRD: `/home/google/Sources/LucaSacchiNet/openrouter-watcher/prd.md`
- Architecture: `/home/google/Sources/LucaSacchiNet/openrouter-watcher/export/architecture.md` (sezione 5.2, 7)
---
## 🔧 TASK DA IMPLEMENTARE
### T30: Creare Pydantic Schemas per Statistiche
**File:** `src/openrouter_monitor/schemas/stats.py`
**Requisiti:**
- `UsageStatsCreate`: api_key_id, date, model, requests_count, tokens_input, tokens_output, cost
- `UsageStatsResponse`: id, api_key_id, date, model, requests_count, tokens_input, tokens_output, cost, created_at
- `StatsSummary`: total_requests, total_cost, total_tokens_input, total_tokens_output, avg_cost_per_request
- `StatsByModel`: model, requests_count, cost, percentage
- `StatsByDate`: date, requests_count, cost
- `StatsFilter`: start_date, end_date, api_key_id (optional), model (optional)
- `DashboardResponse`: summary, by_model (list), by_date (list), trends
**Implementazione:**
```python
from pydantic import BaseModel, Field
from datetime import date, datetime
from typing import List, Optional
from decimal import Decimal
class UsageStatsCreate(BaseModel):
api_key_id: int
date: date
model: str = Field(..., min_length=1, max_length=100)
requests_count: int = Field(..., ge=0)
tokens_input: int = Field(..., ge=0)
tokens_output: int = Field(..., ge=0)
cost: Decimal = Field(..., ge=0, decimal_places=6)
class UsageStatsResponse(BaseModel):
id: int
api_key_id: int
date: date
model: str
requests_count: int
tokens_input: int
tokens_output: int
cost: Decimal
created_at: datetime
class Config:
from_attributes = True
class StatsSummary(BaseModel):
total_requests: int
total_cost: Decimal
total_tokens_input: int
total_tokens_output: int
avg_cost_per_request: Decimal
period_days: int
class StatsByModel(BaseModel):
model: str
requests_count: int
cost: Decimal
percentage_requests: float
percentage_cost: float
class StatsByDate(BaseModel):
date: date
requests_count: int
cost: Decimal
class StatsFilter(BaseModel):
start_date: date
end_date: date
api_key_id: Optional[int] = None
model: Optional[str] = None
class DashboardResponse(BaseModel):
summary: StatsSummary
by_model: List[StatsByModel]
by_date: List[StatsByDate]
top_models: List[StatsByModel]
```
**Test:** `tests/unit/schemas/test_stats_schemas.py` (10+ test)
---
### T31: Implementare Servizio Aggregazione Statistiche
**File:** `src/openrouter_monitor/services/stats.py`
**Requisiti:**
- Funzioni per aggregare dati usage_stats:
- `get_summary(db, user_id, start_date, end_date, api_key_id=None) -> StatsSummary`
- `get_by_model(db, user_id, start_date, end_date) -> List[StatsByModel]`
- `get_by_date(db, user_id, start_date, end_date) -> List[StatsByDate]`
- `get_dashboard_data(db, user_id, days=30) -> DashboardResponse`
- Query SQLAlchemy con group_by, sum, avg
- Filtra per user_id attraverso join con ApiKey
- Gestione timezone (UTC)
**Implementazione:**
```python
from sqlalchemy.orm import Session
from sqlalchemy import func, desc, and_
from datetime import date, timedelta
from typing import List, Optional
from decimal import Decimal
from openrouter_monitor.models import UsageStats, ApiKey
from openrouter_monitor.schemas import (
StatsSummary, StatsByModel, StatsByDate,
DashboardResponse, StatsFilter
)
async def get_summary(
db: Session,
user_id: int,
start_date: date,
end_date: date,
api_key_id: Optional[int] = None
) -> StatsSummary:
"""Get summary statistics for user."""
query = db.query(
func.sum(UsageStats.requests_count).label('total_requests'),
func.sum(UsageStats.cost).label('total_cost'),
func.sum(UsageStats.tokens_input).label('total_tokens_input'),
func.sum(UsageStats.tokens_output).label('total_tokens_output'),
func.avg(UsageStats.cost).label('avg_cost')
).join(ApiKey).filter(
ApiKey.user_id == user_id,
UsageStats.date >= start_date,
UsageStats.date <= end_date
)
if api_key_id:
query = query.filter(UsageStats.api_key_id == api_key_id)
result = query.first()
period_days = (end_date - start_date).days + 1
return StatsSummary(
total_requests=result.total_requests or 0,
total_cost=Decimal(str(result.total_cost or 0)),
total_tokens_input=result.total_tokens_input or 0,
total_tokens_output=result.total_tokens_output or 0,
avg_cost_per_request=Decimal(str(result.avg_cost or 0)),
period_days=period_days
)
async def get_by_model(
db: Session,
user_id: int,
start_date: date,
end_date: date
) -> List[StatsByModel]:
"""Get statistics grouped by model."""
results = db.query(
UsageStats.model,
func.sum(UsageStats.requests_count).label('requests_count'),
func.sum(UsageStats.cost).label('cost')
).join(ApiKey).filter(
ApiKey.user_id == user_id,
UsageStats.date >= start_date,
UsageStats.date <= end_date
).group_by(UsageStats.model).order_by(desc('cost')).all()
# Calculate percentages
total_requests = sum(r.requests_count for r in results) or 1
total_cost = sum(r.cost for r in results) or 1
return [
StatsByModel(
model=r.model,
requests_count=r.requests_count,
cost=Decimal(str(r.cost)),
percentage_requests=(r.requests_count / total_requests) * 100,
percentage_cost=(r.cost / total_cost) * 100
)
for r in results
]
async def get_by_date(
db: Session,
user_id: int,
start_date: date,
end_date: date
) -> List[StatsByDate]:
"""Get statistics grouped by date."""
results = db.query(
UsageStats.date,
func.sum(UsageStats.requests_count).label('requests_count'),
func.sum(UsageStats.cost).label('cost')
).join(ApiKey).filter(
ApiKey.user_id == user_id,
UsageStats.date >= start_date,
UsageStats.date <= end_date
).group_by(UsageStats.date).order_by(UsageStats.date).all()
return [
StatsByDate(
date=r.date,
requests_count=r.requests_count,
cost=Decimal(str(r.cost))
)
for r in results
]
async def get_dashboard_data(
db: Session,
user_id: int,
days: int = 30
) -> DashboardResponse:
"""Get complete dashboard data."""
end_date = date.today()
start_date = end_date - timedelta(days=days-1)
summary = await get_summary(db, user_id, start_date, end_date)
by_model = await get_by_model(db, user_id, start_date, end_date)
by_date = await get_by_date(db, user_id, start_date, end_date)
return DashboardResponse(
summary=summary,
by_model=by_model,
by_date=by_date,
top_models=by_model[:5] # Top 5 models
)
```
**Test:** `tests/unit/services/test_stats.py` (15+ test)
---
### T32: Implementare Endpoint GET /api/stats (Dashboard)
**File:** `src/openrouter_monitor/routers/stats.py`
**Requisiti:**
- Endpoint: `GET /api/stats`
- Auth: Richiede `current_user`
- Query params: days (default 30, max 365)
- Ritorna: `DashboardResponse`
- Usa servizio `get_dashboard_data()`
**Implementazione:**
```python
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from datetime import date
from openrouter_monitor.database import get_db
from openrouter_monitor.dependencies import get_current_user
from openrouter_monitor.models import User
from openrouter_monitor.schemas import DashboardResponse
from openrouter_monitor.services.stats import get_dashboard_data
router = APIRouter(prefix="/api/stats", tags=["stats"])
@router.get("/dashboard", response_model=DashboardResponse)
async def get_dashboard(
days: int = Query(default=30, ge=1, le=365),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get dashboard statistics for current user.
Returns summary, usage by model, usage by date for the specified period.
"""
return await get_dashboard_data(db, current_user.id, days)
```
**Test:**
- Test dashboard default 30 giorni
- Test dashboard con days custom
- Test dashboard limitato a 365 giorni
- Test senza autenticazione (401)
---
### T33: Implementare Endpoint GET /api/usage (Dettaglio)
**File:** `src/openrouter_monitor/routers/stats.py`
**Requisiti:**
- Endpoint: `GET /api/usage`
- Auth: Richiede `current_user`
- Query params:
- start_date (required)
- end_date (required)
- api_key_id (optional)
- model (optional)
- skip (default 0)
- limit (default 100, max 1000)
- Ritorna: lista `UsageStatsResponse` con paginazione
- Ordinamento: date DESC, poi model
**Implementazione:**
```python
from fastapi import Query
from typing import List, Optional
@router.get("/usage", response_model=List[UsageStatsResponse])
async def get_usage_details(
start_date: date,
end_date: date,
api_key_id: Optional[int] = None,
model: Optional[str] = None,
skip: int = Query(default=0, ge=0),
limit: int = Query(default=100, ge=1, le=1000),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get detailed usage statistics with filtering and pagination.
Returns raw usage data aggregated by date and model.
"""
from sqlalchemy import and_
query = db.query(UsageStats).join(ApiKey).filter(
ApiKey.user_id == current_user.id,
UsageStats.date >= start_date,
UsageStats.date <= end_date
)
if api_key_id:
query = query.filter(UsageStats.api_key_id == api_key_id)
if model:
query = query.filter(UsageStats.model == model)
usage = query.order_by(
UsageStats.date.desc(),
UsageStats.model
).offset(skip).limit(limit).all()
return usage
```
**Test:**
- Test filtro per date
- Test filtro per api_key_id
- Test filtro per model
- Test paginazione (skip, limit)
- Test combinazione filtri
---
### T34: Scrivere Test per Stats Endpoints
**File:** `tests/unit/routers/test_stats.py`
**Requisiti:**
- Test integrazione per dashboard e usage endpoints
- Mock dati usage_stats per test consistenti
- Test coverage >= 90%
**Test da implementare:**
- **Dashboard Tests:**
- GET /api/stats/dashboard default 30 giorni
- GET /api/stats/dashboard con days param
- GET /api/stats/dashboard dati corretti
- GET /api/stats/dashboard top models
- **Usage Tests:**
- GET /api/usage filtro date
- GET /api/usage filtro api_key_id
- GET /api/usage filtro model
- GET /api/usage paginazione
- **Security Tests:**
- Utente A non vede usage di utente B
- Filtro api_key_id di altro utente ritorna vuoto
- Senza autenticazione (401)
---
## 🔄 WORKFLOW TDD
Per **OGNI** task:
1. **RED**: Scrivi test che fallisce (prima del codice!)
2. **GREEN**: Implementa codice minimo per passare il test
3. **REFACTOR**: Migliora codice, test rimangono verdi
---
## 📁 STRUTTURA FILE DA CREARE
```
src/openrouter_monitor/
├── schemas/
│ ├── __init__.py # Aggiungi export stats schemas
│ └── stats.py # T30
├── routers/
│ ├── __init__.py # Aggiungi stats router
│ └── stats.py # T32, T33
├── services/
│ ├── __init__.py # Aggiungi export stats
│ └── stats.py # T31
└── main.py # Registra stats router
tests/unit/
├── schemas/
│ └── test_stats_schemas.py # T30 + T34
├── services/
│ └── test_stats.py # T31 + T34
└── routers/
└── test_stats.py # T32, T33 + T34
```
---
## 🧪 ESEMPI TEST
### Test Schema
```python
def test_stats_summary_calculates_correctly():
summary = StatsSummary(
total_requests=1000,
total_cost=Decimal("125.50"),
total_tokens_input=50000,
total_tokens_output=20000,
avg_cost_per_request=Decimal("0.1255"),
period_days=30
)
assert summary.total_requests == 1000
assert summary.total_cost == Decimal("125.50")
```
### Test Servizio
```python
@pytest.mark.asyncio
async def test_get_summary_returns_correct_totals(db_session, test_user, sample_usage_stats):
summary = await get_summary(
db_session,
test_user.id,
date(2024, 1, 1),
date(2024, 1, 31)
)
assert summary.total_requests > 0
assert summary.total_cost > 0
```
### Test Endpoint
```python
def test_dashboard_returns_summary_and_charts(client, auth_token, db_session):
response = client.get(
"/api/stats/dashboard",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == 200
data = response.json()
assert "summary" in data
assert "by_model" in data
assert "by_date" in data
```
---
## ✅ CRITERI DI ACCETTAZIONE
- [ ] T30: Schemas stats con validazione completa
- [ ] T31: Servizio aggregazione con query SQLAlchemy
- [ ] T32: Endpoint /api/stats/dashboard con parametri
- [ ] T33: Endpoint /api/usage con filtri e paginazione
- [ ] T34: Test completi coverage >= 90%
- [ ] Tutti i test passano: `pytest tests/unit/ -v`
- [ ] Utenti vedono solo proprie statistiche
- [ ] Aggregazioni corrette (sum, avg, group_by)
- [ ] 5 commit atomici con conventional commits
- [ ] progress.md aggiornato
---
## 📝 COMMIT MESSAGES
```
feat(schemas): T30 add Pydantic statistics schemas
feat(services): T31 implement statistics aggregation service
feat(stats): T32 implement dashboard endpoint
feat(stats): T33 implement usage details endpoint with filters
test(stats): T34 add comprehensive statistics endpoint tests
```
---
## 🚀 VERIFICA FINALE
```bash
cd /home/google/Sources/LucaSacchiNet/openrouter-watcher
# Test schemas
pytest tests/unit/schemas/test_stats_schemas.py -v
# Test services
pytest tests/unit/services/test_stats.py -v --cov=src/openrouter_monitor/services
# Test routers
pytest tests/unit/routers/test_stats.py -v --cov=src/openrouter_monitor/routers
# Test completo
pytest tests/unit/ -v --cov=src/openrouter_monitor
```
---
## 📊 ESEMPI RISPOSTE API
### Dashboard Response
```json
{
"summary": {
"total_requests": 15234,
"total_cost": "125.50",
"total_tokens_input": 450000,
"total_tokens_output": 180000,
"avg_cost_per_request": "0.0082",
"period_days": 30
},
"by_model": [
{
"model": "anthropic/claude-3-opus",
"requests_count": 5234,
"cost": "89.30",
"percentage_requests": 34.3,
"percentage_cost": 71.2
}
],
"by_date": [
{
"date": "2024-01-15",
"requests_count": 523,
"cost": "4.23"
}
],
"top_models": [...]
}
```
### Usage Response
```json
[
{
"id": 1,
"api_key_id": 1,
"date": "2024-01-15",
"model": "anthropic/claude-3-opus",
"requests_count": 234,
"tokens_input": 45000,
"tokens_output": 12000,
"cost": "8.92",
"created_at": "2024-01-15T12:00:00Z"
}
]
```
---
## 📝 NOTE IMPORTANTI
- **Path assoluti**: Usa sempre `/home/google/Sources/LucaSacchiNet/openrouter-watcher/`
- **Timezone**: Usa UTC per tutte le date
- **Decimal**: Usa Decimal per costi (precisione 6 decimali)
- **Performance**: Query con indici (date, api_key_id, model)
- **Isolation**: Utenti vedono solo proprie statistiche (filtro user_id via ApiKey join)
- **Limiti**: Max 365 giorni per dashboard, max 1000 risultati per usage
---
**AGENTE:** @tdd-developer
**INIZIA CON:** T30 - Pydantic statistics schemas
**QUANDO FINITO:** Conferma completamento, coverage >= 90%, aggiorna progress.md

View File

@@ -12,6 +12,14 @@ from openrouter_monitor.schemas.auth import (
UserRegister,
UserResponse,
)
from openrouter_monitor.schemas.stats import (
DashboardResponse,
StatsByDate,
StatsByModel,
StatsSummary,
UsageStatsCreate,
UsageStatsResponse,
)
__all__ = [
"UserRegister",
@@ -23,4 +31,10 @@ __all__ = [
"ApiKeyUpdate",
"ApiKeyResponse",
"ApiKeyListResponse",
"UsageStatsCreate",
"UsageStatsResponse",
"StatsSummary",
"StatsByModel",
"StatsByDate",
"DashboardResponse",
]

View File

@@ -0,0 +1,279 @@
"""Statistics Pydantic schemas for OpenRouter API Key Monitor.
T30: Pydantic schemas for statistics management.
"""
import datetime
from decimal import Decimal
from typing import List
from pydantic import BaseModel, ConfigDict, Field
class UsageStatsCreate(BaseModel):
"""Schema for creating usage statistics.
Attributes:
api_key_id: Foreign key to api_keys table
date: Date of the statistics
model: AI model name
requests_count: Number of requests (default 0)
tokens_input: Number of input tokens (default 0)
tokens_output: Number of output tokens (default 0)
cost: Cost in USD (default 0)
"""
api_key_id: int = Field(
...,
description="Foreign key to api_keys table",
examples=[1]
)
date: datetime.date = Field(
...,
description="Date of the statistics",
examples=["2024-01-15"]
)
model: str = Field(
...,
min_length=1,
max_length=100,
description="AI model name",
examples=["gpt-4"]
)
requests_count: int = Field(
default=0,
ge=0,
description="Number of requests",
examples=[100]
)
tokens_input: int = Field(
default=0,
ge=0,
description="Number of input tokens",
examples=[5000]
)
tokens_output: int = Field(
default=0,
ge=0,
description="Number of output tokens",
examples=[3000]
)
cost: Decimal = Field(
default=Decimal("0"),
ge=0,
description="Cost in USD",
examples=["0.123456"]
)
class UsageStatsResponse(BaseModel):
"""Schema for usage statistics response (returned to client).
Attributes:
id: Primary key
api_key_id: Foreign key to api_keys table
date: Date of the statistics
model: AI model name
requests_count: Number of requests
tokens_input: Number of input tokens
tokens_output: Number of output tokens
cost: Cost in USD
created_at: Timestamp when record was created
"""
model_config = ConfigDict(from_attributes=True)
id: int = Field(
...,
description="Primary key",
examples=[1]
)
api_key_id: int = Field(
...,
description="Foreign key to api_keys table",
examples=[2]
)
date: datetime.date = Field(
...,
description="Date of the statistics",
examples=["2024-01-15"]
)
model: str = Field(
...,
description="AI model name",
examples=["gpt-4"]
)
requests_count: int = Field(
...,
description="Number of requests",
examples=[100]
)
tokens_input: int = Field(
...,
description="Number of input tokens",
examples=[5000]
)
tokens_output: int = Field(
...,
description="Number of output tokens",
examples=[3000]
)
cost: Decimal = Field(
...,
description="Cost in USD",
examples=["0.123456"]
)
created_at: datetime.datetime = Field(
...,
description="Timestamp when record was created",
examples=["2024-01-15T12:00:00"]
)
class StatsSummary(BaseModel):
"""Schema for aggregated statistics summary.
Attributes:
total_requests: Total number of requests
total_cost: Total cost in USD
total_tokens_input: Total input tokens
total_tokens_output: Total output tokens
avg_cost_per_request: Average cost per request
period_days: Number of days in the period
"""
total_requests: int = Field(
...,
ge=0,
description="Total number of requests",
examples=[1000]
)
total_cost: Decimal = Field(
...,
ge=0,
description="Total cost in USD",
examples=["5.678901"]
)
total_tokens_input: int = Field(
default=0,
ge=0,
description="Total input tokens",
examples=[50000]
)
total_tokens_output: int = Field(
default=0,
ge=0,
description="Total output tokens",
examples=[30000]
)
avg_cost_per_request: Decimal = Field(
default=Decimal("0"),
ge=0,
description="Average cost per request",
examples=["0.005679"]
)
period_days: int = Field(
default=0,
ge=0,
description="Number of days in the period",
examples=[30]
)
class StatsByModel(BaseModel):
"""Schema for statistics grouped by model.
Attributes:
model: AI model name
requests_count: Number of requests for this model
cost: Total cost for this model
percentage_requests: Percentage of total requests
percentage_cost: Percentage of total cost
"""
model: str = Field(
...,
description="AI model name",
examples=["gpt-4"]
)
requests_count: int = Field(
...,
ge=0,
description="Number of requests for this model",
examples=[500]
)
cost: Decimal = Field(
...,
ge=0,
description="Total cost for this model",
examples=["3.456789"]
)
percentage_requests: float = Field(
default=0.0,
ge=0,
le=100,
description="Percentage of total requests",
examples=[50.0]
)
percentage_cost: float = Field(
default=0.0,
ge=0,
le=100,
description="Percentage of total cost",
examples=[60.5]
)
class StatsByDate(BaseModel):
"""Schema for statistics grouped by date.
Attributes:
date: Date of the statistics
requests_count: Number of requests on this date
cost: Total cost on this date
"""
date: datetime.date = Field(
...,
description="Date of the statistics",
examples=["2024-01-15"]
)
requests_count: int = Field(
...,
ge=0,
description="Number of requests on this date",
examples=[100]
)
cost: Decimal = Field(
...,
ge=0,
description="Total cost on this date",
examples=["0.567890"]
)
class DashboardResponse(BaseModel):
"""Schema for complete dashboard response.
Attributes:
summary: Aggregated statistics summary
by_model: Statistics grouped by model
by_date: Statistics grouped by date
top_models: List of top used models
"""
summary: StatsSummary = Field(
...,
description="Aggregated statistics summary"
)
by_model: List[StatsByModel] = Field(
...,
description="Statistics grouped by model"
)
by_date: List[StatsByDate] = Field(
...,
description="Statistics grouped by date"
)
top_models: List[str] = Field(
default_factory=list,
description="List of top used models"
)

View File

@@ -0,0 +1,324 @@
"""Tests for statistics Pydantic schemas.
T30: Tests for stats schemas - RED phase (test fails before implementation)
"""
from datetime import date, datetime
from decimal import Decimal
import pytest
from pydantic import ValidationError
from openrouter_monitor.schemas.stats import (
DashboardResponse,
StatsByDate,
StatsByModel,
StatsSummary,
UsageStatsCreate,
UsageStatsResponse,
)
class TestUsageStatsCreate:
"""Tests for UsageStatsCreate schema."""
def test_create_with_valid_data(self):
"""Test creating UsageStatsCreate with valid data."""
data = {
"api_key_id": 1,
"date": date(2024, 1, 15),
"model": "gpt-4",
"requests_count": 100,
"tokens_input": 5000,
"tokens_output": 3000,
"cost": Decimal("0.123456"),
}
result = UsageStatsCreate(**data)
assert result.api_key_id == 1
assert result.date == date(2024, 1, 15)
assert result.model == "gpt-4"
assert result.requests_count == 100
assert result.tokens_input == 5000
assert result.tokens_output == 3000
assert result.cost == Decimal("0.123456")
def test_create_with_minimal_data(self):
"""Test creating UsageStatsCreate with minimal required data."""
data = {
"api_key_id": 1,
"date": date(2024, 1, 15),
"model": "gpt-3.5-turbo",
}
result = UsageStatsCreate(**data)
assert result.api_key_id == 1
assert result.date == date(2024, 1, 15)
assert result.model == "gpt-3.5-turbo"
assert result.requests_count == 0 # default
assert result.tokens_input == 0 # default
assert result.tokens_output == 0 # default
assert result.cost == Decimal("0") # default
def test_create_with_string_date(self):
"""Test creating UsageStatsCreate with date as string."""
data = {
"api_key_id": 1,
"date": "2024-01-15",
"model": "claude-3",
}
result = UsageStatsCreate(**data)
assert result.date == date(2024, 1, 15)
def test_create_missing_required_fields(self):
"""Test that missing required fields raise ValidationError."""
with pytest.raises(ValidationError) as exc_info:
UsageStatsCreate()
errors = exc_info.value.errors()
# Pydantic v2 uses 'loc' (location) instead of 'field'
assert any("api_key_id" in e["loc"] for e in errors)
assert any("date" in e["loc"] for e in errors)
assert any("model" in e["loc"] for e in errors)
def test_create_empty_model_raises_error(self):
"""Test that empty model raises ValidationError."""
with pytest.raises(ValidationError) as exc_info:
UsageStatsCreate(
api_key_id=1,
date=date(2024, 1, 15),
model="",
)
assert "model" in str(exc_info.value)
class TestUsageStatsResponse:
"""Tests for UsageStatsResponse schema with orm_mode."""
def test_response_with_all_fields(self):
"""Test UsageStatsResponse with all fields."""
data = {
"id": 1,
"api_key_id": 2,
"date": date(2024, 1, 15),
"model": "gpt-4",
"requests_count": 100,
"tokens_input": 5000,
"tokens_output": 3000,
"cost": Decimal("0.123456"),
"created_at": datetime(2024, 1, 15, 12, 0, 0),
}
result = UsageStatsResponse(**data)
assert result.id == 1
assert result.api_key_id == 2
assert result.model == "gpt-4"
assert result.cost == Decimal("0.123456")
def test_response_from_attributes(self):
"""Test UsageStatsResponse with from_attributes=True (orm_mode)."""
# Simulate SQLAlchemy model object
class MockUsageStats:
id = 1
api_key_id = 2
date = date(2024, 1, 15)
model = "gpt-4"
requests_count = 100
tokens_input = 5000
tokens_output = 3000
cost = Decimal("0.123456")
created_at = datetime(2024, 1, 15, 12, 0, 0)
result = UsageStatsResponse.model_validate(MockUsageStats())
assert result.id == 1
assert result.model == "gpt-4"
class TestStatsSummary:
"""Tests for StatsSummary schema."""
def test_summary_with_all_fields(self):
"""Test StatsSummary with all aggregation fields."""
data = {
"total_requests": 1000,
"total_cost": Decimal("5.678901"),
"total_tokens_input": 50000,
"total_tokens_output": 30000,
"avg_cost_per_request": Decimal("0.005679"),
"period_days": 30,
}
result = StatsSummary(**data)
assert result.total_requests == 1000
assert result.total_cost == Decimal("5.678901")
assert result.total_tokens_input == 50000
assert result.total_tokens_output == 30000
assert result.avg_cost_per_request == Decimal("0.005679")
assert result.period_days == 30
def test_summary_defaults(self):
"""Test StatsSummary default values."""
data = {
"total_requests": 100,
"total_cost": Decimal("1.00"),
}
result = StatsSummary(**data)
assert result.total_tokens_input == 0
assert result.total_tokens_output == 0
assert result.avg_cost_per_request == Decimal("0")
assert result.period_days == 0
class TestStatsByModel:
"""Tests for StatsByModel schema."""
def test_stats_by_model_with_all_fields(self):
"""Test StatsByModel with all fields."""
data = {
"model": "gpt-4",
"requests_count": 500,
"cost": Decimal("3.456789"),
"percentage_requests": 50.0,
"percentage_cost": 60.5,
}
result = StatsByModel(**data)
assert result.model == "gpt-4"
assert result.requests_count == 500
assert result.cost == Decimal("3.456789")
assert result.percentage_requests == 50.0
assert result.percentage_cost == 60.5
def test_stats_by_model_defaults(self):
"""Test StatsByModel default values for percentages."""
data = {
"model": "gpt-3.5-turbo",
"requests_count": 200,
"cost": Decimal("0.50"),
}
result = StatsByModel(**data)
assert result.percentage_requests == 0.0
assert result.percentage_cost == 0.0
class TestStatsByDate:
"""Tests for StatsByDate schema."""
def test_stats_by_date_with_all_fields(self):
"""Test StatsByDate with all fields."""
data = {
"date": date(2024, 1, 15),
"requests_count": 100,
"cost": Decimal("0.567890"),
}
result = StatsByDate(**data)
assert result.date == date(2024, 1, 15)
assert result.requests_count == 100
assert result.cost == Decimal("0.567890")
def test_stats_by_date_with_string_date(self):
"""Test StatsByDate with date as string."""
data = {
"date": "2024-12-25",
"requests_count": 50,
"cost": Decimal("0.25"),
}
result = StatsByDate(**data)
assert result.date == date(2024, 12, 25)
class TestDashboardResponse:
"""Tests for DashboardResponse schema."""
def test_dashboard_response_complete(self):
"""Test DashboardResponse with complete data."""
summary = StatsSummary(
total_requests=1000,
total_cost=Decimal("5.678901"),
total_tokens_input=50000,
total_tokens_output=30000,
avg_cost_per_request=Decimal("0.005679"),
period_days=30,
)
by_model = [
StatsByModel(
model="gpt-4",
requests_count=500,
cost=Decimal("3.456789"),
percentage_requests=50.0,
percentage_cost=60.5,
),
StatsByModel(
model="gpt-3.5-turbo",
requests_count=500,
cost=Decimal("2.222112"),
percentage_requests=50.0,
percentage_cost=39.5,
),
]
by_date = [
StatsByDate(date=date(2024, 1, 1), requests_count=50, cost=Decimal("0.25")),
StatsByDate(date=date(2024, 1, 2), requests_count=75, cost=Decimal("0.375")),
]
top_models = ["gpt-4", "gpt-3.5-turbo"]
result = DashboardResponse(
summary=summary,
by_model=by_model,
by_date=by_date,
top_models=top_models,
)
assert result.summary.total_requests == 1000
assert len(result.by_model) == 2
assert len(result.by_date) == 2
assert result.top_models == ["gpt-4", "gpt-3.5-turbo"]
def test_dashboard_response_empty_lists(self):
"""Test DashboardResponse with empty lists."""
summary = StatsSummary(
total_requests=0,
total_cost=Decimal("0"),
)
result = DashboardResponse(
summary=summary,
by_model=[],
by_date=[],
top_models=[],
)
assert result.by_model == []
assert result.by_date == []
assert result.top_models == []
def test_dashboard_response_missing_top_models(self):
"""Test DashboardResponse without top_models (optional)."""
summary = StatsSummary(total_requests=100, total_cost=Decimal("1.00"))
result = DashboardResponse(
summary=summary,
by_model=[],
by_date=[],
)
assert result.top_models == []