feat(schemas): T17 add Pydantic auth schemas

Add authentication schemas for user registration and login:
- UserRegister: email, password (with strength validation), password_confirm
- UserLogin: email, password
- UserResponse: id, email, created_at, is_active (orm_mode=True)
- TokenResponse: access_token, token_type, expires_in
- TokenData: user_id, exp

Includes field validators for password strength and password confirmation matching.

Test coverage: 19 tests for all schemas
This commit is contained in:
Luca Sacchi Ricciardi
2026-04-07 13:52:33 +02:00
parent a698d09a77
commit 02473bc39e
7 changed files with 1341 additions and 5 deletions

29
docs/githistory.md Normal file
View File

@@ -0,0 +1,29 @@
## 2026-04-07: Security Services Implementation (T12-T16)
### Commits
- `2fdd9d1` feat(security): T12 implement AES-256 encryption service
- `54e8116` feat(security): T13 implement bcrypt password hashing
- `781e564` feat(security): T14 implement JWT utilities
- `649ff76` feat(security): T15 implement API token generation
- `a698d09` feat(security): T16 finalize security services exports
### Contenuto
Implementazione completa dei servizi di sicurezza con TDD:
- EncryptionService (AES-256-GCM con PBKDF2HMAC)
- Password hashing (bcrypt 12 rounds) con strength validation
- JWT utilities (HS256) con create/decode/verify
- API token generation (SHA-256) con timing-safe comparison
### Statistiche
- 70 test passanti
- 100% coverage su tutti i moduli security
- 5 commit atomici seguendo conventional commits
### Note
Tutti i test sono stati scritti prima del codice (TDD puro).
Ogni servizio ha test per casi di successo, errori, e edge cases.

View File

@@ -9,11 +9,11 @@
| Metrica | Valore | | Metrica | Valore |
|---------|--------| |---------|--------|
| **Stato** | 🟢 Security Services Completati | | **Stato** | 🟢 Security Services Completati |
| **Progresso** | 20% | | **Progresso** | 23% |
| **Data Inizio** | 2024-04-07 | | **Data Inizio** | 2024-04-07 |
| **Data Target** | TBD | | **Data Target** | TBD |
| **Task Totali** | 74 | | **Task Totali** | 74 |
| **Task Completati** | 16 | | **Task Completati** | 17 |
| **Task In Progress** | 0 | | **Task In Progress** | 0 |
--- ---
@@ -63,9 +63,9 @@
**Test totali servizi:** 71 test passanti **Test totali servizi:** 71 test passanti
**Coverage servizi:** 100% **Coverage servizi:** 100%
### 👤 Autenticazione Utenti (T17-T22) - 0/6 completati ### 👤 Autenticazione Utenti (T17-T22) - 1/6 completati
- [ ] T17: Creare Pydantic schemas auth (register/login) - [x] T17: Creare Pydantic schemas auth (register/login) - ✅ Completato (2026-04-07 14:30)
- [ ] T18: Implementare endpoint POST /api/auth/register - [ ] T18: Implementare endpoint POST /api/auth/register - 🟡 In progress
- [ ] T19: Implementare endpoint POST /api/auth/login - [ ] T19: Implementare endpoint POST /api/auth/login
- [ ] T20: Implementare endpoint POST /api/auth/logout - [ ] T20: Implementare endpoint POST /api/auth/logout
- [ ] T21: Creare dipendenza get_current_user - [ ] T21: Creare dipendenza get_current_user

View File

