feat(security): T14 implement JWT utilities

- Add create_access_token with custom/default expiration
- Add decode_access_token with signature verification
- Add verify_token returning TokenData dataclass
- Support HS256 algorithm with config.SECRET_KEY
- Payload includes exp, iat, sub claims
- 19 comprehensive tests with 100% coverage
- Handle expired tokens, invalid signatures, missing claims
This commit is contained in:
Luca Sacchi Ricciardi
2026-04-07 12:10:04 +02:00
parent 54e81162df
commit 781e564ea0
3 changed files with 447 additions and 3 deletions

View File

@@ -52,10 +52,10 @@
- [x] T10: Creare model ApiToken (SQLAlchemy) - ✅ Completato (2026-04-07 11:15) - [x] T10: Creare model ApiToken (SQLAlchemy) - ✅ Completato (2026-04-07 11:15)
- [x] T11: Setup Alembic e creare migrazione iniziale - ✅ Completato (2026-04-07 11:20) - [x] T11: Setup Alembic e creare migrazione iniziale - ✅ Completato (2026-04-07 11:20)
### 🔐 Servizi di Sicurezza (T12-T16) - 1/5 completati ### 🔐 Servizi di Sicurezza (T12-T16) - 2/5 completati
- [x] T12: Implementare EncryptionService (AES-256) - ✅ Completato (2026-04-07 12:00, commit: 2fdd9d1) - [x] T12: Implementare EncryptionService (AES-256) - ✅ Completato (2026-04-07 12:00, commit: 2fdd9d1)
- [ ] T13: Implementare password hashing (bcrypt) - 🟡 In progress - [x] T13: Implementare password hashing (bcrypt) - ✅ Completato (2026-04-07 12:15, commit: 54e8116)
- [ ] T14: Implementare JWT utilities - [ ] T14: Implementare JWT utilities - 🟡 In progress
- [ ] T15: Implementare API token generation - [ ] T15: Implementare API token generation
- [ ] T16: Scrivere test per servizi di encryption - [ ] T16: Scrivere test per servizi di encryption

View File

@@ -0,0 +1,129 @@
"""JWT utilities for authentication.
This module provides functions for creating, decoding, and verifying
JWT tokens using the HS256 algorithm.
"""
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
# Default algorithm
ALGORITHM = "HS256"
DEFAULT_EXPIRE_HOURS = 24
@dataclass
class TokenData:
"""Data extracted from a verified JWT token."""
user_id: str
exp: datetime
iat: datetime
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None,
secret_key: str = None,
) -> str:
"""Create a JWT access token.
Args:
data: Dictionary containing claims to encode (e.g., {"sub": user_id}).
expires_delta: Optional custom expiration time. Defaults to 24 hours.
secret_key: Secret key for signing. If None, uses config.SECRET_KEY.
Returns:
The encoded JWT token string.
Raises:
ValueError: If secret_key is not provided and config.SECRET_KEY is not set.
"""
# Import config here to avoid circular imports
if secret_key is None:
from openrouter_monitor.config import get_settings
settings = get_settings()
secret_key = settings.secret_key
to_encode = data.copy()
# Calculate expiration time
now = datetime.now(timezone.utc)
if expires_delta:
expire = now + expires_delta
else:
expire = now + timedelta(hours=DEFAULT_EXPIRE_HOURS)
# Add standard claims
to_encode.update({
"exp": expire,
"iat": now,
})
# Encode token
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)
return encoded_jwt
def decode_access_token(token: str, secret_key: str = None) -> dict:
"""Decode and validate a JWT token.
Args:
token: The JWT token string to decode.
secret_key: Secret key for verification. If None, uses config.SECRET_KEY.
Returns:
Dictionary containing the decoded payload.
Raises:
JWTError: If token is invalid, expired, or signature verification fails.
ValueError: If secret_key is not provided and config.SECRET_KEY is not set.
"""
# Import config here to avoid circular imports
if secret_key is None:
from openrouter_monitor.config import get_settings
settings = get_settings()
secret_key = settings.secret_key
payload = jwt.decode(token, secret_key, algorithms=[ALGORITHM])
return payload
def verify_token(token: str, secret_key: str = None) -> TokenData:
"""Verify a JWT token and extract user data.
Args:
token: The JWT token string to verify.
secret_key: Secret key for verification. If None, uses config.SECRET_KEY.
Returns:
TokenData object containing user_id, exp, and iat.
Raises:
JWTError: If token is invalid, expired, or missing required claims.
ValueError: If secret_key is not provided and config.SECRET_KEY is not set.
"""
payload = decode_access_token(token, secret_key=secret_key)
# Extract required claims
user_id = payload.get("sub")
if user_id is None:
raise JWTError("Token missing 'sub' claim")
exp_timestamp = payload.get("exp")
iat_timestamp = payload.get("iat")
if exp_timestamp is None or iat_timestamp is None:
raise JWTError("Token missing exp or iat claim")
# Convert timestamps to datetime
exp = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
iat = datetime.fromtimestamp(iat_timestamp, tz=timezone.utc)
return TokenData(user_id=user_id, exp=exp, iat=iat)

