"""Tests for API tokens router.

T41: POST /api/tokens - Generate API token
T42: GET /api/tokens - List API tokens
T43: DELETE /api/tokens/{id} - Revoke API token
"""
from datetime import datetime, timedelta, timezone

import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session

from openrouter_monitor.models import User, ApiToken


# =============================================================================
# Helpers
# =============================================================================

def _naive_utcnow():
    """Return the current UTC time as a naive ``datetime``.

    Produces the same kind of value as ``datetime.utcnow()`` (UTC wall-clock
    time without ``tzinfo``) while avoiding that function, which is
    deprecated since Python 3.12.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _new_api_token(user_id, name, *, is_active=True, **fields):
    """Build an unsaved ``ApiToken`` backed by a freshly generated token.

    The instance is NOT added to any session; the calling test decides when
    to ``add`` and ``commit`` it.

    Args:
        user_id: Value for the ``user_id`` column (the owning user's id).
        name: Value for the ``name`` column.
        is_active: Value for the ``is_active`` column.
        **fields: Extra ``ApiToken`` constructor keyword arguments, e.g.
            ``created_at`` or ``last_used_at``.

    Returns:
        Tuple ``(token, plaintext, token_hash)`` where ``plaintext`` and
        ``token_hash`` are the pair returned by ``generate_api_token()``.
    """
    # Kept function-local, matching how this file imports service modules.
    from openrouter_monitor.services.token import generate_api_token

    plaintext, token_hash = generate_api_token()
    token = ApiToken(
        user_id=user_id,
        token_hash=token_hash,
        name=name,
        is_active=is_active,
        **fields,
    )
    return token, plaintext, token_hash


def _create_other_user_token(db_session):
    """Persist a second user and one active token owned by that user.

    Args:
        db_session: Database session used to add and commit both rows.

    Returns:
        The committed ``ApiToken`` belonging to the second user.
    """
    from openrouter_monitor.services.password import hash_password

    other_user = User(
        email="other@example.com",
        password_hash=hash_password("OtherPass123!"),
        is_active=True,
    )
    db_session.add(other_user)
    db_session.commit()

    other_token, _, _ = _new_api_token(other_user.id, "Other User's Token")
    db_session.add(other_token)
    db_session.commit()
    return other_token


# =============================================================================
# Fixtures
# =============================================================================

@pytest.fixture
def test_user_with_auth(client, db_session):
    """Create a test user and return user + auth headers.

    Returns tuple (user, auth_headers)
    """
    from openrouter_monitor.services.password import hash_password
    from openrouter_monitor.services.jwt import create_access_token

    # Create user directly in database
    user = User(
        email="tokentest@example.com",
        password_hash=hash_password("TestPassword123!"),
        is_active=True,
    )
    db_session.add(user)
    db_session.commit()
    db_session.refresh(user)

    # Create JWT token whose subject is the new user's id
    token = create_access_token(data={"sub": str(user.id)})
    headers = {"Authorization": f"Bearer {token}"}

    return user, headers


@pytest.fixture
def test_user(test_user_with_auth):
    """Get test user from fixture."""
    return test_user_with_auth[0]


@pytest.fixture
def auth_headers(test_user_with_auth):
    """Get auth headers from fixture."""
    return test_user_with_auth[1]


# =============================================================================
# T41: POST /api/tokens - Generate API Token
# =============================================================================

@pytest.mark.unit
class TestCreateToken:
    """Test POST /api/tokens endpoint (T41)."""

    def test_create_token_success_returns_201_and_token(
        self, client: TestClient, auth_headers: dict, db_session: Session
    ):
        """Test successful token creation returns 201 with plaintext token."""
        # Arrange
        token_data = {"name": "Test Token"}

        # Act
        response = client.post("/api/tokens", json=token_data, headers=auth_headers)

        # Assert
        assert response.status_code == 201
        data = response.json()
        assert "id" in data
        assert data["name"] == "Test Token"
        assert "token" in data
        assert data["token"].startswith("or_api_")  # Token format check
        assert "created_at" in data

    def test_create_token_saves_hash_not_plaintext(
        self, client: TestClient, auth_headers: dict, db_session: Session
    ):
        """Test that only hash is saved to database, not plaintext."""
        # Arrange
        token_data = {"name": "Secure Token"}

        # Act
        response = client.post("/api/tokens", json=token_data, headers=auth_headers)

        # Assert
        assert response.status_code == 201
        data = response.json()
        plaintext_token = data["token"]
        token_id = data["id"]

        # Verify in database: token_hash should NOT match plaintext
        db_token = db_session.query(ApiToken).filter(ApiToken.id == token_id).first()
        assert db_token is not None
        assert db_token.token_hash != plaintext_token
        assert len(db_token.token_hash) == 64  # SHA-256 hex is 64 chars

    def test_create_token_without_auth_returns_401(
        self, client: TestClient
    ):
        """Test that token creation without auth returns 401."""
        # Arrange
        token_data = {"name": "Test Token"}

        # Act
        response = client.post("/api/tokens", json=token_data)

        # Assert
        assert response.status_code == 401

    def test_create_token_empty_name_returns_422(
        self, client: TestClient, auth_headers: dict
    ):
        """Test that empty name returns validation error 422."""
        # Arrange
        token_data = {"name": ""}

        # Act
        response = client.post("/api/tokens", json=token_data, headers=auth_headers)

        # Assert
        assert response.status_code == 422

    def test_create_token_name_too_long_returns_422(
        self, client: TestClient, auth_headers: dict
    ):
        """Test that name > 100 chars returns validation error 422."""
        # Arrange
        token_data = {"name": "x" * 101}

        # Act
        response = client.post("/api/tokens", json=token_data, headers=auth_headers)

        # Assert
        assert response.status_code == 422

    def test_create_token_exceeds_limit_returns_400(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test that creating token when limit reached returns 400."""
        # Arrange: Create max tokens (5 by default)
        # NOTE(review): 5 is hard-coded here; confirm it matches the
        # application's configured per-user token limit.
        for i in range(5):
            token, _, _ = _new_api_token(test_user.id, f"Existing Token {i}")
            db_session.add(token)
        db_session.commit()

        # Act: Try to create 6th token
        token_data = {"name": "One Too Many"}
        response = client.post("/api/tokens", json=token_data, headers=auth_headers)

        # Assert
        assert response.status_code == 400
        # Parse the body once; either wording is accepted in the error detail.
        detail = response.json()["detail"].lower()
        assert "limit" in detail or "maximum" in detail

    def test_create_token_associated_with_current_user(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test that created token is associated with authenticated user."""
        # Arrange
        token_data = {"name": "My Token"}

        # Act
        response = client.post("/api/tokens", json=token_data, headers=auth_headers)

        # Assert
        assert response.status_code == 201
        data = response.json()

        # Verify ownership
        db_token = db_session.query(ApiToken).filter(ApiToken.id == data["id"]).first()
        assert db_token.user_id == test_user.id

    def test_create_token_sets_is_active_true(
        self, client: TestClient, auth_headers: dict, db_session: Session
    ):
        """Test that created token has is_active=True by default."""
        # Arrange
        token_data = {"name": "Active Token"}

        # Act
        response = client.post("/api/tokens", json=token_data, headers=auth_headers)

        # Assert
        assert response.status_code == 201
        data = response.json()

        db_token = db_session.query(ApiToken).filter(ApiToken.id == data["id"]).first()
        assert db_token.is_active is True


# =============================================================================
# T42: GET /api/tokens - List API Tokens
# =============================================================================

@pytest.mark.unit
class TestListTokens:
    """Test GET /api/tokens endpoint (T42)."""

    def test_list_tokens_empty_returns_empty_list(
        self, client: TestClient, auth_headers: dict
    ):
        """Test listing tokens when user has no tokens returns empty list."""
        # Act
        response = client.get("/api/tokens", headers=auth_headers)

        # Assert
        assert response.status_code == 200
        data = response.json()
        assert data == []

    def test_list_tokens_returns_user_tokens(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test listing returns only current user's tokens."""
        # Arrange: Create tokens for user
        for i in range(3):
            token, _, _ = _new_api_token(test_user.id, f"Token {i}")
            db_session.add(token)
        db_session.commit()

        # Act
        response = client.get("/api/tokens", headers=auth_headers)

        # Assert
        assert response.status_code == 200
        data = response.json()
        assert len(data) == 3
        for item in data:
            assert "id" in item
            assert "name" in item
            assert "created_at" in item
            assert "is_active" in item

    def test_list_tokens_does_not_include_token_values(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """CRITICAL SECURITY TEST: Token values must NOT be in response."""
        # Arrange: Create a token
        token, _, token_hash = _new_api_token(test_user.id, "Secret Token")
        db_session.add(token)
        db_session.commit()

        # Act
        response = client.get("/api/tokens", headers=auth_headers)

        # Assert
        assert response.status_code == 200
        data = response.json()
        assert len(data) == 1

        # Security check: NO token field should be present
        assert "token" not in data[0]
        assert "token_hash" not in data[0]
        # Even hash should not be returned
        assert token_hash not in str(data)

    def test_list_tokens_ordered_by_created_at_desc(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test tokens are ordered by created_at DESC (newest first)."""
        # Arrange: Create tokens with different timestamps
        base_time = _naive_utcnow()

        for i in range(3):
            token, _, _ = _new_api_token(
                test_user.id,
                f"Token {i}",
                created_at=base_time - timedelta(days=i),
            )
            db_session.add(token)
        db_session.commit()

        # Act
        response = client.get("/api/tokens", headers=auth_headers)

        # Assert
        assert response.status_code == 200
        data = response.json()
        assert len(data) == 3
        # Check ordering: newest first
        assert data[0]["name"] == "Token 0"  # Created now
        assert data[1]["name"] == "Token 1"  # Created 1 day ago
        assert data[2]["name"] == "Token 2"  # Created 2 days ago

    def test_list_tokens_includes_last_used_at(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test response includes last_used_at field."""
        # Arrange: Create token with last_used_at
        token, _, _ = _new_api_token(
            test_user.id,
            "Used Token",
            last_used_at=_naive_utcnow(),
        )
        db_session.add(token)
        db_session.commit()

        # Act
        response = client.get("/api/tokens", headers=auth_headers)

        # Assert
        assert response.status_code == 200
        data = response.json()
        assert len(data) == 1
        assert "last_used_at" in data[0]
        assert data[0]["last_used_at"] is not None

    def test_list_tokens_returns_only_active_tokens(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test that only active tokens are returned (soft delete)."""
        # Arrange: Create active and inactive tokens
        # Active token
        active_token, _, _ = _new_api_token(test_user.id, "Active Token")
        db_session.add(active_token)

        # Inactive token (revoked)
        revoked_token, _, _ = _new_api_token(
            test_user.id, "Revoked Token", is_active=False
        )
        db_session.add(revoked_token)
        db_session.commit()

        # Act
        response = client.get("/api/tokens", headers=auth_headers)

        # Assert
        assert response.status_code == 200
        data = response.json()
        assert len(data) == 1
        assert data[0]["name"] == "Active Token"

    def test_list_tokens_without_auth_returns_401(
        self, client: TestClient
    ):
        """Test that listing tokens without auth returns 401."""
        # Act
        response = client.get("/api/tokens")

        # Assert
        assert response.status_code == 401

    def test_list_tokens_does_not_show_other_users_tokens(
        self, client: TestClient, auth_headers: dict, db_session: Session
    ):
        """Test that user can only see their own tokens."""
        # Arrange: Create another user and their token
        _create_other_user_token(db_session)

        # Act
        response = client.get("/api/tokens", headers=auth_headers)

        # Assert
        assert response.status_code == 200
        data = response.json()
        assert len(data) == 0  # Should not see other user's token


# =============================================================================
# T43: DELETE /api/tokens/{id} - Revoke API Token
# =============================================================================

@pytest.mark.unit
class TestRevokeToken:
    """Test DELETE /api/tokens/{id} endpoint (T43)."""

    def test_revoke_token_success_returns_204(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test successful revocation returns 204 No Content."""
        # Arrange: Create a token
        token, _, _ = _new_api_token(test_user.id, "Token to Revoke")
        db_session.add(token)
        db_session.commit()

        # Act
        response = client.delete(f"/api/tokens/{token.id}", headers=auth_headers)

        # Assert
        assert response.status_code == 204
        assert response.content == b""  # No content

    def test_revoke_token_sets_is_active_false(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test that revocation sets is_active=False (soft delete)."""
        # Arrange
        token, _, _ = _new_api_token(test_user.id, "Token to Revoke")
        db_session.add(token)
        db_session.commit()

        # Act
        response = client.delete(f"/api/tokens/{token.id}", headers=auth_headers)

        # Assert
        assert response.status_code == 204

        # Verify in database
        db_session.refresh(token)
        assert token.is_active is False

    def test_revoke_token_does_not_delete_from_db(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test that token is NOT deleted from database (soft delete)."""
        # Arrange
        token, _, _ = _new_api_token(test_user.id, "Token to Revoke")
        db_session.add(token)
        db_session.commit()
        db_session.refresh(token)
        # Capture the id now so the later lookup does not rely on the instance.
        token_id = token.id

        # Act
        response = client.delete(f"/api/tokens/{token_id}", headers=auth_headers)

        # Assert
        assert response.status_code == 204

        # Verify token still exists in DB - use a fresh query
        db_session.expire_all()
        db_token = db_session.query(ApiToken).filter(ApiToken.id == token_id).first()
        assert db_token is not None
        assert db_token.is_active is False

    def test_revoke_token_not_found_returns_404(
        self, client: TestClient, auth_headers: dict
    ):
        """Test revoking non-existent token returns 404."""
        # Act
        response = client.delete("/api/tokens/99999", headers=auth_headers)

        # Assert
        assert response.status_code == 404

    def test_revoke_other_users_token_returns_403(
        self, client: TestClient, auth_headers: dict, db_session: Session
    ):
        """Test revoking another user's token returns 403 Forbidden."""
        # Arrange: Create another user and their token
        other_token = _create_other_user_token(db_session)

        # Act: Try to revoke other user's token
        response = client.delete(f"/api/tokens/{other_token.id}", headers=auth_headers)

        # Assert
        assert response.status_code == 403

    def test_revoke_token_without_auth_returns_401(
        self, client: TestClient
    ):
        """Test revoking token without auth returns 401."""
        # Act
        response = client.delete("/api/tokens/1")

        # Assert
        assert response.status_code == 401

    def test_revoked_token_cannot_be_used_for_public_api(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """INTEGRATION TEST: Revoked token should not work on public API."""
        # Arrange: Create and revoke a token
        token, plaintext, _ = _new_api_token(test_user.id, "Token to Revoke")
        db_session.add(token)
        db_session.commit()

        api_headers = {"Authorization": f"Bearer {plaintext}"}

        # Verify token works before revocation
        api_response = client.get("/api/v1/stats", headers=api_headers)
        # Token should be valid (even if returns empty/no data)
        assert api_response.status_code != 401

        # Revoke the token
        revoke_response = client.delete(f"/api/tokens/{token.id}", headers=auth_headers)
        assert revoke_response.status_code == 204

        # Act: Try to use revoked token
        api_response = client.get("/api/v1/stats", headers=api_headers)

        # Assert: Should return 401 Unauthorized
        assert api_response.status_code == 401

    def test_revoke_already_revoked_token_returns_404(
        self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
    ):
        """Test revoking an already revoked token returns 404."""
        # Arrange: Create a revoked token
        token, _, _ = _new_api_token(
            test_user.id,
            "Already Revoked",
            is_active=False,  # Already revoked
        )
        db_session.add(token)
        db_session.commit()

        # Act: Try to revoke again
        response = client.delete(f"/api/tokens/{token.id}", headers=auth_headers)

        # Assert: Should return 404 (token not found as active)
        assert response.status_code == 404