@@ -0,0 +1,571 @@
# Prompt: User Authentication Implementation (T17-T22)
## 🎯 OBIETTIVO
Implementare la fase **User Authentication** del progetto OpenRouter API Key Monitor seguendo rigorosamente TDD (Test-Driven Development).
**Task da completare:** T17, T18, T19, T20, T21, T22
---
## 📋 CONTESTO
- **Repository:** `/home/google/Sources/LucaSacchiNet/openrouter-watcher`
- **Specifiche:** `/home/google/Sources/LucaSacchiNet/openrouter-watcher/export/architecture.md` (sezioni 4, 5)
- **Kanban:** `/home/google/Sources/LucaSacchiNet/openrouter-watcher/export/kanban.md`
- **Stato Attuale:** Security Services completati (T01-T16), 202 test passanti
- **Progresso:** 22% (16/74 task)
- **Servizi Pronti:** Encryption, Password hashing, JWT, API Token
---
## 🔧 TASK DETTAGLIATI
### T17: Creare Pydantic Schemas per Autenticazione
**Requisiti:**
- Creare `src/openrouter_monitor/schemas/auth.py`
- Schemas per request/response:
- `UserRegister`: email, password, password_confirm
- `UserLogin`: email, password
- `UserResponse`: id, email, created_at, is_active
- `TokenResponse`: access_token, token_type, expires_in
- `TokenData`: user_id (sub), exp
- Validazione password strength (richiama `validate_password_strength`)
- Validazione email formato valido
- Validazione password e password_confirm coincidono
**Implementazione:**
```python
from pydantic import BaseModel, EmailStr, Field, validator, root_validator
class UserRegister(BaseModel):
email: EmailStr
password: str = Field(..., min_length=12, max_length=128)
password_confirm: str
@validator('password')
def password_strength(cls, v):
from openrouter_monitor.services.password import validate_password_strength
if not validate_password_strength(v):
raise ValueError('Password does not meet strength requirements')
return v
@root_validator
def passwords_match(cls, values):
if values.get('password') != values.get('password_confirm'):
raise ValueError('Passwords do not match')
return values
class UserLogin(BaseModel):
email: EmailStr
password: str
class UserResponse(BaseModel):
id: int
email: str
created_at: datetime
is_active: bool
class Config:
orm_mode = True
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int
class TokenData(BaseModel):
user_id: int | None = None
```
**Test richiesti:**
- Test UserRegister valido
- Test UserRegister password troppo corta
- Test UserRegister password e confirm non coincidono
- Test UserRegister email invalida
- Test UserLogin valido
- Test UserResponse orm_mode
---
### T18: Implementare Endpoint POST /api/auth/register
**Requisiti:**
- Creare `src/openrouter_monitor/routers/auth.py`
- Endpoint: `POST /api/auth/register`
- Riceve `UserRegister` schema
- Verifica email non esista già nel DB
- Hash password con `hash_password()`
- Crea utente nel database
- Ritorna `UserResponse` con status 201
- Gestire errori: email esistente (400), validazione fallita (422)
**Implementazione:**
```python
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(user_data: UserRegister, db: Session = Depends(get_db)):
# Verifica email esistente
existing = db.query(User).filter(User.email == user_data.email).first()
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Crea utente
hashed_password = hash_password(user_data.password)
user = User(email=user_data.email, password_hash=hashed_password)
db.add(user)
db.commit()
db.refresh(user)
return user
```
**Test richiesti:**
- Test registrazione nuovo utente successo
- Test registrazione email esistente fallisce
- Test registrazione password debole fallisce
- Test registrazione email invalida fallisce
---
### T19: Implementare Endpoint POST /api/auth/login
**Requisiti:**
- Endpoint: `POST /api/auth/login`
- Riceve `UserLogin` schema
- Verifica esistenza utente per email
- Verifica password con `verify_password()`
- Genera JWT con `create_access_token()`
- Ritorna `TokenResponse` con access_token
- Gestire errori: credenziali invalide (401)
**Implementazione:**
```python
@router.post("/login", response_model=TokenResponse)
async def login(credentials: UserLogin, db: Session = Depends(get_db)):
# Trova utente
user = db.query(User).filter(User.email == credentials.email).first()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
# Verifica password
if not verify_password(credentials.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
# Genera JWT
access_token = create_access_token(data={"sub": str(user.id)})
return TokenResponse(
access_token=access_token,
expires_in=3600 # 1 ora
)
```
**Test richiesti:**
- Test login con credenziali valide successo
- Test login con email inesistente fallisce
- Test login con password sbagliata fallisce
- Test login utente disattivato fallisce
- Test JWT contiene user_id corretto
---
### T20: Implementare Endpoint POST /api/auth/logout
**Requisiti:**
- Endpoint: `POST /api/auth/logout`
- Richiede autenticazione (JWT valido)
- In una implementazione JWT stateless, logout è gestito lato client
- Aggiungere token a blacklist (opzionale per MVP)
- Ritorna 200 con messaggio di successo
**Implementazione:**
```python
@router.post("/logout")
async def logout(current_user: User = Depends(get_current_user)):
# In JWT stateless, il logout è gestito rimuovendo il token lato client
# Per implementazione con blacklist, aggiungere token a lista nera
return {"message": "Successfully logged out"}
```
**Test richiesti:**
- Test logout con token valido successo
- Test logout senza token fallisce (401)
- Test logout con token invalido fallisce (401)
---
### T21: Implementare Dipendenza get_current_user
**Requisiti:**
- Creare `src/openrouter_monitor/dependencies/auth.py`
- Implementare `get_current_user()` per FastAPI dependency injection
- Estrae JWT da header Authorization (Bearer token)
- Verifica token con `verify_token()` o `decode_access_token()`
- Recupera utente dal DB per user_id nel token
- Verifica utente esista e sia attivo
- Gestire errori: token mancante, invalido, scaduto, utente non trovato
**Implementazione:**
```python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
token = credentials.credentials
try:
payload = decode_access_token(token)
user_id = int(payload.get("sub"))
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload"
)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token"
)
user = db.query(User).filter(User.id == user_id).first()
if user is None or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive"
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
return current_user
```
**Test richiesti:**
- Test get_current_user con token valido ritorna utente
- Test get_current_user senza token fallisce
- Test get_current_user con token scaduto fallisce
- Test get_current_user con token invalido fallisce
- Test get_current_user utente non esiste fallisce
- Test get_current_user utente inattivo fallisce
---
### T22: Scrivere Test per Auth Endpoints
**Requisiti:**
- Creare `tests/unit/routers/test_auth.py`
- Test integrazione per tutti gli endpoint auth
- Test con TestClient di FastAPI
- Mock database per test isolati
- Coverage >= 90%
**Test richiesti:**
- **Register Tests:**
- POST /api/auth/register successo (201)
- POST /api/auth/register email duplicata (400)
- POST /api/auth/register password debole (422)
- **Login Tests:**
- POST /api/auth/login successo (200 + token)
- POST /api/auth/login credenziali invalide (401)
- POST /api/auth/login utente inattivo (401)
- **Logout Tests:**
- POST /api/auth/logout successo (200)
- POST /api/auth/logout senza token (401)
- **get_current_user Tests:**
- Accesso protetto con token valido
- Accesso protetto senza token (401)
- Accesso protetto token scaduto (401)
---
## 🔄 WORKFLOW TDD OBBLIGATORIO
Per OGNI task (T17-T22):
```
┌─────────────────────────────────────────┐
│ 1. RED - Scrivi il test che fallisce │
│ • Test prima del codice │
│ • Pattern AAA (Arrange-Act-Assert) │
│ • Nomi descrittivi │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 2. GREEN - Implementa codice minimo │
│ • Solo codice necessario per test │
│ • Nessun refactoring ancora │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 3. REFACTOR - Migliora il codice │
│ • Pulisci duplicazioni │
│ • Migliora nomi variabili │
│ • Test rimangono verdi │
└─────────────────────────────────────────┘
```
---
## 📁 STRUTTURA FILE DA CREARE
```
src/openrouter_monitor/
├── schemas/
│ ├── __init__.py # Esporta tutti gli schemas
│ └── auth.py # T17 - Auth schemas
├── routers/
│ ├── __init__.py # Include auth router
│ └── auth.py # T18, T19, T20 - Auth endpoints
└── dependencies/
├── __init__.py
└── auth.py # T21 - get_current_user
tests/unit/
├── schemas/
│ ├── __init__.py
│ └── test_auth_schemas.py # T17 + T22
└── routers/
├── __init__.py
└── test_auth.py # T18, T19, T20, T21 + T22
```
---
## 🧪 REQUISITI TEST
### Pattern AAA (Arrange-Act-Assert)
```python
@pytest.mark.asyncio
async def test_register_new_user_returns_201_and_user_data():
# Arrange
user_data = {
"email": "test@example.com",
"password": "SecurePass123!",
"password_confirm": "SecurePass123!"
}
# Act
response = client.post("/api/auth/register", json=user_data)
# Assert
assert response.status_code == 201
assert response.json()["email"] == user_data["email"]
assert "id" in response.json()
```
### Marker Pytest
```python
@pytest.mark.unit # Logica pura
@pytest.mark.integration # Con database
@pytest.mark.asyncio # Funzioni async
@pytest.mark.auth # Test autenticazione
```
### Fixtures Condivise
```python
@pytest.fixture
def test_user(db_session):
"""Create test user in database."""
user = User(
email="test@example.com",
password_hash=hash_password("SecurePass123!")
)
db_session.add(user)
db_session.commit()
return user
@pytest.fixture
def auth_token(test_user):
"""Generate valid JWT for test user."""
return create_access_token(data={"sub": str(test_user.id)})
@pytest.fixture
def authorized_client(client, auth_token):
"""Client with authorization header."""
client.headers["Authorization"] = f"Bearer {auth_token}"
return client
```
---
## 🛡️ VINCOLI TECNICI
### Pydantic Schemas Requirements
```python
# Validazione password strength
def validate_password_strength(cls, v):
from openrouter_monitor.services.password import validate_password_strength
if not validate_password_strength(v):
raise ValueError(
'Password must be at least 12 characters with uppercase, '
'lowercase, digit, and special character'
)
return v
# Validazione passwords match
@root_validator
def passwords_match(cls, values):
if values.get('password') != values.get('password_confirm'):
raise ValueError('Passwords do not match')
return values
```
### Router Requirements
```python
from fastapi import APIRouter, Depends, HTTPException, status
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(...):
...
@router.post("/login", response_model=TokenResponse)
async def login(...):
...
@router.post("/logout")
async def logout(...):
...
```
### Dependency Requirements
```python
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""Extract and validate JWT, return current user."""
...
```
---
## 📊 AGGIORNAMENTO PROGRESS
Dopo ogni task completato, aggiorna:
`/home/google/Sources/LucaSacchiNet/openrouter-watcher/export/progress.md`
Esempio:
```markdown
### 👤 Autenticazione Utenti (T17-T22)
- [x] T17: Pydantic schemas auth - Completato [timestamp]
- [x] T18: Endpoint POST /api/auth/register - Completato [timestamp]
- [ ] T19: Endpoint POST /api/auth/login - In progress
- [ ] T20: Endpoint POST /api/auth/logout
- [ ] T21: Dipendenza get_current_user
- [ ] T22: Test auth endpoints
**Progresso sezione:** 33% (2/6 task)
**Progresso totale:** 24% (18/74 task)
```
---
## ✅ CRITERI DI ACCETTAZIONE
- [ ] T17: Schemas auth completi con validazione
- [ ] T18: Endpoint /api/auth/register funzionante (201/400)
- [ ] T19: Endpoint /api/auth/login funzionante (200/401)
- [ ] T20: Endpoint /api/auth/logout funzionante
- [ ] T21: get_current_user dependency funzionante
- [ ] T22: Test completi per auth (coverage >= 90%)
- [ ] Tutti i test passano (`pytest tests/unit/routers/test_auth.py`)
- [ ] Nessuna password in plaintext nei log/errori
- [ ] 6 commit atomici (uno per task)
- [ ] progress.md aggiornato
---
## 🚀 COMANDO DI VERIFICA
Al termine, esegui:
```bash
cd /home/google/Sources/LucaSacchiNet/openrouter-watcher
pytest tests/unit/schemas/test_auth_schemas.py -v
pytest tests/unit/routers/test_auth.py -v --cov=src/openrouter_monitor/routers
# Verifica endpoint con curl
curl -X POST http://localhost:8000/api/auth/register \
-H "Content-Type: application/json" \
-d '{"email":"test@example.com","password":"SecurePass123!","password_confirm":"SecurePass123!"}'
curl -X POST http://localhost:8000/api/auth/login \
-H "Content-Type: application/json" \
-d '{"email":"test@example.com","password":"SecurePass123!"}'
```
---
## 🔒 CONSIDERAZIONI SICUREZZA
### Do's ✅
- Usare `get_current_user` per proteggere endpoint
- Non loggare mai password in plaintext
- Ritornare errori generici per credenziali invalide
- Usare HTTPS in produzione
- Validare tutti gli input con Pydantic
### Don'ts ❌
- MAI ritornare password hash nelle response
- MAI loggare token JWT completi
- MAI usare GET per operazioni che modificano dati
- MAI ignorare eccezioni di autenticazione
---
## 📝 NOTE
- Usa SEMPRE path assoluti: `/home/google/Sources/LucaSacchiNet/openrouter-watcher/`
- Segui le convenzioni in `.opencode/agents/tdd-developer.md`
- Task devono essere verificabili in < 2 ore ciascuno
- Documenta bug complessi in `/docs/bug_ledger.md`
- Usa conventional commits:
- `feat(schemas): T17 add Pydantic auth schemas`
- `feat(auth): T18 implement user registration endpoint`
- `feat(auth): T19 implement user login endpoint`
- `feat(auth): T20 implement user logout endpoint`
- `feat(deps): T21 implement get_current_user dependency`
- `test(auth): T22 add comprehensive auth endpoint tests`
**AGENTE:** @tdd-developer
**INIZIA CON:** T17 - Pydantic schemas