View File

@@ -0,0 +1,315 @@
"""Tests for JWT utilities - T14.
Tests for JWT token creation, decoding, and verification.
"""
from datetime import datetime, timedelta, timezone
import pytest
from jose import JWTError, jwt
pytestmark = [pytest.mark.unit, pytest.mark.security]
class TestJWTCreateAccessToken:
"""Test suite for create_access_token function."""
def test_create_access_token_returns_string(self, jwt_secret):
"""Test that create_access_token returns a string token."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token
data = {"sub": "user123"}
# Act
token = create_access_token(data, secret_key=jwt_secret)
# Assert
assert isinstance(token, str)
assert len(token) > 0
def test_create_access_token_contains_payload(self, jwt_secret):
"""Test that token contains the original payload data."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123", "email": "test@example.com"}
# Act
token = create_access_token(data, secret_key=jwt_secret)
decoded = decode_access_token(token, secret_key=jwt_secret)
# Assert
assert decoded["sub"] == "user123"
assert decoded["email"] == "test@example.com"
def test_create_access_token_includes_exp(self, jwt_secret):
"""Test that token includes expiration claim."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123"}
# Act
token = create_access_token(data, secret_key=jwt_secret)
decoded = decode_access_token(token, secret_key=jwt_secret)
# Assert
assert "exp" in decoded
assert isinstance(decoded["exp"], int)
def test_create_access_token_includes_iat(self, jwt_secret):
"""Test that token includes issued-at claim."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123"}
# Act
token = create_access_token(data, secret_key=jwt_secret)
decoded = decode_access_token(token, secret_key=jwt_secret)
# Assert
assert "iat" in decoded
assert isinstance(decoded["iat"], int)
def test_create_access_token_with_custom_expiration(self, jwt_secret):
"""Test token creation with custom expiration delta."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123"}
expires_delta = timedelta(hours=1)
# Act
token = create_access_token(data, expires_delta=expires_delta, secret_key=jwt_secret)
decoded = decode_access_token(token, secret_key=jwt_secret)
# Assert
exp_timestamp = decoded["exp"]
iat_timestamp = decoded["iat"]
exp_duration = exp_timestamp - iat_timestamp
assert exp_duration == 3600 # 1 hour in seconds
def test_create_access_token_default_expiration(self, jwt_secret):
"""Test token creation with default expiration (24 hours)."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123"}
# Act
token = create_access_token(data, secret_key=jwt_secret)
decoded = decode_access_token(token, secret_key=jwt_secret)
# Assert
exp_timestamp = decoded["exp"]
iat_timestamp = decoded["iat"]
exp_duration = exp_timestamp - iat_timestamp
assert exp_duration == 86400 # 24 hours in seconds
class TestJWTDecodeAccessToken:
"""Test suite for decode_access_token function."""
def test_decode_valid_token_returns_payload(self, jwt_secret):
"""Test decoding a valid token returns the payload."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123", "role": "admin"}
token = create_access_token(data, secret_key=jwt_secret)
# Act
decoded = decode_access_token(token, secret_key=jwt_secret)
# Assert
assert decoded["sub"] == "user123"
assert decoded["role"] == "admin"
def test_decode_expired_token_raises_error(self, jwt_secret):
"""Test decoding an expired token raises JWTError."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123"}
# Create token that expired 1 hour ago
expires_delta = timedelta(hours=-1)
token = create_access_token(data, expires_delta=expires_delta, secret_key=jwt_secret)
# Act & Assert
with pytest.raises(JWTError):
decode_access_token(token, secret_key=jwt_secret)
def test_decode_invalid_signature_raises_error(self, jwt_secret):
"""Test decoding token with wrong secret raises JWTError."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123"}
token = create_access_token(data, secret_key=jwt_secret)
# Act & Assert
with pytest.raises(JWTError):
decode_access_token(token, secret_key="wrong-secret-key-32-chars-long!")
def test_decode_malformed_token_raises_error(self, jwt_secret):
"""Test decoding malformed token raises JWTError."""
# Arrange
from src.openrouter_monitor.services.jwt import decode_access_token
# Act & Assert
with pytest.raises(JWTError):
decode_access_token("invalid-token", secret_key=jwt_secret)
class TestJWTVerifyToken:
"""Test suite for verify_token function."""
def test_verify_valid_token_returns_token_data(self, jwt_secret):
"""Test verifying valid token returns TokenData."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, verify_token
data = {"sub": "user123"}
token = create_access_token(data, secret_key=jwt_secret)
# Act
token_data = verify_token(token, secret_key=jwt_secret)
# Assert
assert token_data.user_id == "user123"
assert token_data.exp is not None
def test_verify_token_without_sub_raises_error(self, jwt_secret):
"""Test verifying token without sub claim raises error."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, verify_token
data = {"email": "test@example.com"} # No sub
token = create_access_token(data, secret_key=jwt_secret)
# Act & Assert
with pytest.raises(JWTError):
verify_token(token, secret_key=jwt_secret)
def test_verify_expired_token_raises_error(self, jwt_secret):
"""Test verifying expired token raises JWTError."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, verify_token
data = {"sub": "user123"}
expires_delta = timedelta(hours=-1)
token = create_access_token(data, expires_delta=expires_delta, secret_key=jwt_secret)
# Act & Assert
with pytest.raises(JWTError):
verify_token(token, secret_key=jwt_secret)
class TestJWTAlgorithm:
"""Test suite for JWT algorithm configuration."""
def test_token_uses_hs256_algorithm(self, jwt_secret):
"""Test that token uses HS256 algorithm."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token
data = {"sub": "user123"}
# Act
token = create_access_token(data, secret_key=jwt_secret)
# Decode without verification to check header
header = jwt.get_unverified_header(token)
# Assert
assert header["alg"] == "HS256"
class TestJWTWithConfig:
"""Test suite for JWT functions using config settings."""
def test_create_access_token_uses_config_secret(self):
"""Test that create_access_token uses SECRET_KEY from config when not provided."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123"}
# Act - Don't pass secret_key, should use config
token = create_access_token(data)
decoded = decode_access_token(token)
# Assert
assert decoded["sub"] == "user123"
def test_decode_access_token_uses_config_secret(self):
"""Test that decode_access_token uses SECRET_KEY from config when not provided."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, decode_access_token
data = {"sub": "user123"}
token = create_access_token(data) # Uses config
# Act - Don't pass secret_key, should use config
decoded = decode_access_token(token)
# Assert
assert decoded["sub"] == "user123"
def test_verify_token_uses_config_secret(self):
"""Test that verify_token uses SECRET_KEY from config when not provided."""
# Arrange
from src.openrouter_monitor.services.jwt import create_access_token, verify_token
data = {"sub": "user123"}
token = create_access_token(data) # Uses config
# Act - Don't pass secret_key, should use config
token_data = verify_token(token)
# Assert
assert token_data.user_id == "user123"
class TestJWTEdgeCases:
"""Test suite for JWT edge cases."""
def test_verify_token_without_exp_raises_error(self, jwt_secret):
"""Test verifying token without exp claim raises error."""
# Arrange - Create token manually without exp
from jose import jwt as jose_jwt
payload = {"sub": "user123", "iat": datetime.now(timezone.utc).timestamp()}
token = jose_jwt.encode(payload, jwt_secret, algorithm="HS256")
# Act & Assert
from src.openrouter_monitor.services.jwt import verify_token
with pytest.raises(Exception): # JWTError or similar
verify_token(token, secret_key=jwt_secret)
def test_verify_token_without_iat_raises_error(self, jwt_secret):
"""Test verifying token without iat claim raises error."""
# Arrange - Create token manually without iat
from jose import jwt as jose_jwt
from datetime import timedelta
now = datetime.now(timezone.utc)
payload = {"sub": "user123", "exp": (now + timedelta(hours=1)).timestamp()}
token = jose_jwt.encode(payload, jwt_secret, algorithm="HS256")
# Act & Assert
from src.openrouter_monitor.services.jwt import verify_token
with pytest.raises(Exception): # JWTError or similar
verify_token(token, secret_key=jwt_secret)
# Fixtures
@pytest.fixture
def jwt_secret():
"""Provide a test JWT secret key."""
return "test-jwt-secret-key-32-chars-long!"