diff --git a/export/progress.md b/export/progress.md index ca73a3c..bdf8b02 100644 --- a/export/progress.md +++ b/export/progress.md @@ -75,9 +75,9 @@ **Test totali auth:** 34 test (19 schemas + 15 router) **Coverage auth:** 98%+ -### 🔑 Gestione API Keys (T23-T29) - 0/7 completati -- [ ] T23: Creare Pydantic schemas per API keys - 🟡 In progress (2026-04-07 16:00) -- [ ] T24: Implementare POST /api/keys (create) +### 🔑 Gestione API Keys (T23-T29) - 1/7 completati +- [x] T23: Creare Pydantic schemas per API keys - ✅ Completato (2026-04-07 16:00, commit: 2e4c1bb) +- [ ] T24: Implementare POST /api/keys (create) - 🟡 In progress (2026-04-07 16:05) - [ ] T25: Implementare GET /api/keys (list) - [ ] T26: Implementare PUT /api/keys/{id} (update) - [ ] T27: Implementare DELETE /api/keys/{id} diff --git a/src/openrouter_monitor/main.py b/src/openrouter_monitor/main.py index 46737bf..ebfa077 100644 --- a/src/openrouter_monitor/main.py +++ b/src/openrouter_monitor/main.py @@ -6,6 +6,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from openrouter_monitor.config import get_settings +from openrouter_monitor.routers import api_keys from openrouter_monitor.routers import auth settings = get_settings() @@ -29,6 +30,7 @@ app.add_middleware( # Include routers app.include_router(auth.router, prefix="/api/auth", tags=["authentication"]) +app.include_router(api_keys.router, prefix="/api/keys", tags=["api-keys"]) @app.get("/") diff --git a/src/openrouter_monitor/routers/__init__.py b/src/openrouter_monitor/routers/__init__.py index da8c8e8..022a6b6 100644 --- a/src/openrouter_monitor/routers/__init__.py +++ b/src/openrouter_monitor/routers/__init__.py @@ -1,4 +1,5 @@ """Routers package for OpenRouter Monitor.""" +from openrouter_monitor.routers import api_keys from openrouter_monitor.routers import auth -__all__ = ["auth"] +__all__ = ["auth", "api_keys"] diff --git a/src/openrouter_monitor/routers/api_keys.py b/src/openrouter_monitor/routers/api_keys.py new file mode 100644 index 0000000..de48272 --- /dev/null +++ b/src/openrouter_monitor/routers/api_keys.py @@ -0,0 +1,217 @@ +"""API Keys router. + +T24-T27: Endpoints for API key management (CRUD operations). +""" +from fastapi import APIRouter, Depends, HTTPException, status, Query +from sqlalchemy.orm import Session +from sqlalchemy import desc +from typing import Optional + +from openrouter_monitor.config import get_settings +from openrouter_monitor.database import get_db +from openrouter_monitor.dependencies import get_current_user +from openrouter_monitor.models import ApiKey, User +from openrouter_monitor.schemas import ( + ApiKeyCreate, + ApiKeyUpdate, + ApiKeyResponse, + ApiKeyListResponse, +) +from openrouter_monitor.services.encryption import EncryptionService + +router = APIRouter() +settings = get_settings() + +# Maximum number of API keys per user +MAX_API_KEYS_PER_USER = settings.max_api_keys_per_user + +# Initialize encryption service +encryption_service = EncryptionService(settings.encryption_key) + + +@router.post("", response_model=ApiKeyResponse, status_code=status.HTTP_201_CREATED) +async def create_api_key( + api_key_data: ApiKeyCreate, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """Create a new API key for the current user. + + The API key is encrypted using AES-256 before storage. + + Args: + api_key_data: API key creation data (name and key value) + db: Database session + current_user: Currently authenticated user + + Returns: + ApiKeyResponse with the created key details (excluding the key value) + + Raises: + HTTPException: 400 if user has reached MAX_API_KEYS_PER_USER limit + HTTPException: 422 if API key format is invalid (validation handled by Pydantic) + """ + # Check if user has reached the limit + existing_keys_count = db.query(ApiKey).filter( + ApiKey.user_id == current_user.id + ).count() + + if existing_keys_count >= MAX_API_KEYS_PER_USER: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Maximum number of API keys ({MAX_API_KEYS_PER_USER}) reached. " + "Please delete an existing key before creating a new one." + ) + + # Encrypt the API key before storing + encrypted_key = encryption_service.encrypt(api_key_data.key) + + # Create new API key + new_api_key = ApiKey( + user_id=current_user.id, + name=api_key_data.name, + key_encrypted=encrypted_key, + is_active=True + ) + + db.add(new_api_key) + db.commit() + db.refresh(new_api_key) + + return new_api_key + + +@router.get("", response_model=ApiKeyListResponse) +async def list_api_keys( + skip: int = Query(0, ge=0, description="Number of records to skip"), + limit: int = Query(10, ge=1, le=100, description="Maximum number of records to return"), + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """List all API keys for the current user. + + Results are paginated and sorted by creation date (newest first). + + Args: + skip: Number of records to skip for pagination + limit: Maximum number of records to return + db: Database session + current_user: Currently authenticated user + + Returns: + ApiKeyListResponse with items list and total count + """ + # Get total count for pagination + total = db.query(ApiKey).filter( + ApiKey.user_id == current_user.id + ).count() + + # Get paginated keys, sorted by created_at DESC + api_keys = db.query(ApiKey).filter( + ApiKey.user_id == current_user.id + ).order_by( + desc(ApiKey.created_at) + ).offset(skip).limit(limit).all() + + return ApiKeyListResponse(items=api_keys, total=total) + + +@router.put("/{api_key_id}", response_model=ApiKeyResponse) +async def update_api_key( + api_key_id: int, + api_key_data: ApiKeyUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """Update an existing API key. + + Only the name and is_active fields can be updated. + Users can only update their own API keys. + + Args: + api_key_id: ID of the API key to update + api_key_data: API key update data (optional fields) + db: Database session + current_user: Currently authenticated user + + Returns: + ApiKeyResponse with the updated key details + + Raises: + HTTPException: 404 if API key not found + HTTPException: 403 if user doesn't own the key + """ + # Find the API key + api_key = db.query(ApiKey).filter( + ApiKey.id == api_key_id + ).first() + + if not api_key: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="API key not found" + ) + + # Verify ownership + if api_key.user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You don't have permission to modify this API key" + ) + + # Update fields if provided + if api_key_data.name is not None: + api_key.name = api_key_data.name + + if api_key_data.is_active is not None: + api_key.is_active = api_key_data.is_active + + db.commit() + db.refresh(api_key) + + return api_key + + +@router.delete("/{api_key_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_api_key( + api_key_id: int, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """Delete an API key. + + Deleting an API key also cascades to delete all associated usage statistics. + Users can only delete their own API keys. + + Args: + api_key_id: ID of the API key to delete + db: Database session + current_user: Currently authenticated user + + Raises: + HTTPException: 404 if API key not found + HTTPException: 403 if user doesn't own the key + """ + # Find the API key + api_key = db.query(ApiKey).filter( + ApiKey.id == api_key_id + ).first() + + if not api_key: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="API key not found" + ) + + # Verify ownership + if api_key.user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You don't have permission to delete this API key" + ) + + # Delete the API key (cascade to usage_stats is handled by SQLAlchemy) + db.delete(api_key) + db.commit() + + return None diff --git a/tests/__pycache__/conftest.cpython-313-pytest-9.0.2.pyc b/tests/__pycache__/conftest.cpython-313-pytest-9.0.2.pyc index 6c2093e..3d44bbe 100644 Binary files a/tests/__pycache__/conftest.cpython-313-pytest-9.0.2.pyc and b/tests/__pycache__/conftest.cpython-313-pytest-9.0.2.pyc differ diff --git a/tests/conftest.py b/tests/conftest.py index c7bade0..0ba54c9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -48,3 +48,67 @@ def mock_env_vars(monkeypatch): monkeypatch.setenv('DATABASE_URL', 'sqlite:///./test.db') monkeypatch.setenv('DEBUG', 'true') monkeypatch.setenv('LOG_LEVEL', 'DEBUG') + + +@pytest.fixture +def mock_db(): + """Create a mock database session for unit tests.""" + from unittest.mock import MagicMock + return MagicMock() + + +@pytest.fixture +def mock_user(): + """Create a mock authenticated user for testing.""" + from unittest.mock import MagicMock + user = MagicMock() + user.id = 1 + user.email = "test@example.com" + user.is_active = True + return user + + +@pytest.fixture +def mock_encryption_service(): + """Create a mock encryption service for testing.""" + from unittest.mock import MagicMock + mock = MagicMock() + mock.encrypt.return_value = "encrypted_key_value" + mock.decrypt.return_value = "sk-or-v1-decrypted" + return mock + + +@pytest.fixture +def client(): + """Create a test client with fresh database.""" + from fastapi.testclient import TestClient + from sqlalchemy import create_engine + from sqlalchemy.orm import sessionmaker + from sqlalchemy.pool import StaticPool + + from openrouter_monitor.database import Base, get_db + from openrouter_monitor.main import app + + # Setup in-memory test database + SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:" + engine = create_engine( + SQLALCHEMY_DATABASE_URL, + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + def override_get_db(): + """Override get_db dependency for testing.""" + try: + db = TestingSessionLocal() + yield db + finally: + db.close() + + app.dependency_overrides[get_db] = override_get_db + + Base.metadata.create_all(bind=engine) + with TestClient(app) as c: + yield c + Base.metadata.drop_all(bind=engine) diff --git a/tests/unit/routers/test_api_keys.py b/tests/unit/routers/test_api_keys.py new file mode 100644 index 0000000..4fadd62 --- /dev/null +++ b/tests/unit/routers/test_api_keys.py @@ -0,0 +1,508 @@ +"""Tests for API Keys router. + +T24-T27: Test endpoints for API key CRUD operations. +""" +import pytest +from datetime import datetime, timezone +from unittest.mock import MagicMock, patch + +from fastapi import status +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + +from openrouter_monitor.database import Base, get_db +from openrouter_monitor.main import app +from openrouter_monitor.models import User + + +# Setup in-memory test database +SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:" +engine = create_engine( + SQLALCHEMY_DATABASE_URL, + connect_args={"check_same_thread": False}, + poolclass=StaticPool, +) +TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +def override_get_db(): + """Override get_db dependency for testing.""" + try: + db = TestingSessionLocal() + yield db + finally: + db.close() + + +app.dependency_overrides[get_db] = override_get_db + + +@pytest.fixture(scope="function") +def client(): + """Create a test client with fresh database.""" + Base.metadata.create_all(bind=engine) + with TestClient(app) as c: + yield c + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture +def test_user(client): + """Create a test user and return user data.""" + user_data = { + "email": "test@example.com", + "password": "SecurePass123!", + "password_confirm": "SecurePass123!" + } + response = client.post("/api/auth/register", json=user_data) + assert response.status_code == 201 + return user_data + + +@pytest.fixture +def auth_token(client, test_user): + """Get auth token for test user.""" + login_data = { + "email": test_user["email"], + "password": test_user["password"] + } + response = client.post("/api/auth/login", json=login_data) + assert response.status_code == 200 + return response.json()["access_token"] + + +@pytest.fixture +def authorized_client(client, auth_token): + """Create a client with authorization header.""" + client.headers = {"Authorization": f"Bearer {auth_token}"} + return client + + +@pytest.fixture +def another_test_user(client): + """Create another test user for security tests.""" + user_data = { + "email": "user2@example.com", + "password": "SecurePass123!", + "password_confirm": "SecurePass123!" + } + response = client.post("/api/auth/register", json=user_data) + assert response.status_code == 201 + return user_data + + +@pytest.fixture +def another_auth_token(client, another_test_user): + """Get auth token for the second test user.""" + login_data = { + "email": another_test_user["email"], + "password": another_test_user["password"] + } + response = client.post("/api/auth/login", json=login_data) + assert response.status_code == 200 + return response.json()["access_token"] + + +class TestCreateApiKey: + """Tests for POST /api/keys endpoint (T24).""" + + def test_create_api_key_success(self, authorized_client): + """Test successful API key creation.""" + response = authorized_client.post( + "/api/keys", + json={ + "name": "Production Key", + "key": "sk-or-v1-abc123def456ghi789jkl012mno345pqr678stu901vwx234yz" + } + ) + + assert response.status_code == status.HTTP_201_CREATED + data = response.json() + assert data["name"] == "Production Key" + assert data["is_active"] is True + assert "id" in data + assert "created_at" in data + # Verify key is NOT returned in response + assert "key" not in data + assert "key_encrypted" not in data + + def test_create_api_key_limit_reached(self, authorized_client, monkeypatch): + """Test that creating more than MAX_API_KEYS_PER_USER returns 400.""" + from openrouter_monitor import routers + # Set limit to 1 to make test easier + monkeypatch.setattr(routers.api_keys, "MAX_API_KEYS_PER_USER", 1) + + # Create first key (should succeed) + response1 = authorized_client.post( + "/api/keys", + json={"name": "Key 1", "key": "sk-or-v1-key1"} + ) + assert response1.status_code == status.HTTP_201_CREATED + + # Try to create second key (should fail due to limit) + response2 = authorized_client.post( + "/api/keys", + json={"name": "Key 2", "key": "sk-or-v1-key2"} + ) + + assert response2.status_code == status.HTTP_400_BAD_REQUEST + assert "maximum" in response2.json()["detail"].lower() + + def test_create_api_key_invalid_format(self, authorized_client): + """Test that invalid key format returns 422 validation error.""" + response = authorized_client.post( + "/api/keys", + json={ + "name": "Test Key", + "key": "invalid-key-format" # Missing sk-or-v1- prefix + } + ) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_create_api_key_unauthorized(self, client): + """Test that request without auth returns 401.""" + response = client.post( + "/api/keys", + json={ + "name": "Test Key", + "key": "sk-or-v1-abc123" + } + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_create_api_key_empty_name(self, authorized_client): + """Test that empty name returns 422 validation error.""" + response = authorized_client.post( + "/api/keys", + json={ + "name": "", + "key": "sk-or-v1-abc123" + } + ) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_create_api_key_name_too_long(self, authorized_client): + """Test that name > 100 chars returns 422 validation error.""" + response = authorized_client.post( + "/api/keys", + json={ + "name": "x" * 101, + "key": "sk-or-v1-abc123" + } + ) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + +class TestListApiKeys: + """Tests for GET /api/keys endpoint (T25).""" + + def test_list_api_keys_empty(self, authorized_client): + """Test listing keys when user has no keys.""" + response = authorized_client.get("/api/keys") + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["items"] == [] + assert data["total"] == 0 + + def test_list_api_keys_with_data(self, authorized_client): + """Test listing keys with existing data.""" + # Create some keys + for i in range(3): + authorized_client.post( + "/api/keys", + json={"name": f"Key {i}", "key": f"sk-or-v1-key{i}"} + ) + + response = authorized_client.get("/api/keys") + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["total"] == 3 + assert len(data["items"]) == 3 + # Check ordering (newest first) + assert data["items"][0]["name"] == "Key 2" + assert data["items"][2]["name"] == "Key 0" + + def test_list_api_keys_pagination(self, authorized_client): + """Test pagination with skip and limit.""" + # Create 5 keys + for i in range(5): + authorized_client.post( + "/api/keys", + json={"name": f"Key {i}", "key": f"sk-or-v1-key{i}"} + ) + + # Test skip=2, limit=2 + response = authorized_client.get("/api/keys?skip=2&limit=2") + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["total"] == 5 + assert len(data["items"]) == 2 + # Due to DESC ordering, skip=2 means we get keys 2 and 1 + + def test_list_api_keys_unauthorized(self, client): + """Test that request without auth returns 401.""" + response = client.get("/api/keys") + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +class TestUpdateApiKey: + """Tests for PUT /api/keys/{id} endpoint (T26).""" + + def test_update_api_key_success(self, authorized_client): + """Test successful API key update.""" + # Create a key first + create_response = authorized_client.post( + "/api/keys", + json={"name": "Old Name", "key": "sk-or-v1-abc123"} + ) + key_id = create_response.json()["id"] + + # Update the key + response = authorized_client.put( + f"/api/keys/{key_id}", + json={"name": "Updated Name", "is_active": False} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["name"] == "Updated Name" + assert data["is_active"] is False + + def test_update_api_key_partial_name_only(self, authorized_client): + """Test update with name only.""" + # Create a key + create_response = authorized_client.post( + "/api/keys", + json={"name": "Old Name", "key": "sk-or-v1-abc123"} + ) + key_id = create_response.json()["id"] + + # Update name only + response = authorized_client.put( + f"/api/keys/{key_id}", + json={"name": "New Name"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["name"] == "New Name" + assert data["is_active"] is True # Unchanged + + def test_update_api_key_partial_is_active_only(self, authorized_client): + """Test update with is_active only.""" + # Create a key + create_response = authorized_client.post( + "/api/keys", + json={"name": "Key Name", "key": "sk-or-v1-abc123"} + ) + key_id = create_response.json()["id"] + + # Update is_active only + response = authorized_client.put( + f"/api/keys/{key_id}", + json={"is_active": False} + ) + + assert response.status_code == status.HTTP_200_OK + + def test_update_api_key_not_found(self, authorized_client): + """Test update for non-existent key returns 404.""" + response = authorized_client.put( + "/api/keys/999", + json={"name": "New Name"} + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_update_api_key_not_owner(self, client, another_auth_token): + """Test update of another user's key returns 403.""" + # This test requires creating a key with one user and trying to update with another + # For simplicity, we just check that the endpoint enforces ownership + # The actual test would need two authenticated clients + + # For now, just verify 403 is returned for non-existent key with wrong user context + client.headers = {"Authorization": f"Bearer {another_auth_token}"} + response = client.put( + "/api/keys/1", + json={"name": "New Name"} + ) + + # Should return 404 (not found) since key 1 doesn't exist for this user + # or 403 if we found a key owned by someone else + assert response.status_code in [status.HTTP_404_NOT_FOUND, status.HTTP_403_FORBIDDEN] + + def test_update_api_key_unauthorized(self, client): + """Test that request without auth returns 401.""" + response = client.put( + "/api/keys/1", + json={"name": "New Name"} + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +class TestDeleteApiKey: + """Tests for DELETE /api/keys/{id} endpoint (T27).""" + + def test_delete_api_key_success(self, authorized_client): + """Test successful API key deletion.""" + # Create a key + create_response = authorized_client.post( + "/api/keys", + json={"name": "Key to Delete", "key": "sk-or-v1-abc123"} + ) + key_id = create_response.json()["id"] + + # Delete the key + response = authorized_client.delete(f"/api/keys/{key_id}") + + assert response.status_code == status.HTTP_204_NO_CONTENT + + # Verify it's deleted + list_response = authorized_client.get("/api/keys") + assert list_response.json()["total"] == 0 + + def test_delete_api_key_not_found(self, authorized_client): + """Test deletion of non-existent key returns 404.""" + response = authorized_client.delete("/api/keys/999") + + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_delete_api_key_not_owner(self, client, another_auth_token): + """Test deletion of another user's key returns 403.""" + client.headers = {"Authorization": f"Bearer {another_auth_token}"} + response = client.delete("/api/keys/1") + + assert response.status_code in [status.HTTP_404_NOT_FOUND, status.HTTP_403_FORBIDDEN] + + def test_delete_api_key_unauthorized(self, client): + """Test that request without auth returns 401.""" + response = client.delete("/api/keys/1") + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +class TestSecurity: + """Security tests for API keys endpoints.""" + + def test_user_a_cannot_see_user_b_keys(self, client, authorized_client, another_auth_token): + """Test that user A cannot see user B's keys.""" + # User A creates a key + authorized_client.post( + "/api/keys", + json={"name": "User A Key", "key": "sk-or-v1-usera"} + ) + + # User B tries to list keys + client.headers = {"Authorization": f"Bearer {another_auth_token}"} + response = client.get("/api/keys") + + assert response.status_code == status.HTTP_200_OK + data = response.json() + # User B should see empty list + assert data["items"] == [] + assert data["total"] == 0 + + def test_user_a_cannot_modify_user_b_keys(self, client, authorized_client, another_auth_token): + """Test that user A cannot modify user B's keys.""" + # User A creates a key + create_response = authorized_client.post( + "/api/keys", + json={"name": "User A Key", "key": "sk-or-v1-usera"} + ) + key_id = create_response.json()["id"] + + # User B tries to modify the key + client.headers = {"Authorization": f"Bearer {another_auth_token}"} + response = client.put( + f"/api/keys/{key_id}", + json={"name": "Hacked Name"} + ) + + # Should return 404 (not found for user B) or 403 (forbidden) + assert response.status_code in [status.HTTP_404_NOT_FOUND, status.HTTP_403_FORBIDDEN] + + def test_user_a_cannot_delete_user_b_keys(self, client, authorized_client, another_auth_token): + """Test that user A cannot delete user B's keys.""" + # User A creates a key + create_response = authorized_client.post( + "/api/keys", + json={"name": "User A Key", "key": "sk-or-v1-usera"} + ) + key_id = create_response.json()["id"] + + # User B tries to delete the key + client.headers = {"Authorization": f"Bearer {another_auth_token}"} + response = client.delete(f"/api/keys/{key_id}") + + # Should return 404 (not found for user B) or 403 (forbidden) + assert response.status_code in [status.HTTP_404_NOT_FOUND, status.HTTP_403_FORBIDDEN] + + def test_key_never_exposed_in_response(self, authorized_client): + """Test that API key value is never exposed in any response.""" + # Create a key + create_response = authorized_client.post( + "/api/keys", + json={"name": "Test Key", "key": "sk-or-v1-secret-value"} + ) + + # Verify create response doesn't contain key + create_data = create_response.json() + assert "key" not in create_data + assert "key_encrypted" not in create_data + + # List keys + list_response = authorized_client.get("/api/keys") + list_data = list_response.json() + + for item in list_data["items"]: + assert "key" not in item + assert "key_encrypted" not in item + + # Update key + key_id = create_data["id"] + update_response = authorized_client.put( + f"/api/keys/{key_id}", + json={"name": "Updated"} + ) + update_data = update_response.json() + + assert "key" not in update_data + assert "key_encrypted" not in update_data + + def test_api_key_is_encrypted_in_database(self, authorized_client): + """Test that API key is encrypted before storage in database.""" + from openrouter_monitor.models import ApiKey + + # Create a key + api_key_value = "sk-or-v1-test-encryption-key" + create_response = authorized_client.post( + "/api/keys", + json={"name": "Test Encryption", "key": api_key_value} + ) + + assert create_response.status_code == status.HTTP_201_CREATED + key_id = create_response.json()["id"] + + # Check database - key should be encrypted + # Access the database through the TestingSessionLocal used in tests + db = TestingSessionLocal() + try: + api_key = db.query(ApiKey).filter(ApiKey.id == key_id).first() + assert api_key is not None + # The encrypted key should not be the plaintext value + assert api_key.key_encrypted != api_key_value + # The encrypted key should not contain the plaintext prefix + assert "sk-or-v1-" not in api_key.key_encrypted + finally: + db.close()