View File

@@ -0,0 +1,285 @@
# Prompt di Ingaggio: User Authentication (T17-T22)
## 🎯 MISSIONE
Implementare la fase **User Authentication** (T17-T22) del progetto OpenRouter API Key Monitor seguendo rigorosamente TDD.
---
## 📋 CONTEXTO
**AGENTE:** @tdd-developer
**Repository:** `/home/google/Sources/LucaSacchiNet/openrouter-watcher`
**Stato Attuale:**
- ✅ Setup completato (T01-T05): 59 test
- ✅ Database & Models (T06-T11): 73 test
- ✅ Security Services (T12-T16): 70 test
- 🎯 **Totale: 202 test passanti, 100% coverage sui moduli implementati**
**Servizi Pronti da utilizzare:**
- `hash_password()`, `verify_password()` - in `services/password.py`
- `create_access_token()`, `decode_access_token()` - in `services/jwt.py`
- `EncryptionService` - in `services/encryption.py`
- `generate_api_token()`, `verify_api_token()` - in `services/token.py`
- `User`, `ApiKey`, `UsageStats`, `ApiToken` models
- `get_db()`, `Base` - in `database.py`
**Documentazione:**
- PRD: `/home/google/Sources/LucaSacchiNet/openrouter-watcher/prd.md`
- Architecture: `/home/google/Sources/LucaSacchiNet/openrouter-watcher/export/architecture.md`
- Prompt Dettagliato: `/home/google/Sources/LucaSacchiNet/openrouter-watcher/prompt/prompt-authentication.md`
---
## 🔧 TASK DA IMPLEMENTARE
### T17: Creare Pydantic Schemas per Autenticazione
**File:** `src/openrouter_monitor/schemas/auth.py`
**Requisiti:**
- `UserRegister`: email (EmailStr), password (min 12), password_confirm
- Validatore: richiama `validate_password_strength()`
- Root validator: password == password_confirm
- `UserLogin`: email, password
- `UserResponse`: id, email, created_at, is_active (orm_mode=True)
- `TokenResponse`: access_token, token_type="bearer", expires_in
- `TokenData`: user_id (sub), exp
**Test:** `tests/unit/schemas/test_auth_schemas.py`
---
### T18: Implementare Endpoint POST /api/auth/register
**File:** `src/openrouter_monitor/routers/auth.py`
**Requisiti:**
- Endpoint: `POST /api/auth/register`
- Riceve: `UserRegister` schema
- Logica:
1. Verifica email non esista: `db.query(User).filter(User.email == ...).first()`
2. Se esiste: HTTPException 400 "Email already registered"
3. Hash password: `hash_password(user_data.password)`
4. Crea User: `User(email=..., password_hash=...)`
5. Salva: `db.add()`, `db.commit()`, `db.refresh()`
6. Ritorna: `UserResponse`, status 201
**Test:** Register success, email duplicata (400), password debole (422)
---
### T19: Implementare Endpoint POST /api/auth/login
**File:** `src/openrouter_monitor/routers/auth.py`
**Requisiti:**
- Endpoint: `POST /api/auth/login`
- Riceve: `UserLogin` schema
- Logica:
1. Trova utente per email
2. Se non trovato o inattivo: HTTPException 401 "Invalid credentials"
3. Verifica password: `verify_password(credentials.password, user.password_hash)`
4. Se fallita: HTTPException 401
5. Genera JWT: `create_access_token(data={"sub": str(user.id)})`
6. Ritorna: `TokenResponse` con access_token
**Test:** Login success (200 + token), email inesistente (401), password sbagliata (401), utente inattivo (401)
---
### T20: Implementare Endpoint POST /api/auth/logout
**File:** `src/openrouter_monitor/routers/auth.py`
**Requisiti:**
- Endpoint: `POST /api/auth/logout`
- Richiede: `current_user: User = Depends(get_current_user)`
- Logica: JWT stateless, logout gestito lato client
- Ritorna: `{"message": "Successfully logged out"}`
**Test:** Logout con token valido (200), senza token (401)
---
### T21: Implementare Dipendenza get_current_user
**File:** `src/openrouter_monitor/dependencies/auth.py`
**Requisiti:**
```python
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
token = credentials.credentials
try:
payload = decode_access_token(token)
user_id = int(payload.get("sub"))
if not user_id:
raise HTTPException(401, "Invalid token payload")
except JWTError:
raise HTTPException(401, "Invalid or expired token")
user = db.query(User).filter(User.id == user_id).first()
if not user or not user.is_active:
raise HTTPException(401, "User not found or inactive")
return user
```
**Test:** Token valido ritorna utente, token mancante (401), token scaduto (401), token invalido (401), utente inesistente (401), utente inattivo (401)
---
### T22: Scrivere Test per Auth Endpoints
**File:** `tests/unit/routers/test_auth.py`
**Requisiti:**
- Usare `TestClient` da FastAPI
- Fixture: `test_user`, `auth_token`, `authorized_client`
- Test coverage >= 90%
**Test da implementare:**
- Register: success (201), email duplicata (400), password debole (422), email invalida (422)
- Login: success (200 + token), email inesistente (401), password sbagliata (401), utente inattivo (401)
- Logout: success (200), senza token (401)
- get_current_user: protetto con token valido, senza token (401), token scaduto (401)
---
## 🔄 WORKFLOW TDD
Per **OGNI** task:
1. **RED**: Scrivi test che fallisce (prima del codice!)
2. **GREEN**: Implementa codice minimo per passare il test
3. **REFACTOR**: Migliora codice, test rimangono verdi
---
## 📁 STRUTTURA FILE DA CREARE
```
src/openrouter_monitor/
├── schemas/
│ ├── __init__.py
│ └── auth.py # T17
├── routers/
│ ├── __init__.py
│ └── auth.py # T18, T19, T20
└── dependencies/
├── __init__.py
└── auth.py # T21
tests/unit/
├── schemas/
│ ├── __init__.py
│ └── test_auth_schemas.py # T17 + T22
└── routers/
├── __init__.py
└── test_auth.py # T18-T21 + T22
```
---
## 🧪 ESEMPI TEST
### Test Schema
```python
@pytest.mark.unit
def test_user_register_valid_data_passes_validation():
data = UserRegister(
email="test@example.com",
password="SecurePass123!",
password_confirm="SecurePass123!"
)
assert data.email == "test@example.com"
```
### Test Endpoint
```python
@pytest.mark.asyncio
async def test_register_new_user_returns_201(client, db_session):
response = client.post("/api/auth/register", json={
"email": "test@example.com",
"password": "SecurePass123!",
"password_confirm": "SecurePass123!"
})
assert response.status_code == 201
assert response.json()["email"] == "test@example.com"
```
---
## ✅ CRITERI DI ACCETTAZIONE
- [ ] T17: Schemas auth con validazione completa
- [ ] T18: POST /api/auth/register (201/400/422)
- [ ] T19: POST /api/auth/login (200/401)
- [ ] T20: POST /api/auth/logout (200)
- [ ] T21: get_current_user dependency funzionante
- [ ] T22: Test auth coverage >= 90%
- [ ] Tutti i test passano: `pytest tests/unit/routers/test_auth.py -v`
- [ ] 6 commit atomici con conventional commits
- [ ] progress.md aggiornato
---
## 📝 COMMIT MESSAGES
```
feat(schemas): T17 add Pydantic auth schemas
feat(auth): T18 implement user registration endpoint
feat(auth): T19 implement user login endpoint
feat(auth): T20 implement user logout endpoint
feat(deps): T21 implement get_current_user dependency
test(auth): T22 add comprehensive auth endpoint tests
```
---
## 🚀 VERIFICA FINALE
```bash
cd /home/google/Sources/LucaSacchiNet/openrouter-watcher
# Test schemas
pytest tests/unit/schemas/test_auth_schemas.py -v
# Test routers
pytest tests/unit/routers/test_auth.py -v --cov=src/openrouter_monitor/routers
# Test completo
pytest tests/unit/ -v --cov=src/openrouter_monitor
```
---
## ⚠️ NOTE IMPORTANTI
- **Path assoluti**: Usa sempre `/home/google/Sources/LucaSacchiNet/openrouter-watcher/`
- **Servizi esistenti**: Riutilizza `hash_password`, `verify_password`, `create_access_token`, `decode_access_token`
- **Database**: Usa `get_db()` da `database.py` per dependency injection
- **Models**: Importa da `models` package (User, ApiKey, etc.)
- **Sicurezza**: Mai loggare password o token in plaintext
- **Errori**: Errori generici per credenziali invalide (non leakare info)
---
**AGENTE:** @tdd-developer
**INIZIA CON:** T17 - Pydantic schemas
**QUANDO FINITO:** Conferma completamento e aggiorna progress.md

