feat(tokens): T41-T43 implement API token management endpoints
- Add max_api_tokens_per_user config (default 5)
- Implement POST /api/tokens (T41): generate token with limit check
- Implement GET /api/tokens (T42): list active tokens, no values exposed
- Implement DELETE /api/tokens/{id} (T43): soft delete with ownership check
- Security: plaintext token shown ONLY at creation
- Security: SHA-256 hash stored in DB, never the plaintext
- Security: revoked tokens return 401 on public API
- 24 tests with 100% coverage on tokens router
Closes T41, T42, T43
This commit is contained in:
@@ -8,10 +8,10 @@
|
|||||||
|
|
||||||
| Metrica | Valore |
|
| Metrica | Valore |
|
||||||
|---------|--------|
|
|---------|--------|
|
||||||
| **Stato** | 🟢 Public API Completati |
|
| **Stato** | 🟢 Gestione Token API Completata |
|
||||||
| **Progresso** | 48% |
|
| **Progresso** | 52% |
|
||||||
| **Task Totali** | 74 |
|
| **Task Totali** | 74 |
|
||||||
| **Task Completati** | 35 |
|
| **Task Completati** | 38 |
|
||||||
| **Task In Progress** | 0 |
|
| **Task In Progress** | 0 |
|
||||||
|
|
||||||
---
|
---
|
||||||
@@ -128,9 +128,24 @@
|
|||||||
- [x] T40: Scrivere test per public API endpoints - ✅ Completato (2026-04-07)
|
- [x] T40: Scrivere test per public API endpoints - ✅ Completato (2026-04-07)
|
||||||
- 27 test endpoint + 18 test rate limit + 25 test schemas = 70 test totali
|
- 27 test endpoint + 18 test rate limit + 25 test schemas = 70 test totali
|
||||||
- Coverage: public_api.py 100%, rate_limit.py 98%
|
- Coverage: public_api.py 100%, rate_limit.py 98%
|
||||||
- [ ] T41: Implementare POST /api/tokens (generate)
|
- [x] T41: Implementare POST /api/tokens (generate) - ✅ Completato (2026-04-07)
|
||||||
- [ ] T42: Implementare GET /api/tokens (list)
|
- Endpoint: POST /api/tokens con auth JWT
|
||||||
- [ ] T43: Implementare DELETE /api/tokens/{id})
|
- Limite: MAX_API_TOKENS_PER_USER (default 5)
|
||||||
|
- Token plaintext mostrato SOLO in risposta creazione
|
||||||
|
- Hash SHA-256 salvato nel DB
|
||||||
|
- Test: 8 test passanti, 100% coverage
|
||||||
|
- [x] T42: Implementare GET /api/tokens (list) - ✅ Completato (2026-04-07)
|
||||||
|
- Endpoint: GET /api/tokens con auth JWT
|
||||||
|
- NO token values in risposta (sicurezza)
|
||||||
|
- Ordinamento: created_at DESC
|
||||||
|
- Solo token attivi (is_active=True)
|
||||||
|
- Test: 7 test passanti
|
||||||
|
- [x] T43: Implementare DELETE /api/tokens/{id} - ✅ Completato (2026-04-07)
|
||||||
|
- Endpoint: DELETE /api/tokens/{id} con auth JWT
|
||||||
|
- Soft delete: is_active=False
|
||||||
|
- Verifica ownership (403 se non proprio)
|
||||||
|
- Token revocato non funziona su API pubblica
|
||||||
|
- Test: 9 test passanti
|
||||||
|
|
||||||
### 🎨 Frontend Web (T44-T54) - 0/11 completati
|
### 🎨 Frontend Web (T44-T54) - 0/11 completati
|
||||||
- [ ] T44: Setup Jinja2 templates e static files
|
- [ ] T44: Setup Jinja2 templates e static files
|
||||||
|
|||||||
@@ -62,6 +62,10 @@ class Settings(BaseSettings):
|
|||||||
default=10,
|
default=10,
|
||||||
description="Maximum API keys per user"
|
description="Maximum API keys per user"
|
||||||
)
|
)
|
||||||
|
max_api_tokens_per_user: int = Field(
|
||||||
|
default=5,
|
||||||
|
description="Maximum API tokens per user"
|
||||||
|
)
|
||||||
rate_limit_requests: int = Field(
|
rate_limit_requests: int = Field(
|
||||||
default=100,
|
default=100,
|
||||||
description="API rate limit requests"
|
description="API rate limit requests"
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ from openrouter_monitor.routers import api_keys
|
|||||||
from openrouter_monitor.routers import auth
|
from openrouter_monitor.routers import auth
|
||||||
from openrouter_monitor.routers import public_api
|
from openrouter_monitor.routers import public_api
|
||||||
from openrouter_monitor.routers import stats
|
from openrouter_monitor.routers import stats
|
||||||
|
from openrouter_monitor.routers import tokens
|
||||||
|
|
||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
|
|
||||||
@@ -33,6 +34,7 @@ app.add_middleware(
|
|||||||
# Include routers
|
# Include routers
|
||||||
app.include_router(auth.router, prefix="/api/auth", tags=["authentication"])
|
app.include_router(auth.router, prefix="/api/auth", tags=["authentication"])
|
||||||
app.include_router(api_keys.router, prefix="/api/keys", tags=["api-keys"])
|
app.include_router(api_keys.router, prefix="/api/keys", tags=["api-keys"])
|
||||||
|
app.include_router(tokens.router)
|
||||||
app.include_router(stats.router)
|
app.include_router(stats.router)
|
||||||
app.include_router(public_api.router)
|
app.include_router(public_api.router)
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
"""Routers package for OpenRouter Monitor."""
|
"""Routers package for OpenRouter Monitor."""
|
||||||
from openrouter_monitor.routers import api_keys
|
from openrouter_monitor.routers import api_keys
|
||||||
from openrouter_monitor.routers import auth
|
from openrouter_monitor.routers import auth
|
||||||
|
from openrouter_monitor.routers import public_api
|
||||||
from openrouter_monitor.routers import stats
|
from openrouter_monitor.routers import stats
|
||||||
|
from openrouter_monitor.routers import tokens
|
||||||
|
|
||||||
__all__ = ["auth", "api_keys", "stats"]
|
__all__ = ["auth", "api_keys", "public_api", "stats", "tokens"]
|
||||||
|
|||||||
192
src/openrouter_monitor/routers/tokens.py
Normal file
192
src/openrouter_monitor/routers/tokens.py
Normal file
@@ -0,0 +1,192 @@
|
|||||||
|
"""API tokens router for OpenRouter API Key Monitor.
|
||||||
|
|
||||||
|
T41: POST /api/tokens - Generate API token
|
||||||
|
T42: GET /api/tokens - List API tokens
|
||||||
|
T43: DELETE /api/tokens/{id} - Revoke API token
|
||||||
|
"""
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Depends, HTTPException, status
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from openrouter_monitor.config import get_settings
|
||||||
|
from openrouter_monitor.database import get_db
|
||||||
|
from openrouter_monitor.dependencies.auth import get_current_user
|
||||||
|
from openrouter_monitor.models import ApiToken, User
|
||||||
|
from openrouter_monitor.schemas.public_api import (
|
||||||
|
ApiTokenCreate,
|
||||||
|
ApiTokenCreateResponse,
|
||||||
|
ApiTokenResponse,
|
||||||
|
)
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/api/tokens", tags=["api-tokens"])
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"",
|
||||||
|
response_model=ApiTokenCreateResponse,
|
||||||
|
status_code=status.HTTP_201_CREATED,
|
||||||
|
summary="Create new API token",
|
||||||
|
description="Generate a new API token for public API access. "
|
||||||
|
"The plaintext token is shown ONLY in this response.",
|
||||||
|
)
|
||||||
|
async def create_token(
|
||||||
|
token_data: ApiTokenCreate,
|
||||||
|
current_user: User = Depends(get_current_user),
|
||||||
|
db: Session = Depends(get_db),
|
||||||
|
):
|
||||||
|
"""Create a new API token.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
token_data: Token creation data (name)
|
||||||
|
current_user: Authenticated user from JWT
|
||||||
|
db: Database session
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ApiTokenCreateResponse with plaintext token (shown only once!)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: 400 if token limit reached
|
||||||
|
"""
|
||||||
|
settings = get_settings()
|
||||||
|
|
||||||
|
# Check token limit
|
||||||
|
token_count = db.query(ApiToken).filter(
|
||||||
|
ApiToken.user_id == current_user.id,
|
||||||
|
ApiToken.is_active == True
|
||||||
|
).count()
|
||||||
|
|
||||||
|
if token_count >= settings.max_api_tokens_per_user:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail=f"Maximum number of API tokens ({settings.max_api_tokens_per_user}) reached. "
|
||||||
|
"Revoke an existing token before creating a new one."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Generate token (returns plaintext and hash)
|
||||||
|
plaintext_token, token_hash = generate_api_token()
|
||||||
|
|
||||||
|
# Create token record
|
||||||
|
db_token = ApiToken(
|
||||||
|
user_id=current_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name=token_data.name,
|
||||||
|
is_active=True,
|
||||||
|
)
|
||||||
|
db.add(db_token)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(db_token)
|
||||||
|
|
||||||
|
# Return response with plaintext (shown only once!)
|
||||||
|
return ApiTokenCreateResponse(
|
||||||
|
id=db_token.id,
|
||||||
|
name=db_token.name,
|
||||||
|
token=plaintext_token,
|
||||||
|
created_at=db_token.created_at,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"",
|
||||||
|
response_model=List[ApiTokenResponse],
|
||||||
|
summary="List API tokens",
|
||||||
|
description="List all active API tokens for the current user. "
|
||||||
|
"Token values are NOT included for security.",
|
||||||
|
)
|
||||||
|
async def list_tokens(
|
||||||
|
current_user: User = Depends(get_current_user),
|
||||||
|
db: Session = Depends(get_db),
|
||||||
|
):
|
||||||
|
"""List all active API tokens for the current user.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
current_user: Authenticated user from JWT
|
||||||
|
db: Database session
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of ApiTokenResponse (without token values)
|
||||||
|
"""
|
||||||
|
tokens = db.query(ApiToken).filter(
|
||||||
|
ApiToken.user_id == current_user.id,
|
||||||
|
ApiToken.is_active == True
|
||||||
|
).order_by(ApiToken.created_at.desc()).all()
|
||||||
|
|
||||||
|
return [
|
||||||
|
ApiTokenResponse(
|
||||||
|
id=token.id,
|
||||||
|
name=token.name,
|
||||||
|
created_at=token.created_at,
|
||||||
|
last_used_at=token.last_used_at,
|
||||||
|
is_active=token.is_active,
|
||||||
|
)
|
||||||
|
for token in tokens
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@router.delete(
|
||||||
|
"/{token_id}",
|
||||||
|
status_code=status.HTTP_204_NO_CONTENT,
|
||||||
|
summary="Revoke API token",
|
||||||
|
description="Revoke (soft delete) an API token. The token cannot be used after revocation.",
|
||||||
|
)
|
||||||
|
async def revoke_token(
|
||||||
|
token_id: int,
|
||||||
|
current_user: User = Depends(get_current_user),
|
||||||
|
db: Session = Depends(get_db),
|
||||||
|
):
|
||||||
|
"""Revoke an API token (soft delete).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
token_id: ID of the token to revoke
|
||||||
|
current_user: Authenticated user from JWT
|
||||||
|
db: Database session
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: 404 if token not found, 403 if not owned by user
|
||||||
|
"""
|
||||||
|
# Find token (must be active and owned by current user)
|
||||||
|
token = db.query(ApiToken).filter(
|
||||||
|
ApiToken.id == token_id,
|
||||||
|
ApiToken.user_id == current_user.id,
|
||||||
|
ApiToken.is_active == True
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if not token:
|
||||||
|
# Check if token exists but is inactive (already revoked)
|
||||||
|
inactive_token = db.query(ApiToken).filter(
|
||||||
|
ApiToken.id == token_id,
|
||||||
|
ApiToken.user_id == current_user.id,
|
||||||
|
ApiToken.is_active == False
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if inactive_token:
|
||||||
|
# Token exists but is already revoked
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
|
detail="Token not found or already revoked"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if token exists but belongs to another user
|
||||||
|
other_user_token = db.query(ApiToken).filter(
|
||||||
|
ApiToken.id == token_id,
|
||||||
|
ApiToken.is_active == True
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if other_user_token:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_403_FORBIDDEN,
|
||||||
|
detail="You don't have permission to revoke this token"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Token doesn't exist at all
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
|
detail="Token not found"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Soft delete: set is_active to False
|
||||||
|
token.is_active = False
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
return None # 204 No Content
|
||||||
625
tests/unit/routers/test_tokens.py
Normal file
625
tests/unit/routers/test_tokens.py
Normal file
@@ -0,0 +1,625 @@
|
|||||||
|
"""Tests for API tokens router.
|
||||||
|
|
||||||
|
T41: POST /api/tokens - Generate API token
|
||||||
|
T42: GET /api/tokens - List API tokens
|
||||||
|
T43: DELETE /api/tokens/{id} - Revoke API token
|
||||||
|
"""
|
||||||
|
import pytest
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from openrouter_monitor.models import User, ApiToken
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Fixtures
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_user_with_auth(client, db_session):
|
||||||
|
"""Create a test user and return user + auth headers.
|
||||||
|
|
||||||
|
Returns tuple (user, auth_headers)
|
||||||
|
"""
|
||||||
|
from openrouter_monitor.services.password import hash_password
|
||||||
|
from openrouter_monitor.services.jwt import create_access_token
|
||||||
|
|
||||||
|
# Create user directly in database
|
||||||
|
user = User(
|
||||||
|
email="tokentest@example.com",
|
||||||
|
password_hash=hash_password("TestPassword123!"),
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(user)
|
||||||
|
db_session.commit()
|
||||||
|
db_session.refresh(user)
|
||||||
|
|
||||||
|
# Create JWT token
|
||||||
|
token = create_access_token(data={"sub": str(user.id)})
|
||||||
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
return user, headers
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_user(test_user_with_auth):
|
||||||
|
"""Get test user from fixture."""
|
||||||
|
return test_user_with_auth[0]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def auth_headers(test_user_with_auth):
|
||||||
|
"""Get auth headers from fixture."""
|
||||||
|
return test_user_with_auth[1]
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# T41: POST /api/tokens - Generate API Token
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
class TestCreateToken:
|
||||||
|
"""Test POST /api/tokens endpoint (T41)."""
|
||||||
|
|
||||||
|
def test_create_token_success_returns_201_and_token(
|
||||||
|
self, client: TestClient, auth_headers: dict, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test successful token creation returns 201 with plaintext token."""
|
||||||
|
# Arrange
|
||||||
|
token_data = {"name": "Test Token"}
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.post("/api/tokens", json=token_data, headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 201
|
||||||
|
data = response.json()
|
||||||
|
assert "id" in data
|
||||||
|
assert data["name"] == "Test Token"
|
||||||
|
assert "token" in data
|
||||||
|
assert data["token"].startswith("or_api_") # Token format check
|
||||||
|
assert "created_at" in data
|
||||||
|
|
||||||
|
def test_create_token_saves_hash_not_plaintext(
|
||||||
|
self, client: TestClient, auth_headers: dict, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test that only hash is saved to database, not plaintext."""
|
||||||
|
# Arrange
|
||||||
|
token_data = {"name": "Secure Token"}
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.post("/api/tokens", json=token_data, headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 201
|
||||||
|
data = response.json()
|
||||||
|
plaintext_token = data["token"]
|
||||||
|
token_id = data["id"]
|
||||||
|
|
||||||
|
# Verify in database: token_hash should NOT match plaintext
|
||||||
|
db_token = db_session.query(ApiToken).filter(ApiToken.id == token_id).first()
|
||||||
|
assert db_token is not None
|
||||||
|
assert db_token.token_hash != plaintext_token
|
||||||
|
assert len(db_token.token_hash) == 64 # SHA-256 hex is 64 chars
|
||||||
|
|
||||||
|
def test_create_token_without_auth_returns_401(
|
||||||
|
self, client: TestClient
|
||||||
|
):
|
||||||
|
"""Test that token creation without auth returns 401."""
|
||||||
|
# Arrange
|
||||||
|
token_data = {"name": "Test Token"}
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.post("/api/tokens", json=token_data)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
def test_create_token_empty_name_returns_422(
|
||||||
|
self, client: TestClient, auth_headers: dict
|
||||||
|
):
|
||||||
|
"""Test that empty name returns validation error 422."""
|
||||||
|
# Arrange
|
||||||
|
token_data = {"name": ""}
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.post("/api/tokens", json=token_data, headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 422
|
||||||
|
|
||||||
|
def test_create_token_name_too_long_returns_422(
|
||||||
|
self, client: TestClient, auth_headers: dict
|
||||||
|
):
|
||||||
|
"""Test that name > 100 chars returns validation error 422."""
|
||||||
|
# Arrange
|
||||||
|
token_data = {"name": "x" * 101}
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.post("/api/tokens", json=token_data, headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 422
|
||||||
|
|
||||||
|
def test_create_token_exceeds_limit_returns_400(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test that creating token when limit reached returns 400."""
|
||||||
|
# Arrange: Create max tokens (5 by default)
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
for i in range(5):
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name=f"Existing Token {i}",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act: Try to create 6th token
|
||||||
|
token_data = {"name": "One Too Many"}
|
||||||
|
response = client.post("/api/tokens", json=token_data, headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert "limit" in response.json()["detail"].lower() or "maximum" in response.json()["detail"].lower()
|
||||||
|
|
||||||
|
def test_create_token_associated_with_current_user(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test that created token is associated with authenticated user."""
|
||||||
|
# Arrange
|
||||||
|
token_data = {"name": "My Token"}
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.post("/api/tokens", json=token_data, headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 201
|
||||||
|
data = response.json()
|
||||||
|
|
||||||
|
# Verify ownership
|
||||||
|
db_token = db_session.query(ApiToken).filter(ApiToken.id == data["id"]).first()
|
||||||
|
assert db_token.user_id == test_user.id
|
||||||
|
|
||||||
|
def test_create_token_sets_is_active_true(
|
||||||
|
self, client: TestClient, auth_headers: dict, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test that created token has is_active=True by default."""
|
||||||
|
# Arrange
|
||||||
|
token_data = {"name": "Active Token"}
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.post("/api/tokens", json=token_data, headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 201
|
||||||
|
data = response.json()
|
||||||
|
|
||||||
|
db_token = db_session.query(ApiToken).filter(ApiToken.id == data["id"]).first()
|
||||||
|
assert db_token.is_active is True
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# T42: GET /api/tokens - List API Tokens
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
class TestListTokens:
|
||||||
|
"""Test GET /api/tokens endpoint (T42)."""
|
||||||
|
|
||||||
|
def test_list_tokens_empty_returns_empty_list(
|
||||||
|
self, client: TestClient, auth_headers: dict
|
||||||
|
):
|
||||||
|
"""Test listing tokens when user has no tokens returns empty list."""
|
||||||
|
# Act
|
||||||
|
response = client.get("/api/tokens", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert data == []
|
||||||
|
|
||||||
|
def test_list_tokens_returns_user_tokens(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test listing returns only current user's tokens."""
|
||||||
|
# Arrange: Create tokens for user
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
for i in range(3):
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name=f"Token {i}",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.get("/api/tokens", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert len(data) == 3
|
||||||
|
for token in data:
|
||||||
|
assert "id" in token
|
||||||
|
assert "name" in token
|
||||||
|
assert "created_at" in token
|
||||||
|
assert "is_active" in token
|
||||||
|
|
||||||
|
def test_list_tokens_does_not_include_token_values(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""CRITICAL SECURITY TEST: Token values must NOT be in response."""
|
||||||
|
# Arrange: Create a token
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name="Secret Token",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.get("/api/tokens", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert len(data) == 1
|
||||||
|
|
||||||
|
# Security check: NO token field should be present
|
||||||
|
assert "token" not in data[0]
|
||||||
|
assert "token_hash" not in data[0]
|
||||||
|
|
||||||
|
# Even hash should not be returned
|
||||||
|
assert token_hash not in str(data)
|
||||||
|
|
||||||
|
def test_list_tokens_ordered_by_created_at_desc(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test tokens are ordered by created_at DESC (newest first)."""
|
||||||
|
# Arrange: Create tokens with different timestamps
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
base_time = datetime.utcnow()
|
||||||
|
|
||||||
|
for i in range(3):
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name=f"Token {i}",
|
||||||
|
created_at=base_time - timedelta(days=i),
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.get("/api/tokens", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert len(data) == 3
|
||||||
|
|
||||||
|
# Check ordering: newest first
|
||||||
|
assert data[0]["name"] == "Token 0" # Created now
|
||||||
|
assert data[1]["name"] == "Token 1" # Created 1 day ago
|
||||||
|
assert data[2]["name"] == "Token 2" # Created 2 days ago
|
||||||
|
|
||||||
|
def test_list_tokens_includes_last_used_at(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test response includes last_used_at field."""
|
||||||
|
# Arrange: Create token with last_used_at
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name="Used Token",
|
||||||
|
last_used_at=datetime.utcnow(),
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.get("/api/tokens", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert len(data) == 1
|
||||||
|
assert "last_used_at" in data[0]
|
||||||
|
assert data[0]["last_used_at"] is not None
|
||||||
|
|
||||||
|
def test_list_tokens_returns_only_active_tokens(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test that only active tokens are returned (soft delete)."""
|
||||||
|
# Arrange: Create active and inactive tokens
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
|
||||||
|
# Active token
|
||||||
|
_, hash1 = generate_api_token()
|
||||||
|
token1 = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=hash1,
|
||||||
|
name="Active Token",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token1)
|
||||||
|
|
||||||
|
# Inactive token (revoked)
|
||||||
|
_, hash2 = generate_api_token()
|
||||||
|
token2 = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=hash2,
|
||||||
|
name="Revoked Token",
|
||||||
|
is_active=False
|
||||||
|
)
|
||||||
|
db_session.add(token2)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.get("/api/tokens", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert len(data) == 1
|
||||||
|
assert data[0]["name"] == "Active Token"
|
||||||
|
|
||||||
|
def test_list_tokens_without_auth_returns_401(
|
||||||
|
self, client: TestClient
|
||||||
|
):
|
||||||
|
"""Test that listing tokens without auth returns 401."""
|
||||||
|
# Act
|
||||||
|
response = client.get("/api/tokens")
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
def test_list_tokens_does_not_show_other_users_tokens(
|
||||||
|
self, client: TestClient, auth_headers: dict, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test that user can only see their own tokens."""
|
||||||
|
# Arrange: Create another user and their token
|
||||||
|
from openrouter_monitor.services.password import hash_password
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
|
||||||
|
other_user = User(
|
||||||
|
email="other@example.com",
|
||||||
|
password_hash=hash_password("OtherPass123!"),
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(other_user)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
other_token = ApiToken(
|
||||||
|
user_id=other_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name="Other User's Token",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(other_token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.get("/api/tokens", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert len(data) == 0 # Should not see other user's token
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# T43: DELETE /api/tokens/{id} - Revoke API Token
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
class TestRevokeToken:
|
||||||
|
"""Test DELETE /api/tokens/{id} endpoint (T43)."""
|
||||||
|
|
||||||
|
def test_revoke_token_success_returns_204(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test successful revocation returns 204 No Content."""
|
||||||
|
# Arrange: Create a token
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name="Token to Revoke",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.delete(f"/api/tokens/{token.id}", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 204
|
||||||
|
assert response.content == b"" # No content
|
||||||
|
|
||||||
|
def test_revoke_token_sets_is_active_false(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test that revocation sets is_active=False (soft delete)."""
|
||||||
|
# Arrange
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name="Token to Revoke",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.delete(f"/api/tokens/{token.id}", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 204
|
||||||
|
|
||||||
|
# Verify in database
|
||||||
|
db_session.refresh(token)
|
||||||
|
assert token.is_active is False
|
||||||
|
|
||||||
|
def test_revoke_token_does_not_delete_from_db(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test that token is NOT deleted from database (soft delete)."""
|
||||||
|
# Arrange
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name="Token to Revoke",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
db_session.refresh(token)
|
||||||
|
token_id = token.id
|
||||||
|
|
||||||
|
# Act
|
||||||
|
response = client.delete(f"/api/tokens/{token.id}", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 204
|
||||||
|
|
||||||
|
# Verify token still exists in DB - use a fresh query
|
||||||
|
db_session.expire_all()
|
||||||
|
db_token = db_session.query(ApiToken).filter(ApiToken.id == token_id).first()
|
||||||
|
assert db_token is not None
|
||||||
|
assert db_token.is_active is False
|
||||||
|
|
||||||
|
def test_revoke_token_not_found_returns_404(
|
||||||
|
self, client: TestClient, auth_headers: dict
|
||||||
|
):
|
||||||
|
"""Test revoking non-existent token returns 404."""
|
||||||
|
# Act
|
||||||
|
response = client.delete("/api/tokens/99999", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
def test_revoke_other_users_token_returns_403(
|
||||||
|
self, client: TestClient, auth_headers: dict, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test revoking another user's token returns 403 Forbidden."""
|
||||||
|
# Arrange: Create another user and their token
|
||||||
|
from openrouter_monitor.services.password import hash_password
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
|
||||||
|
other_user = User(
|
||||||
|
email="other@example.com",
|
||||||
|
password_hash=hash_password("OtherPass123!"),
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(other_user)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
other_token = ApiToken(
|
||||||
|
user_id=other_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name="Other User's Token",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(other_token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act: Try to revoke other user's token
|
||||||
|
response = client.delete(f"/api/tokens/{other_token.id}", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
def test_revoke_token_without_auth_returns_401(
|
||||||
|
self, client: TestClient
|
||||||
|
):
|
||||||
|
"""Test revoking token without auth returns 401."""
|
||||||
|
# Act
|
||||||
|
response = client.delete("/api/tokens/1")
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
def test_revoked_token_cannot_be_used_for_public_api(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""INTEGRATION TEST: Revoked token should not work on public API."""
|
||||||
|
# Arrange: Create and revoke a token
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
plaintext, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name="Token to Revoke",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Verify token works before revocation
|
||||||
|
api_response = client.get(
|
||||||
|
"/api/v1/stats",
|
||||||
|
headers={"Authorization": f"Bearer {plaintext}"}
|
||||||
|
)
|
||||||
|
# Token should be valid (even if returns empty/no data)
|
||||||
|
assert api_response.status_code != 401
|
||||||
|
|
||||||
|
# Revoke the token
|
||||||
|
revoke_response = client.delete(f"/api/tokens/{token.id}", headers=auth_headers)
|
||||||
|
assert revoke_response.status_code == 204
|
||||||
|
|
||||||
|
# Act: Try to use revoked token
|
||||||
|
api_response = client.get(
|
||||||
|
"/api/v1/stats",
|
||||||
|
headers={"Authorization": f"Bearer {plaintext}"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assert: Should return 401 Unauthorized
|
||||||
|
assert api_response.status_code == 401
|
||||||
|
|
||||||
|
def test_revoke_already_revoked_token_returns_404(
|
||||||
|
self, client: TestClient, auth_headers: dict, test_user: User, db_session: Session
|
||||||
|
):
|
||||||
|
"""Test revoking an already revoked token returns 404."""
|
||||||
|
# Arrange: Create a revoked token
|
||||||
|
from openrouter_monitor.services.token import generate_api_token
|
||||||
|
_, token_hash = generate_api_token()
|
||||||
|
token = ApiToken(
|
||||||
|
user_id=test_user.id,
|
||||||
|
token_hash=token_hash,
|
||||||
|
name="Already Revoked",
|
||||||
|
is_active=False # Already revoked
|
||||||
|
)
|
||||||
|
db_session.add(token)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Act: Try to revoke again
|
||||||
|
response = client.delete(f"/api/tokens/{token.id}", headers=auth_headers)
|
||||||
|
|
||||||
|
# Assert: Should return 404 (token not found as active)
|
||||||
|
assert response.status_code == 404
|
||||||
Reference in New Issue
Block a user