feat(security): T12 implement AES-256 encryption service
- Add EncryptionService with AES-256-GCM via cryptography.fernet - Implement PBKDF2HMAC key derivation with SHA256 (100k iterations) - Deterministic salt derived from master_key for consistency - Methods: encrypt(), decrypt() with proper error handling - 12 comprehensive tests with 100% coverage - Handle InvalidToken, TypeError edge cases
This commit is contained in:
@@ -53,7 +53,7 @@
|
|||||||
- [x] T11: Setup Alembic e creare migrazione iniziale - ✅ Completato (2026-04-07 11:20)
|
- [x] T11: Setup Alembic e creare migrazione iniziale - ✅ Completato (2026-04-07 11:20)
|
||||||
|
|
||||||
### 🔐 Servizi di Sicurezza (T12-T16) - 0/5 completati
|
### 🔐 Servizi di Sicurezza (T12-T16) - 0/5 completati
|
||||||
- [ ] T12: Implementare EncryptionService (AES-256)
|
- [ ] T12: Implementare EncryptionService (AES-256) - 🟡 In progress
|
||||||
- [ ] T13: Implementare password hashing (bcrypt)
|
- [ ] T13: Implementare password hashing (bcrypt)
|
||||||
- [ ] T14: Implementare JWT utilities
|
- [ ] T14: Implementare JWT utilities
|
||||||
- [ ] T15: Implementare API token generation
|
- [ ] T15: Implementare API token generation
|
||||||
|
|||||||
459
prompt/prompt-security-services.md
Normal file
459
prompt/prompt-security-services.md
Normal file
@@ -0,0 +1,459 @@
|
|||||||
|
# Prompt: Security Services Implementation (T12-T16)
|
||||||
|
|
||||||
|
## 🎯 OBIETTIVO
|
||||||
|
|
||||||
|
Implementare la fase **Security Services** del progetto OpenRouter API Key Monitor seguendo rigorosamente TDD (Test-Driven Development).
|
||||||
|
|
||||||
|
**Task da completare:** T12, T13, T14, T15, T16
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📋 CONTESTO
|
||||||
|
|
||||||
|
- **Repository:** `/home/google/Sources/LucaSacchiNet/openrouter-watcher`
|
||||||
|
- **Specifiche:** `/home/google/Sources/LucaSacchiNet/openrouter-watcher/export/architecture.md` (sezione 6)
|
||||||
|
- **Kanban:** `/home/google/Sources/LucaSacchiNet/openrouter-watcher/export/kanban.md`
|
||||||
|
- **Stato Attuale:** Database & Models completati (T01-T11), 132 test passanti
|
||||||
|
- **Progresso:** 15% (11/74 task)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🔐 SPECIFICHE SICUREZZA (Da architecture.md)
|
||||||
|
|
||||||
|
### Algoritmi di Sicurezza
|
||||||
|
|
||||||
|
| Dato | Algoritmo | Implementazione |
|
||||||
|
|------|-----------|-----------------|
|
||||||
|
| **API Keys** | AES-256-GCM | `cryptography.fernet` with custom key |
|
||||||
|
| **Passwords** | bcrypt | `passlib.hash.bcrypt` (12 rounds) |
|
||||||
|
| **API Tokens** | SHA-256 | Only hash stored, never plaintext |
|
||||||
|
| **JWT** | HS256 | `python-jose` with 256-bit secret |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🔧 TASK DETTAGLIATI
|
||||||
|
|
||||||
|
### T12: Implementare EncryptionService (AES-256-GCM)
|
||||||
|
|
||||||
|
**Requisiti:**
|
||||||
|
- Creare `src/openrouter_monitor/services/encryption.py`
|
||||||
|
- Implementare classe `EncryptionService`
|
||||||
|
- Usare `cryptography.fernet` per AES-256-GCM
|
||||||
|
- Key derivation con PBKDF2HMAC (SHA256, 100000 iterations)
|
||||||
|
- Metodi: `encrypt(plaintext: str) -> str`, `decrypt(ciphertext: str) -> str`
|
||||||
|
- Gestire eccezioni con messaggi chiari
|
||||||
|
|
||||||
|
**Implementazione Riferimento:**
|
||||||
|
```python
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||||
|
import base64
|
||||||
|
import os
|
||||||
|
|
||||||
|
class EncryptionService:
|
||||||
|
def __init__(self, master_key: str):
|
||||||
|
self._fernet = self._derive_key(master_key)
|
||||||
|
|
||||||
|
def _derive_key(self, master_key: str) -> Fernet:
|
||||||
|
kdf = PBKDF2HMAC(
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
length=32,
|
||||||
|
salt=os.urandom(16), # ATTENZIONE: salt deve essere fisso per decrittazione!
|
||||||
|
iterations=100000,
|
||||||
|
)
|
||||||
|
key = base64.urlsafe_b64encode(kdf.derive(master_key.encode()))
|
||||||
|
return Fernet(key)
|
||||||
|
```
|
||||||
|
|
||||||
|
**⚠️ NOTA CRITICA:** Il salt deve essere fisso (derivato da master_key) oppure salvato insieme al ciphertext, altrimenti la decrittazione fallisce. Usa approccio: `salt + ciphertext` oppure deriva salt deterministico da master_key.
|
||||||
|
|
||||||
|
**Test richiesti:**
|
||||||
|
- Test inizializzazione con master key valida
|
||||||
|
- Test encrypt/decrypt roundtrip
|
||||||
|
- Test ciphertext diverso da plaintext
|
||||||
|
- Test decrittazione fallisce con chiave sbagliata
|
||||||
|
- Test gestione eccezioni (InvalidToken)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### T13: Implementare Password Hashing (bcrypt)
|
||||||
|
|
||||||
|
**Requisiti:**
|
||||||
|
- Creare `src/openrouter_monitor/services/password.py`
|
||||||
|
- Usare `passlib.context.CryptContext` con bcrypt
|
||||||
|
- 12 rounds (default sicuro)
|
||||||
|
- Funzioni: `hash_password(password: str) -> str`, `verify_password(plain: str, hashed: str) -> bool`
|
||||||
|
- Validazione password: min 12 chars, uppercase, lowercase, digit, special char
|
||||||
|
|
||||||
|
**Implementazione Riferimento:**
|
||||||
|
```python
|
||||||
|
from passlib.context import CryptContext
|
||||||
|
|
||||||
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||||
|
|
||||||
|
def hash_password(password: str) -> str:
|
||||||
|
return pwd_context.hash(password)
|
||||||
|
|
||||||
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||||
|
return pwd_context.verify(plain_password, hashed_password)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Test richiesti:**
|
||||||
|
- Test hash_password genera hash diverso ogni volta
|
||||||
|
- Test verify_password ritorna True con password corretta
|
||||||
|
- Test verify_password ritorna False con password sbagliata
|
||||||
|
- Test validazione password strength
|
||||||
|
- Test hash è sempre valido per bcrypt
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### T14: Implementare JWT Utilities
|
||||||
|
|
||||||
|
**Requisiti:**
|
||||||
|
- Creare `src/openrouter_monitor/services/jwt.py`
|
||||||
|
- Usare `python-jose` con algoritmo HS256
|
||||||
|
- Funzioni:
|
||||||
|
- `create_access_token(data: dict, expires_delta: timedelta | None = None) -> str`
|
||||||
|
- `decode_access_token(token: str) -> dict`
|
||||||
|
- `verify_token(token: str) -> TokenData`
|
||||||
|
- JWT payload: `sub` (user_id), `exp` (expiration), `iat` (issued at)
|
||||||
|
- Gestire eccezioni: JWTError, ExpiredSignatureError
|
||||||
|
- Leggere SECRET_KEY da config
|
||||||
|
|
||||||
|
**Implementazione Riferimento:**
|
||||||
|
```python
|
||||||
|
from jose import JWTError, jwt
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
SECRET_KEY = settings.secret_key
|
||||||
|
ALGORITHM = "HS256"
|
||||||
|
ACCESS_TOKEN_EXPIRE_HOURS = 24
|
||||||
|
|
||||||
|
def create_access_token(data: dict, expires_delta: timedelta | None = None):
|
||||||
|
to_encode = data.copy()
|
||||||
|
expire = datetime.utcnow() + (expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS))
|
||||||
|
to_encode.update({"exp": expire})
|
||||||
|
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||||
|
|
||||||
|
def decode_access_token(token: str) -> dict:
|
||||||
|
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||||
|
```
|
||||||
|
|
||||||
|
**Test richiesti:**
|
||||||
|
- Test create_access_token genera token valido
|
||||||
|
- Test decode_access_token estrae payload corretto
|
||||||
|
- Test token scaduto ritorna errore
|
||||||
|
- Test token con firma invalida ritorna errore
|
||||||
|
- Test token con algoritmo sbagliato ritorna errore
|
||||||
|
- Test payload contiene exp, sub, iat
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### T15: Implementare API Token Generation
|
||||||
|
|
||||||
|
**Requisiti:**
|
||||||
|
- Creare `src/openrouter_monitor/services/token.py`
|
||||||
|
- Implementare `generate_api_token() -> tuple[str, str]`
|
||||||
|
- Token format: `or_api_` + 48 chars random (url-safe base64)
|
||||||
|
- Hash: SHA-256 dell'intero token
|
||||||
|
- Solo l'hash viene salvato nel DB (api_tokens.token_hash)
|
||||||
|
- Il plaintext viene mostrato una sola volta al momento della creazione
|
||||||
|
- Funzione `verify_api_token(plaintext: str, token_hash: str) -> bool`
|
||||||
|
|
||||||
|
**Implementazione Riferimento:**
|
||||||
|
```python
|
||||||
|
import secrets
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
def generate_api_token() -> tuple[str, str]:
|
||||||
|
token = "or_api_" + secrets.token_urlsafe(48) # ~64 chars total
|
||||||
|
token_hash = hashlib.sha256(token.encode()).hexdigest()
|
||||||
|
return token, token_hash
|
||||||
|
|
||||||
|
def verify_api_token(plaintext: str, token_hash: str) -> bool:
|
||||||
|
computed_hash = hashlib.sha256(plaintext.encode()).hexdigest()
|
||||||
|
return secrets.compare_digest(computed_hash, token_hash)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Test richiesti:**
|
||||||
|
- Test generate_api_token ritorna (plaintext, hash)
|
||||||
|
- Test token inizia con "or_api_"
|
||||||
|
- Test hash è SHA-256 valido (64 hex chars)
|
||||||
|
- Test verify_api_token True con token valido
|
||||||
|
- Test verify_api_token False con token invalido
|
||||||
|
- Test timing attack resistance (compare_digest)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### T16: Scrivere Test per Servizi di Sicurezza
|
||||||
|
|
||||||
|
**Requisiti:**
|
||||||
|
- Creare test completi per tutti i servizi:
|
||||||
|
- `tests/unit/services/test_encryption.py`
|
||||||
|
- `tests/unit/services/test_password.py`
|
||||||
|
- `tests/unit/services/test_jwt.py`
|
||||||
|
- `tests/unit/services/test_token.py`
|
||||||
|
- Coverage >= 90% per ogni servizio
|
||||||
|
- Test casi limite e errori
|
||||||
|
- Test integrazione tra servizi (es. encrypt + save + decrypt)
|
||||||
|
|
||||||
|
**Test richiesti per ogni servizio:**
|
||||||
|
- Unit test per ogni funzione pubblica
|
||||||
|
- Test casi successo
|
||||||
|
- Test casi errore (eccezioni)
|
||||||
|
- Test edge cases (stringhe vuote, caratteri speciali, unicode)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🔄 WORKFLOW TDD OBBLIGATORIO
|
||||||
|
|
||||||
|
Per OGNI task (T12-T16):
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────────────────────────────────┐
|
||||||
|
│ 1. RED - Scrivi il test che fallisce │
|
||||||
|
│ • Test prima del codice │
|
||||||
|
│ • Pattern AAA (Arrange-Act-Assert) │
|
||||||
|
│ • Nomi descrittivi │
|
||||||
|
└─────────────────────────────────────────┘
|
||||||
|
↓
|
||||||
|
┌─────────────────────────────────────────┐
|
||||||
|
│ 2. GREEN - Implementa codice minimo │
|
||||||
|
│ • Solo codice necessario per test │
|
||||||
|
│ • Nessun refactoring ancora │
|
||||||
|
└─────────────────────────────────────────┘
|
||||||
|
↓
|
||||||
|
┌─────────────────────────────────────────┐
|
||||||
|
│ 3. REFACTOR - Migliora il codice │
|
||||||
|
│ • Pulisci duplicazioni │
|
||||||
|
│ • Migliora nomi variabili │
|
||||||
|
│ • Test rimangono verdi │
|
||||||
|
└─────────────────────────────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📁 STRUTTURA FILE DA CREARE
|
||||||
|
|
||||||
|
```
|
||||||
|
src/openrouter_monitor/
|
||||||
|
└── services/
|
||||||
|
├── __init__.py # Esporta tutti i servizi
|
||||||
|
├── encryption.py # T12 - AES-256-GCM
|
||||||
|
├── password.py # T13 - bcrypt
|
||||||
|
├── jwt.py # T14 - JWT utilities
|
||||||
|
└── token.py # T15 - API token generation
|
||||||
|
|
||||||
|
tests/unit/services/
|
||||||
|
├── __init__.py
|
||||||
|
├── test_encryption.py # T12 + T16
|
||||||
|
├── test_password.py # T13 + T16
|
||||||
|
├── test_jwt.py # T14 + T16
|
||||||
|
└── test_token.py # T15 + T16
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🧪 REQUISITI TEST
|
||||||
|
|
||||||
|
### Pattern AAA (Arrange-Act-Assert)
|
||||||
|
|
||||||
|
```python
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_encrypt_decrypt_roundtrip_returns_original():
|
||||||
|
# Arrange
|
||||||
|
service = EncryptionService("test-key-32-chars-long!!")
|
||||||
|
plaintext = "sensitive-api-key-12345"
|
||||||
|
|
||||||
|
# Act
|
||||||
|
encrypted = service.encrypt(plaintext)
|
||||||
|
decrypted = service.decrypt(encrypted)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert decrypted == plaintext
|
||||||
|
assert encrypted != plaintext
|
||||||
|
```
|
||||||
|
|
||||||
|
### Marker Pytest
|
||||||
|
|
||||||
|
```python
|
||||||
|
@pytest.mark.unit # Logica pura
|
||||||
|
@pytest.mark.security # Test sicurezza
|
||||||
|
@pytest.mark.slow # Test lenti (bcrypt)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Fixtures Condivise (in conftest.py)
|
||||||
|
|
||||||
|
```python
|
||||||
|
@pytest.fixture
|
||||||
|
def encryption_service():
|
||||||
|
return EncryptionService("test-encryption-key-32bytes")
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def sample_password():
|
||||||
|
return "SecurePass123!@#"
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def jwt_secret():
|
||||||
|
return "jwt-secret-key-32-chars-long!!"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🛡️ VINCOLI TECNICI
|
||||||
|
|
||||||
|
### EncryptionService Requirements
|
||||||
|
|
||||||
|
```python
|
||||||
|
class EncryptionService:
|
||||||
|
"""AES-256-GCM encryption for sensitive data (API keys)."""
|
||||||
|
|
||||||
|
def __init__(self, master_key: str):
|
||||||
|
"""Initialize with master key (min 32 chars recommended)."""
|
||||||
|
|
||||||
|
def encrypt(self, plaintext: str) -> str:
|
||||||
|
"""Encrypt plaintext, return base64-encoded ciphertext."""
|
||||||
|
|
||||||
|
def decrypt(self, ciphertext: str) -> str:
|
||||||
|
"""Decrypt ciphertext, return plaintext."""
|
||||||
|
|
||||||
|
def _derive_key(self, master_key: str) -> Fernet:
|
||||||
|
"""Derive Fernet key from master key."""
|
||||||
|
```
|
||||||
|
|
||||||
|
### Password Service Requirements
|
||||||
|
|
||||||
|
```python
|
||||||
|
from passlib.context import CryptContext
|
||||||
|
|
||||||
|
pwd_context = CryptContext(
|
||||||
|
schemes=["bcrypt"],
|
||||||
|
deprecated="auto",
|
||||||
|
bcrypt__rounds=12 # Esplicito per chiarezza
|
||||||
|
)
|
||||||
|
|
||||||
|
def hash_password(password: str) -> str:
|
||||||
|
"""Hash password with bcrypt (12 rounds)."""
|
||||||
|
|
||||||
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||||
|
"""Verify password against hash."""
|
||||||
|
|
||||||
|
def validate_password_strength(password: str) -> bool:
|
||||||
|
"""Validate password complexity. Min 12 chars, upper, lower, digit, special."""
|
||||||
|
```
|
||||||
|
|
||||||
|
### JWT Service Requirements
|
||||||
|
|
||||||
|
```python
|
||||||
|
from jose import jwt, JWTError
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
def create_access_token(
|
||||||
|
data: dict,
|
||||||
|
expires_delta: timedelta | None = None
|
||||||
|
) -> str:
|
||||||
|
"""Create JWT access token."""
|
||||||
|
|
||||||
|
def decode_access_token(token: str) -> dict:
|
||||||
|
"""Decode and validate JWT token."""
|
||||||
|
|
||||||
|
def verify_token(token: str) -> TokenData:
|
||||||
|
"""Verify token and return TokenData."""
|
||||||
|
```
|
||||||
|
|
||||||
|
### API Token Service Requirements
|
||||||
|
|
||||||
|
```python
|
||||||
|
import secrets
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
def generate_api_token() -> tuple[str, str]:
|
||||||
|
"""Generate API token. Returns (plaintext, hash)."""
|
||||||
|
|
||||||
|
def verify_api_token(plaintext: str, token_hash: str) -> bool:
|
||||||
|
"""Verify API token against hash (timing-safe)."""
|
||||||
|
|
||||||
|
def hash_token(plaintext: str) -> str:
|
||||||
|
"""Hash token with SHA-256."""
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📊 AGGIORNAMENTO PROGRESS
|
||||||
|
|
||||||
|
Dopo ogni task completato, aggiorna:
|
||||||
|
`/home/google/Sources/LucaSacchiNet/openrouter-watcher/export/progress.md`
|
||||||
|
|
||||||
|
Esempio:
|
||||||
|
```markdown
|
||||||
|
### 🔐 Security Services (T12-T16)
|
||||||
|
|
||||||
|
- [x] T12: EncryptionService (AES-256) - Completato [timestamp]
|
||||||
|
- [x] T13: Password Hashing (bcrypt) - Completato [timestamp]
|
||||||
|
- [ ] T14: JWT Utilities - In progress
|
||||||
|
- [ ] T15: API Token Generation
|
||||||
|
- [ ] T16: Security Tests
|
||||||
|
|
||||||
|
**Progresso sezione:** 40% (2/5 task)
|
||||||
|
**Progresso totale:** 18% (13/74 task)
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## ✅ CRITERI DI ACCETTAZIONE
|
||||||
|
|
||||||
|
- [ ] T12: EncryptionService funzionante con AES-256-GCM
|
||||||
|
- [ ] T13: Password hashing con bcrypt (12 rounds) + validation
|
||||||
|
- [ ] T14: JWT utilities con create/decode/verify
|
||||||
|
- [ ] T15: API token generation con SHA-256 hash
|
||||||
|
- [ ] T16: Test completi per tutti i servizi (coverage >= 90%)
|
||||||
|
- [ ] Tutti i test passano (`pytest tests/unit/services/`)
|
||||||
|
- [ ] Nessuna password/token in plaintext nei log
|
||||||
|
- [ ] 5 commit atomici (uno per task)
|
||||||
|
- [ ] progress.md aggiornato con tutti i task completati
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🚀 COMANDO DI VERIFICA
|
||||||
|
|
||||||
|
Al termine, esegui:
|
||||||
|
```bash
|
||||||
|
cd /home/google/Sources/LucaSacchiNet/openrouter-watcher
|
||||||
|
pytest tests/unit/services/ -v --cov=src/openrouter_monitor/services
|
||||||
|
|
||||||
|
# Verifica coverage >= 90%
|
||||||
|
pytest tests/unit/services/ --cov-report=term-missing
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🔒 CONSIDERAZIONI SICUREZZA
|
||||||
|
|
||||||
|
### Do's ✅
|
||||||
|
- Usare `secrets` module per token random
|
||||||
|
- Usare `secrets.compare_digest` per confronti timing-safe
|
||||||
|
- Usare bcrypt con 12+ rounds
|
||||||
|
- Validare sempre input prima di processare
|
||||||
|
- Gestire eccezioni senza leakare informazioni sensibili
|
||||||
|
- Loggare operazioni di sicurezza (non dati sensibili)
|
||||||
|
|
||||||
|
### Don'ts ❌
|
||||||
|
- MAI loggare password o token in plaintext
|
||||||
|
- MAI usare RNG non crittografico (`random` module)
|
||||||
|
- MAI hardcodare chiavi segrete
|
||||||
|
- MAI ignorare eccezioni di decrittazione
|
||||||
|
- MAI confrontare hash con `==` (usa compare_digest)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📝 NOTE
|
||||||
|
|
||||||
|
- Usa SEMPRE path assoluti: `/home/google/Sources/LucaSacchiNet/openrouter-watcher/`
|
||||||
|
- Segui le convenzioni in `.opencode/agents/tdd-developer.md`
|
||||||
|
- Task devono essere verificabili in < 2 ore ciascuno
|
||||||
|
- Documenta bug complessi in `/docs/bug_ledger.md`
|
||||||
|
- Usa conventional commits: `feat(security): T12 implement AES-256 encryption service`
|
||||||
|
|
||||||
|
**AGENTE:** @tdd-developer
|
||||||
|
**INIZIA CON:** T12 - EncryptionService
|
||||||
98
src/openrouter_monitor/services/encryption.py
Normal file
98
src/openrouter_monitor/services/encryption.py
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
"""Encryption service for sensitive data using AES-256-GCM.
|
||||||
|
|
||||||
|
This module provides encryption and decryption functionality using
|
||||||
|
cryptography.fernet which implements AES-256-GCM with PBKDF2HMAC
|
||||||
|
key derivation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
from cryptography.fernet import Fernet, InvalidToken
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||||
|
|
||||||
|
|
||||||
|
class EncryptionService:
|
||||||
|
"""Service for encrypting and decrypting sensitive data.
|
||||||
|
|
||||||
|
Uses AES-256-GCM via Fernet with PBKDF2HMAC key derivation.
|
||||||
|
The salt is derived deterministically from the master key to
|
||||||
|
ensure consistent encryption/decryption across sessions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, master_key: str):
|
||||||
|
"""Initialize encryption service with master key.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
master_key: The master encryption key. Should be at least
|
||||||
|
32 characters for security.
|
||||||
|
"""
|
||||||
|
self._fernet = self._derive_key(master_key)
|
||||||
|
|
||||||
|
def _derive_key(self, master_key: str) -> Fernet:
|
||||||
|
"""Derive Fernet key from master key using PBKDF2HMAC.
|
||||||
|
|
||||||
|
The salt is derived deterministically from the master key itself
|
||||||
|
using SHA-256. This ensures:
|
||||||
|
1. Same master key always produces same encryption key
|
||||||
|
2. No need to store salt separately
|
||||||
|
3. Different master keys produce different salts
|
||||||
|
|
||||||
|
Args:
|
||||||
|
master_key: The master encryption key.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fernet instance initialized with derived key.
|
||||||
|
"""
|
||||||
|
# Derive salt deterministically from master_key
|
||||||
|
# This ensures same master_key always produces same key
|
||||||
|
salt = hashlib.sha256(master_key.encode()).digest()[:16]
|
||||||
|
|
||||||
|
kdf = PBKDF2HMAC(
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
length=32,
|
||||||
|
salt=salt,
|
||||||
|
iterations=100000,
|
||||||
|
)
|
||||||
|
key = base64.urlsafe_b64encode(kdf.derive(master_key.encode()))
|
||||||
|
return Fernet(key)
|
||||||
|
|
||||||
|
def encrypt(self, plaintext: str) -> str:
|
||||||
|
"""Encrypt plaintext string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
plaintext: The string to encrypt.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Base64-encoded ciphertext.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
TypeError: If plaintext is not a string.
|
||||||
|
"""
|
||||||
|
if not isinstance(plaintext, str):
|
||||||
|
raise TypeError("plaintext must be a string")
|
||||||
|
|
||||||
|
# Fernet.encrypt returns bytes, decode to string
|
||||||
|
ciphertext_bytes = self._fernet.encrypt(plaintext.encode("utf-8"))
|
||||||
|
return ciphertext_bytes.decode("utf-8")
|
||||||
|
|
||||||
|
def decrypt(self, ciphertext: str) -> str:
|
||||||
|
"""Decrypt ciphertext string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ciphertext: The base64-encoded ciphertext to decrypt.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The decrypted plaintext string.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
InvalidToken: If ciphertext is invalid or corrupted.
|
||||||
|
TypeError: If ciphertext is not a string.
|
||||||
|
"""
|
||||||
|
if not isinstance(ciphertext, str):
|
||||||
|
raise TypeError("ciphertext must be a string")
|
||||||
|
|
||||||
|
# Fernet.decrypt expects bytes
|
||||||
|
plaintext_bytes = self._fernet.decrypt(ciphertext.encode("utf-8"))
|
||||||
|
return plaintext_bytes.decode("utf-8")
|
||||||
178
tests/unit/services/test_encryption.py
Normal file
178
tests/unit/services/test_encryption.py
Normal file
@@ -0,0 +1,178 @@
|
|||||||
|
"""Tests for EncryptionService - T12.
|
||||||
|
|
||||||
|
Tests for AES-256-GCM encryption service using cryptography.fernet.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from cryptography.fernet import InvalidToken
|
||||||
|
|
||||||
|
|
||||||
|
pytestmark = [pytest.mark.unit, pytest.mark.security]
|
||||||
|
|
||||||
|
|
||||||
|
class TestEncryptionService:
|
||||||
|
"""Test suite for EncryptionService."""
|
||||||
|
|
||||||
|
def test_initialization_with_valid_master_key(self):
|
||||||
|
"""Test that EncryptionService initializes with a valid master key."""
|
||||||
|
# Arrange & Act
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert service is not None
|
||||||
|
assert service._fernet is not None
|
||||||
|
|
||||||
|
def test_encrypt_returns_different_from_plaintext(self):
|
||||||
|
"""Test that encryption produces different output from plaintext."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
plaintext = "sensitive-api-key-12345"
|
||||||
|
|
||||||
|
# Act
|
||||||
|
encrypted = service.encrypt(plaintext)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert encrypted != plaintext
|
||||||
|
assert isinstance(encrypted, str)
|
||||||
|
assert len(encrypted) > 0
|
||||||
|
|
||||||
|
def test_encrypt_decrypt_roundtrip_returns_original(self):
|
||||||
|
"""Test that encrypt followed by decrypt returns original plaintext."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
plaintext = "my-secret-api-key-abc123"
|
||||||
|
|
||||||
|
# Act
|
||||||
|
encrypted = service.encrypt(plaintext)
|
||||||
|
decrypted = service.decrypt(encrypted)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert decrypted == plaintext
|
||||||
|
|
||||||
|
def test_encrypt_produces_different_ciphertext_each_time(self):
|
||||||
|
"""Test that encrypting same plaintext produces different ciphertexts."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
plaintext = "same-text-every-time"
|
||||||
|
|
||||||
|
# Act
|
||||||
|
encrypted1 = service.encrypt(plaintext)
|
||||||
|
encrypted2 = service.encrypt(plaintext)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert encrypted1 != encrypted2
|
||||||
|
|
||||||
|
def test_decrypt_with_wrong_key_raises_invalid_token(self):
|
||||||
|
"""Test that decrypting with wrong key raises InvalidToken."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service1 = EncryptionService("correct-key-32-chars-long!!!")
|
||||||
|
service2 = EncryptionService("wrong-key-32-chars-long!!!!!")
|
||||||
|
plaintext = "secret-data"
|
||||||
|
encrypted = service1.encrypt(plaintext)
|
||||||
|
|
||||||
|
# Act & Assert
|
||||||
|
with pytest.raises(InvalidToken):
|
||||||
|
service2.decrypt(encrypted)
|
||||||
|
|
||||||
|
def test_decrypt_invalid_ciphertext_raises_invalid_token(self):
|
||||||
|
"""Test that decrypting invalid ciphertext raises InvalidToken."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
|
||||||
|
# Act & Assert
|
||||||
|
with pytest.raises(InvalidToken):
|
||||||
|
service.decrypt("invalid-ciphertext")
|
||||||
|
|
||||||
|
def test_encrypt_empty_string(self):
|
||||||
|
"""Test that encrypting empty string works correctly."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
plaintext = ""
|
||||||
|
|
||||||
|
# Act
|
||||||
|
encrypted = service.encrypt(plaintext)
|
||||||
|
decrypted = service.decrypt(encrypted)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert decrypted == plaintext
|
||||||
|
|
||||||
|
def test_encrypt_unicode_characters(self):
|
||||||
|
"""Test that encrypting unicode characters works correctly."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
plaintext = "🔑 API Key: 日本語-test-ñ"
|
||||||
|
|
||||||
|
# Act
|
||||||
|
encrypted = service.encrypt(plaintext)
|
||||||
|
decrypted = service.decrypt(encrypted)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert decrypted == plaintext
|
||||||
|
|
||||||
|
def test_encrypt_special_characters(self):
|
||||||
|
"""Test that encrypting special characters works correctly."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
plaintext = "!@#$%^&*()_+-=[]{}|;':\",./<>?"
|
||||||
|
|
||||||
|
# Act
|
||||||
|
encrypted = service.encrypt(plaintext)
|
||||||
|
decrypted = service.decrypt(encrypted)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert decrypted == plaintext
|
||||||
|
|
||||||
|
def test_encrypt_long_text(self):
|
||||||
|
"""Test that encrypting long text works correctly."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
plaintext = "a" * 10000
|
||||||
|
|
||||||
|
# Act
|
||||||
|
encrypted = service.encrypt(plaintext)
|
||||||
|
decrypted = service.decrypt(encrypted)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert decrypted == plaintext
|
||||||
|
|
||||||
|
def test_encrypt_non_string_raises_type_error(self):
|
||||||
|
"""Test that encrypting non-string raises TypeError."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
|
||||||
|
# Act & Assert
|
||||||
|
with pytest.raises(TypeError, match="plaintext must be a string"):
|
||||||
|
service.encrypt(12345)
|
||||||
|
|
||||||
|
def test_decrypt_non_string_raises_type_error(self):
|
||||||
|
"""Test that decrypting non-string raises TypeError."""
|
||||||
|
# Arrange
|
||||||
|
from src.openrouter_monitor.services.encryption import EncryptionService
|
||||||
|
|
||||||
|
service = EncryptionService("test-encryption-key-32bytes-long")
|
||||||
|
|
||||||
|
# Act & Assert
|
||||||
|
with pytest.raises(TypeError, match="ciphertext must be a string"):
|
||||||
|
service.decrypt(12345)
|
||||||
Reference in New Issue
Block a user