View File

@@ -0,0 +1,16 @@
"""Schemas package for OpenRouter Monitor."""
from openrouter_monitor.schemas.auth import (
TokenData,
TokenResponse,
UserLogin,
UserRegister,
UserResponse,
)
__all__ = [
"UserRegister",
"UserLogin",
"UserResponse",
"TokenResponse",
"TokenData",
]

View File

@@ -0,0 +1,173 @@
"""Authentication Pydantic schemas.
T17: Pydantic schemas for user registration, login, and token management.
"""
from datetime import datetime
from typing import Optional, Union
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator, model_validator
from openrouter_monitor.services.password import validate_password_strength
class UserRegister(BaseModel):
"""Schema for user registration.
Attributes:
email: User email address (must be valid email format)
password: User password (min 12 chars, must pass strength validation)
password_confirm: Password confirmation (must match password)
"""
email: EmailStr = Field(
..., # Required field
description="User email address",
examples=["user@example.com"]
)
password: str = Field(
...,
min_length=12,
description="User password (min 12 characters)",
examples=["SecurePass123!"]
)
password_confirm: str = Field(
...,
description="Password confirmation",
examples=["SecurePass123!"]
)
@field_validator('password')
@classmethod
def validate_password(cls, v: str) -> str:
"""Validate password strength.
Args:
v: The password value to validate
Returns:
The password if valid
Raises:
ValueError: If password doesn't meet strength requirements
"""
if not validate_password_strength(v):
raise ValueError(
"Password must be at least 12 characters long and contain "
"at least one uppercase letter, one lowercase letter, "
"one digit, and one special character"
)
return v
@model_validator(mode='after')
def check_passwords_match(self) -> 'UserRegister':
"""Verify that password and password_confirm match.
Returns:
The validated model instance
Raises:
ValueError: If passwords don't match
"""
if self.password != self.password_confirm:
raise ValueError("Passwords do not match")
return self
class UserLogin(BaseModel):
"""Schema for user login.
Attributes:
email: User email address
password: User password
"""
email: EmailStr = Field(
...,
description="User email address",
examples=["user@example.com"]
)
password: str = Field(
...,
description="User password",
examples=["your-password"]
)
class UserResponse(BaseModel):
"""Schema for user response (returned to client).
Attributes:
id: User ID
email: User email address
created_at: User creation timestamp
is_active: Whether the user account is active
"""
model_config = ConfigDict(from_attributes=True)
id: int = Field(
...,
description="User ID",
examples=[1]
)
email: EmailStr = Field(
...,
description="User email address",
examples=["user@example.com"]
)
created_at: datetime = Field(
...,
description="User creation timestamp",
examples=["2024-01-01T12:00:00"]
)
is_active: bool = Field(
...,
description="Whether the user account is active",
examples=[True]
)
class TokenResponse(BaseModel):
"""Schema for token response (returned after login).
Attributes:
access_token: The JWT access token
token_type: Token type (always 'bearer')
expires_in: Token expiration time in seconds
"""
access_token: str = Field(
...,
description="JWT access token",
examples=["eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."]
)
token_type: str = Field(
default="bearer",
description="Token type",
examples=["bearer"]
)
expires_in: int = Field(
...,
description="Token expiration time in seconds",
examples=[86400]
)
class TokenData(BaseModel):
"""Schema for token payload data.
Attributes:
user_id: User ID (from 'sub' claim in JWT)
exp: Token expiration timestamp
"""
user_id: Union[str, int] = Field(
...,
description="User ID (from JWT 'sub' claim)",
examples=["123"]
)
exp: datetime = Field(
...,
description="Token expiration timestamp",
examples=["2024-01-02T12:00:00"]
)

