diff --git a/export/progress.md b/export/progress.md index f22dacb..8fd9947 100644 --- a/export/progress.md +++ b/export/progress.md @@ -52,10 +52,10 @@ - [x] T10: Creare model ApiToken (SQLAlchemy) - ✅ Completato (2026-04-07 11:15) - [x] T11: Setup Alembic e creare migrazione iniziale - ✅ Completato (2026-04-07 11:20) -### 🔐 Servizi di Sicurezza (T12-T16) - 1/5 completati +### 🔐 Servizi di Sicurezza (T12-T16) - 2/5 completati - [x] T12: Implementare EncryptionService (AES-256) - ✅ Completato (2026-04-07 12:00, commit: 2fdd9d1) -- [ ] T13: Implementare password hashing (bcrypt) - 🟡 In progress -- [ ] T14: Implementare JWT utilities +- [x] T13: Implementare password hashing (bcrypt) - ✅ Completato (2026-04-07 12:15, commit: 54e8116) +- [ ] T14: Implementare JWT utilities - 🟡 In progress - [ ] T15: Implementare API token generation - [ ] T16: Scrivere test per servizi di encryption diff --git a/src/openrouter_monitor/services/jwt.py b/src/openrouter_monitor/services/jwt.py new file mode 100644 index 0000000..a42951d --- /dev/null +++ b/src/openrouter_monitor/services/jwt.py @@ -0,0 +1,129 @@ +"""JWT utilities for authentication. + +This module provides functions for creating, decoding, and verifying +JWT tokens using the HS256 algorithm. +""" + +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Optional + +from jose import JWTError, jwt + + +# Default algorithm +ALGORITHM = "HS256" +DEFAULT_EXPIRE_HOURS = 24 + + +@dataclass +class TokenData: + """Data extracted from a verified JWT token.""" + + user_id: str + exp: datetime + iat: datetime + + +def create_access_token( + data: dict, + expires_delta: Optional[timedelta] = None, + secret_key: str = None, +) -> str: + """Create a JWT access token. + + Args: + data: Dictionary containing claims to encode (e.g., {"sub": user_id}). + expires_delta: Optional custom expiration time. Defaults to 24 hours. + secret_key: Secret key for signing. If None, uses config.SECRET_KEY. + + Returns: + The encoded JWT token string. + + Raises: + ValueError: If secret_key is not provided and config.SECRET_KEY is not set. + """ + # Import config here to avoid circular imports + if secret_key is None: + from openrouter_monitor.config import get_settings + + settings = get_settings() + secret_key = settings.secret_key + + to_encode = data.copy() + + # Calculate expiration time + now = datetime.now(timezone.utc) + if expires_delta: + expire = now + expires_delta + else: + expire = now + timedelta(hours=DEFAULT_EXPIRE_HOURS) + + # Add standard claims + to_encode.update({ + "exp": expire, + "iat": now, + }) + + # Encode token + encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=ALGORITHM) + return encoded_jwt + + +def decode_access_token(token: str, secret_key: str = None) -> dict: + """Decode and validate a JWT token. + + Args: + token: The JWT token string to decode. + secret_key: Secret key for verification. If None, uses config.SECRET_KEY. + + Returns: + Dictionary containing the decoded payload. + + Raises: + JWTError: If token is invalid, expired, or signature verification fails. + ValueError: If secret_key is not provided and config.SECRET_KEY is not set. + """ + # Import config here to avoid circular imports + if secret_key is None: + from openrouter_monitor.config import get_settings + + settings = get_settings() + secret_key = settings.secret_key + + payload = jwt.decode(token, secret_key, algorithms=[ALGORITHM]) + return payload + + +def verify_token(token: str, secret_key: str = None) -> TokenData: + """Verify a JWT token and extract user data. + + Args: + token: The JWT token string to verify. + secret_key: Secret key for verification. If None, uses config.SECRET_KEY. + + Returns: + TokenData object containing user_id, exp, and iat. + + Raises: + JWTError: If token is invalid, expired, or missing required claims. + ValueError: If secret_key is not provided and config.SECRET_KEY is not set. + """ + payload = decode_access_token(token, secret_key=secret_key) + + # Extract required claims + user_id = payload.get("sub") + if user_id is None: + raise JWTError("Token missing 'sub' claim") + + exp_timestamp = payload.get("exp") + iat_timestamp = payload.get("iat") + + if exp_timestamp is None or iat_timestamp is None: + raise JWTError("Token missing exp or iat claim") + + # Convert timestamps to datetime + exp = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc) + iat = datetime.fromtimestamp(iat_timestamp, tz=timezone.utc) + + return TokenData(user_id=user_id, exp=exp, iat=iat) diff --git a/tests/unit/services/test_jwt.py b/tests/unit/services/test_jwt.py new file mode 100644 index 0000000..b1131d1 --- /dev/null +++ b/tests/unit/services/test_jwt.py @@ -0,0 +1,315 @@ +"""Tests for JWT utilities - T14. + +Tests for JWT token creation, decoding, and verification. +""" + +from datetime import datetime, timedelta, timezone + +import pytest +from jose import JWTError, jwt + + +pytestmark = [pytest.mark.unit, pytest.mark.security] + + +class TestJWTCreateAccessToken: + """Test suite for create_access_token function.""" + + def test_create_access_token_returns_string(self, jwt_secret): + """Test that create_access_token returns a string token.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token + + data = {"sub": "user123"} + + # Act + token = create_access_token(data, secret_key=jwt_secret) + + # Assert + assert isinstance(token, str) + assert len(token) > 0 + + def test_create_access_token_contains_payload(self, jwt_secret): + """Test that token contains the original payload data.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123", "email": "test@example.com"} + + # Act + token = create_access_token(data, secret_key=jwt_secret) + decoded = decode_access_token(token, secret_key=jwt_secret) + + # Assert + assert decoded["sub"] == "user123" + assert decoded["email"] == "test@example.com" + + def test_create_access_token_includes_exp(self, jwt_secret): + """Test that token includes expiration claim.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123"} + + # Act + token = create_access_token(data, secret_key=jwt_secret) + decoded = decode_access_token(token, secret_key=jwt_secret) + + # Assert + assert "exp" in decoded + assert isinstance(decoded["exp"], int) + + def test_create_access_token_includes_iat(self, jwt_secret): + """Test that token includes issued-at claim.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123"} + + # Act + token = create_access_token(data, secret_key=jwt_secret) + decoded = decode_access_token(token, secret_key=jwt_secret) + + # Assert + assert "iat" in decoded + assert isinstance(decoded["iat"], int) + + def test_create_access_token_with_custom_expiration(self, jwt_secret): + """Test token creation with custom expiration delta.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123"} + expires_delta = timedelta(hours=1) + + # Act + token = create_access_token(data, expires_delta=expires_delta, secret_key=jwt_secret) + decoded = decode_access_token(token, secret_key=jwt_secret) + + # Assert + exp_timestamp = decoded["exp"] + iat_timestamp = decoded["iat"] + exp_duration = exp_timestamp - iat_timestamp + assert exp_duration == 3600 # 1 hour in seconds + + def test_create_access_token_default_expiration(self, jwt_secret): + """Test token creation with default expiration (24 hours).""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123"} + + # Act + token = create_access_token(data, secret_key=jwt_secret) + decoded = decode_access_token(token, secret_key=jwt_secret) + + # Assert + exp_timestamp = decoded["exp"] + iat_timestamp = decoded["iat"] + exp_duration = exp_timestamp - iat_timestamp + assert exp_duration == 86400 # 24 hours in seconds + + +class TestJWTDecodeAccessToken: + """Test suite for decode_access_token function.""" + + def test_decode_valid_token_returns_payload(self, jwt_secret): + """Test decoding a valid token returns the payload.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123", "role": "admin"} + token = create_access_token(data, secret_key=jwt_secret) + + # Act + decoded = decode_access_token(token, secret_key=jwt_secret) + + # Assert + assert decoded["sub"] == "user123" + assert decoded["role"] == "admin" + + def test_decode_expired_token_raises_error(self, jwt_secret): + """Test decoding an expired token raises JWTError.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123"} + # Create token that expired 1 hour ago + expires_delta = timedelta(hours=-1) + token = create_access_token(data, expires_delta=expires_delta, secret_key=jwt_secret) + + # Act & Assert + with pytest.raises(JWTError): + decode_access_token(token, secret_key=jwt_secret) + + def test_decode_invalid_signature_raises_error(self, jwt_secret): + """Test decoding token with wrong secret raises JWTError.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123"} + token = create_access_token(data, secret_key=jwt_secret) + + # Act & Assert + with pytest.raises(JWTError): + decode_access_token(token, secret_key="wrong-secret-key-32-chars-long!") + + def test_decode_malformed_token_raises_error(self, jwt_secret): + """Test decoding malformed token raises JWTError.""" + # Arrange + from src.openrouter_monitor.services.jwt import decode_access_token + + # Act & Assert + with pytest.raises(JWTError): + decode_access_token("invalid-token", secret_key=jwt_secret) + + +class TestJWTVerifyToken: + """Test suite for verify_token function.""" + + def test_verify_valid_token_returns_token_data(self, jwt_secret): + """Test verifying valid token returns TokenData.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, verify_token + + data = {"sub": "user123"} + token = create_access_token(data, secret_key=jwt_secret) + + # Act + token_data = verify_token(token, secret_key=jwt_secret) + + # Assert + assert token_data.user_id == "user123" + assert token_data.exp is not None + + def test_verify_token_without_sub_raises_error(self, jwt_secret): + """Test verifying token without sub claim raises error.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, verify_token + + data = {"email": "test@example.com"} # No sub + token = create_access_token(data, secret_key=jwt_secret) + + # Act & Assert + with pytest.raises(JWTError): + verify_token(token, secret_key=jwt_secret) + + def test_verify_expired_token_raises_error(self, jwt_secret): + """Test verifying expired token raises JWTError.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, verify_token + + data = {"sub": "user123"} + expires_delta = timedelta(hours=-1) + token = create_access_token(data, expires_delta=expires_delta, secret_key=jwt_secret) + + # Act & Assert + with pytest.raises(JWTError): + verify_token(token, secret_key=jwt_secret) + + +class TestJWTAlgorithm: + """Test suite for JWT algorithm configuration.""" + + def test_token_uses_hs256_algorithm(self, jwt_secret): + """Test that token uses HS256 algorithm.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token + + data = {"sub": "user123"} + + # Act + token = create_access_token(data, secret_key=jwt_secret) + + # Decode without verification to check header + header = jwt.get_unverified_header(token) + + # Assert + assert header["alg"] == "HS256" + + +class TestJWTWithConfig: + """Test suite for JWT functions using config settings.""" + + def test_create_access_token_uses_config_secret(self): + """Test that create_access_token uses SECRET_KEY from config when not provided.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123"} + + # Act - Don't pass secret_key, should use config + token = create_access_token(data) + decoded = decode_access_token(token) + + # Assert + assert decoded["sub"] == "user123" + + def test_decode_access_token_uses_config_secret(self): + """Test that decode_access_token uses SECRET_KEY from config when not provided.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token + + data = {"sub": "user123"} + token = create_access_token(data) # Uses config + + # Act - Don't pass secret_key, should use config + decoded = decode_access_token(token) + + # Assert + assert decoded["sub"] == "user123" + + def test_verify_token_uses_config_secret(self): + """Test that verify_token uses SECRET_KEY from config when not provided.""" + # Arrange + from src.openrouter_monitor.services.jwt import create_access_token, verify_token + + data = {"sub": "user123"} + token = create_access_token(data) # Uses config + + # Act - Don't pass secret_key, should use config + token_data = verify_token(token) + + # Assert + assert token_data.user_id == "user123" + + +class TestJWTEdgeCases: + """Test suite for JWT edge cases.""" + + def test_verify_token_without_exp_raises_error(self, jwt_secret): + """Test verifying token without exp claim raises error.""" + # Arrange - Create token manually without exp + from jose import jwt as jose_jwt + + payload = {"sub": "user123", "iat": datetime.now(timezone.utc).timestamp()} + token = jose_jwt.encode(payload, jwt_secret, algorithm="HS256") + + # Act & Assert + from src.openrouter_monitor.services.jwt import verify_token + + with pytest.raises(Exception): # JWTError or similar + verify_token(token, secret_key=jwt_secret) + + def test_verify_token_without_iat_raises_error(self, jwt_secret): + """Test verifying token without iat claim raises error.""" + # Arrange - Create token manually without iat + from jose import jwt as jose_jwt + from datetime import timedelta + + now = datetime.now(timezone.utc) + payload = {"sub": "user123", "exp": (now + timedelta(hours=1)).timestamp()} + token = jose_jwt.encode(payload, jwt_secret, algorithm="HS256") + + # Act & Assert + from src.openrouter_monitor.services.jwt import verify_token + + with pytest.raises(Exception): # JWTError or similar + verify_token(token, secret_key=jwt_secret) + + +# Fixtures +@pytest.fixture +def jwt_secret(): + """Provide a test JWT secret key.""" + return "test-jwt-secret-key-32-chars-long!"