View File

@@ -0,0 +1,262 @@
"""Tests for authentication Pydantic schemas.
T17: Test Pydantic schemas for user authentication.
"""
import pytest
from pydantic import ValidationError, EmailStr
from datetime import datetime, timezone
class TestUserRegister:
"""Tests for UserRegister schema."""
def test_valid_registration(self):
"""Test valid user registration data."""
# This will fail until schema is implemented
from openrouter_monitor.schemas.auth import UserRegister
data = UserRegister(
email="test@example.com",
password="SecurePass123!",
password_confirm="SecurePass123!"
)
assert data.email == "test@example.com"
assert data.password == "SecurePass123!"
assert data.password_confirm == "SecurePass123!"
def test_invalid_email_format(self):
"""Test that invalid email format raises ValidationError."""
from openrouter_monitor.schemas.auth import UserRegister
with pytest.raises(ValidationError, match="email"):
UserRegister(
email="not-an-email",
password="SecurePass123!",
password_confirm="SecurePass123!"
)
def test_password_too_short(self):
"""Test that password shorter than 12 chars raises ValidationError."""
from openrouter_monitor.schemas.auth import UserRegister
with pytest.raises(ValidationError, match="password"):
UserRegister(
email="test@example.com",
password="Short1!",
password_confirm="Short1!"
)
def test_password_missing_uppercase(self):
"""Test that password without uppercase raises ValidationError."""
from openrouter_monitor.schemas.auth import UserRegister
with pytest.raises(ValidationError, match="password"):
UserRegister(
email="test@example.com",
password="lowercase123!",
password_confirm="lowercase123!"
)
def test_password_missing_lowercase(self):
"""Test that password without lowercase raises ValidationError."""
from openrouter_monitor.schemas.auth import UserRegister
with pytest.raises(ValidationError, match="password"):
UserRegister(
email="test@example.com",
password="UPPERCASE123!",
password_confirm="UPPERCASE123!"
)
def test_password_missing_digit(self):
"""Test that password without digit raises ValidationError."""
from openrouter_monitor.schemas.auth import UserRegister
with pytest.raises(ValidationError, match="password"):
UserRegister(
email="test@example.com",
password="NoDigitsHere!",
password_confirm="NoDigitsHere!"
)
def test_password_missing_special_char(self):
"""Test that password without special char raises ValidationError."""
from openrouter_monitor.schemas.auth import UserRegister
with pytest.raises(ValidationError, match="password"):
UserRegister(
email="test@example.com",
password="NoSpecialChars123",
password_confirm="NoSpecialChars123"
)
def test_passwords_do_not_match(self):
"""Test that mismatched passwords raise ValidationError."""
from openrouter_monitor.schemas.auth import UserRegister
with pytest.raises(ValidationError, match="match"):
UserRegister(
email="test@example.com",
password="SecurePass123!",
password_confirm="DifferentPass123!"
)
def test_password_strength_validator_called(self):
"""Test that validate_password_strength is called."""
from openrouter_monitor.schemas.auth import UserRegister
# Valid password should pass
data = UserRegister(
email="test@example.com",
password="ValidPass123!@#",
password_confirm="ValidPass123!@#"
)
assert data.password == "ValidPass123!@#"
class TestUserLogin:
"""Tests for UserLogin schema."""
def test_valid_login(self):
"""Test valid login credentials."""
from openrouter_monitor.schemas.auth import UserLogin
data = UserLogin(
email="test@example.com",
password="anypassword"
)
assert data.email == "test@example.com"
assert data.password == "anypassword"
def test_invalid_email_format(self):
"""Test that invalid email format raises ValidationError."""
from openrouter_monitor.schemas.auth import UserLogin
with pytest.raises(ValidationError, match="email"):
UserLogin(
email="not-an-email",
password="password"
)
def test_empty_password(self):
"""Test that empty password is allowed (validation happens elsewhere)."""
from openrouter_monitor.schemas.auth import UserLogin
data = UserLogin(
email="test@example.com",
password=""
)
assert data.password == ""
class TestUserResponse:
"""Tests for UserResponse schema."""
def test_valid_response(self):
"""Test valid user response."""
from openrouter_monitor.schemas.auth import UserResponse
from datetime import datetime
created_at = datetime(2024, 1, 1, 12, 0, 0)
data = UserResponse(
id=1,
email="test@example.com",
created_at=created_at,
is_active=True
)
assert data.id == 1
assert data.email == "test@example.com"
assert data.created_at == created_at
assert data.is_active is True
def test_from_orm(self):
"""Test that UserResponse can be created from ORM model."""
from openrouter_monitor.schemas.auth import UserResponse
from datetime import datetime
# Mock ORM object
class MockUser:
id = 1
email = "test@example.com"
created_at = datetime(2024, 1, 1, 12, 0, 0)
is_active = True
user = UserResponse.model_validate(MockUser())
assert user.id == 1
assert user.email == "test@example.com"
class TestTokenResponse:
"""Tests for TokenResponse schema."""
def test_valid_token_response(self):
"""Test valid token response."""
from openrouter_monitor.schemas.auth import TokenResponse
data = TokenResponse(
access_token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
token_type="bearer",
expires_in=3600
)
assert data.access_token == "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
assert data.token_type == "bearer"
assert data.expires_in == 3600
def test_default_token_type(self):
"""Test that default token_type is 'bearer'."""
from openrouter_monitor.schemas.auth import TokenResponse
data = TokenResponse(
access_token="some_token",
expires_in=3600
)
assert data.token_type == "bearer"
class TestTokenData:
"""Tests for TokenData schema."""
def test_valid_token_data(self):
"""Test valid token data."""
from openrouter_monitor.schemas.auth import TokenData
exp = datetime.now(timezone.utc)
data = TokenData(
user_id="123",
exp=exp
)
assert data.user_id == "123"
assert data.exp == exp
def test_user_id_from_sub(self):
"""Test that user_id can be extracted from sub claim."""
from openrouter_monitor.schemas.auth import TokenData
exp = datetime.now(timezone.utc)
# TokenData might be created from JWT payload with 'sub' field
data = TokenData(user_id="456", exp=exp)
assert data.user_id == "456"
def test_user_id_integer_conversion(self):
"""Test that user_id handles integer IDs."""
from openrouter_monitor.schemas.auth import TokenData
exp = datetime.now(timezone.utc)
data = TokenData(
user_id=123, # Integer ID
exp=exp
)
assert data.user_id == 123