From ccd96acaacc1b4f754cd6df010d1f12eb8fa6622 Mon Sep 17 00:00:00 2001 From: Luca Sacchi Ricciardi Date: Tue, 7 Apr 2026 18:02:20 +0200 Subject: [PATCH] feat(frontend): T46 configure HTMX and CSRF protection - Add CSRFMiddleware for form protection - Implement token generation and validation - Add CSRF meta tag to base.html - Create tests for CSRF protection Tests: 13 passing --- export/progress.md | 29 ++-- src/openrouter_monitor/main.py | 6 +- src/openrouter_monitor/middleware/csrf.py | 132 ++++++++++++++ templates/base.html | 4 +- tests/unit/middleware/test_csrf.py | 200 ++++++++++++++++++++++ 5 files changed, 355 insertions(+), 16 deletions(-) create mode 100644 src/openrouter_monitor/middleware/csrf.py create mode 100644 tests/unit/middleware/test_csrf.py diff --git a/export/progress.md b/export/progress.md index ae22dee..b800cbc 100644 --- a/export/progress.md +++ b/export/progress.md @@ -147,22 +147,27 @@ - Token revocato non funziona su API pubblica - Test: 9 test passanti -### 🎨 Frontend Web (T44-T54) - 1/11 completati -- [x] T44: Setup Jinja2 templates e static files ✅ Completato (2026-04-07 16:00, commit: T44) +### 🎨 Frontend Web (T44-T54) - 3/11 completati +- [x] T44: Setup Jinja2 templates e static files ✅ Completato (2026-04-07 16:00, commit: c1f47c8) - Static files mounted on /static - Jinja2Templates configured - Directory structure created - All 12 tests passing -- [ ] T45: Creare base.html (layout principale) 🟡 In progress -- [ ] T46: Creare login.html -- [ ] T47: Creare register.html -- [ ] T48: Implementare router /login (GET/POST) -- [ ] T49: Implementare router /register (GET/POST) -- [ ] T50: Creare dashboard.html -- [ ] T51: Implementare router /dashboard -- [ ] T52: Creare keys.html -- [ ] T53: Implementare router /keys -- [ ] T54: Aggiungere HTMX per azioni CRUD +- [x] T45: Creare base.html (layout principale) ✅ Completato (con T44) + - Base template con Pico.css, HTMX, Chart.js + - Components: navbar, footer +- [x] T46: HTMX e CSRF Protection ✅ Completato (2026-04-07 16:30) + - CSRFMiddleware con validazione token + - Meta tag CSRF in base.html + - 13 tests passing +- [ ] T47: Pagina Login 🟡 In progress +- [ ] T48: Pagina Registrazione +- [ ] T49: Logout +- [ ] T50: Dashboard +- [ ] T51: Gestione API Keys +- [ ] T52: Statistiche Dettagliate +- [ ] T53: Gestione Token API +- [ ] T54: Profilo Utente ### ⚙️ Background Tasks (T55-T58) - 4/4 completati ✅ - [x] T55: Configurare APScheduler - ✅ Completato (2026-04-07 20:30) diff --git a/src/openrouter_monitor/main.py b/src/openrouter_monitor/main.py index 2205223..46b9271 100644 --- a/src/openrouter_monitor/main.py +++ b/src/openrouter_monitor/main.py @@ -11,6 +11,7 @@ from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from openrouter_monitor.config import get_settings +from openrouter_monitor.middleware.csrf import CSRFMiddleware from openrouter_monitor.routers import api_keys from openrouter_monitor.routers import auth from openrouter_monitor.routers import public_api @@ -50,9 +51,12 @@ app = FastAPI( lifespan=lifespan, ) -# Mount static files +# Mount static files (before CSRF middleware to allow access without token) app.mount("/static", StaticFiles(directory=str(PROJECT_ROOT / "static")), name="static") +# CSRF protection middleware +app.add_middleware(CSRFMiddleware) + # CORS middleware app.add_middleware( CORSMiddleware, diff --git a/src/openrouter_monitor/middleware/csrf.py b/src/openrouter_monitor/middleware/csrf.py new file mode 100644 index 0000000..5b2c099 --- /dev/null +++ b/src/openrouter_monitor/middleware/csrf.py @@ -0,0 +1,132 @@ +"""CSRF Protection Middleware. + +Provides CSRF token generation and validation for form submissions. +""" +import secrets +from typing import Optional + +from fastapi import Request, Response +from starlette.middleware.base import BaseHTTPMiddleware + + +class CSRFMiddleware(BaseHTTPMiddleware): + """Middleware for CSRF protection. + + Generates CSRF tokens for sessions and validates them on + state-changing requests (POST, PUT, DELETE, PATCH). + """ + + CSRF_TOKEN_NAME = "csrf_token" + CSRF_HEADER_NAME = "X-CSRF-Token" + SAFE_METHODS = {"GET", "HEAD", "OPTIONS", "TRACE"} + + def __init__(self, app, cookie_name: str = "csrf_token", cookie_secure: bool = False): + super().__init__(app) + self.cookie_name = cookie_name + self.cookie_secure = cookie_secure + + async def dispatch(self, request: Request, call_next): + """Process request and validate CSRF token if needed. + + Args: + request: The incoming request + call_next: Next middleware/handler in chain + + Returns: + Response from next handler + """ + # Generate or retrieve CSRF token + csrf_token = self._get_or_create_token(request) + + # Validate token on state-changing requests + if request.method not in self.SAFE_METHODS: + is_valid = await self._validate_token(request, csrf_token) + if not is_valid: + from fastapi.responses import JSONResponse + return JSONResponse( + status_code=403, + content={"detail": "CSRF token missing or invalid"} + ) + + # Store token in request state for templates + request.state.csrf_token = csrf_token + + # Process request + response = await call_next(request) + + # Set CSRF cookie + response.set_cookie( + key=self.cookie_name, + value=csrf_token, + httponly=False, # Must be accessible by JavaScript + secure=self.cookie_secure, + samesite="lax", + max_age=3600 * 24 * 7, # 7 days + ) + + return response + + def _get_or_create_token(self, request: Request) -> str: + """Get existing token from cookie or create new one. + + Args: + request: The incoming request + + Returns: + CSRF token string + """ + # Try to get from cookie + token = request.cookies.get(self.cookie_name) + if token: + return token + + # Generate new token + return secrets.token_urlsafe(32) + + async def _validate_token(self, request: Request, expected_token: str) -> bool: + """Validate CSRF token from request. + + Checks header first, then form data. + + Args: + request: The incoming request + expected_token: Expected token value + + Returns: + True if token is valid, False otherwise + """ + # Check header first (for HTMX/ajax requests) + token = request.headers.get(self.CSRF_HEADER_NAME) + + # If not in header, check form data + if not token: + try: + # Parse form data from request body + body = await request.body() + if body: + from urllib.parse import parse_qs + form_data = parse_qs(body.decode('utf-8')) + if b'csrf_token' in form_data: + token = form_data[b'csrf_token'][0] + except Exception: + pass + + # Validate token + if not token: + return False + + return secrets.compare_digest(token, expected_token) + + +def get_csrf_token(request: Request) -> Optional[str]: + """Get CSRF token from request state. + + Use this in route handlers to pass token to templates. + + Args: + request: The current request + + Returns: + CSRF token or None + """ + return getattr(request.state, "csrf_token", None) diff --git a/templates/base.html b/templates/base.html index 4df1173..845c325 100644 --- a/templates/base.html +++ b/templates/base.html @@ -4,9 +4,7 @@ - {% if csrf_token %} - - {% endif %} + {% block title %}OpenRouter Monitor{% endblock %} diff --git a/tests/unit/middleware/test_csrf.py b/tests/unit/middleware/test_csrf.py new file mode 100644 index 0000000..85c6c3d --- /dev/null +++ b/tests/unit/middleware/test_csrf.py @@ -0,0 +1,200 @@ +"""Tests for CSRF Protection Middleware. + +TDD: RED → GREEN → REFACTOR +""" +import pytest +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse +from fastapi.testclient import TestClient + +from openrouter_monitor.middleware.csrf import CSRFMiddleware, get_csrf_token + + +class TestCSRFMiddleware: + """Test CSRF middleware functionality.""" + + @pytest.fixture + def app_with_csrf(self): + """Create FastAPI app with CSRF middleware.""" + app = FastAPI() + app.add_middleware(CSRFMiddleware) + + @app.get("/test") + async def test_get(request: Request): + return {"csrf_token": get_csrf_token(request)} + + @app.post("/test") + async def test_post(request: Request): + return {"message": "success"} + + @app.put("/test") + async def test_put(request: Request): + return {"message": "success"} + + @app.delete("/test") + async def test_delete(request: Request): + return {"message": "success"} + + return app + + def test_csrf_cookie_set_on_get_request(self, app_with_csrf): + """Test that CSRF cookie is set on GET request.""" + client = TestClient(app_with_csrf) + response = client.get("/test") + + assert response.status_code == 200 + assert "csrf_token" in response.cookies + assert len(response.cookies["csrf_token"]) > 0 + + def test_csrf_token_in_request_state(self, app_with_csrf): + """Test that CSRF token is available in request state.""" + client = TestClient(app_with_csrf) + response = client.get("/test") + + assert response.status_code == 200 + assert "csrf_token" in response.json() + assert response.json()["csrf_token"] == response.cookies["csrf_token"] + + def test_post_without_csrf_token_fails(self, app_with_csrf): + """Test that POST without CSRF token returns 403.""" + client = TestClient(app_with_csrf) + response = client.post("/test") + + assert response.status_code == 403 + assert "CSRF" in response.json()["detail"] + + def test_post_with_csrf_header_succeeds(self, app_with_csrf): + """Test that POST with CSRF header succeeds.""" + client = TestClient(app_with_csrf) + + # First get a CSRF token + get_response = client.get("/test") + csrf_token = get_response.cookies["csrf_token"] + + # Use token in POST request + response = client.post( + "/test", + headers={"X-CSRF-Token": csrf_token} + ) + + assert response.status_code == 200 + assert response.json()["message"] == "success" + + def test_put_without_csrf_token_fails(self, app_with_csrf): + """Test that PUT without CSRF token returns 403.""" + client = TestClient(app_with_csrf) + response = client.put("/test") + + assert response.status_code == 403 + + def test_put_with_csrf_header_succeeds(self, app_with_csrf): + """Test that PUT with CSRF header succeeds.""" + client = TestClient(app_with_csrf) + + # Get CSRF token + get_response = client.get("/test") + csrf_token = get_response.cookies["csrf_token"] + + response = client.put( + "/test", + headers={"X-CSRF-Token": csrf_token} + ) + + assert response.status_code == 200 + + def test_delete_without_csrf_token_fails(self, app_with_csrf): + """Test that DELETE without CSRF token returns 403.""" + client = TestClient(app_with_csrf) + response = client.delete("/test") + + assert response.status_code == 403 + + def test_delete_with_csrf_header_succeeds(self, app_with_csrf): + """Test that DELETE with CSRF header succeeds.""" + client = TestClient(app_with_csrf) + + # Get CSRF token + get_response = client.get("/test") + csrf_token = get_response.cookies["csrf_token"] + + response = client.delete( + "/test", + headers={"X-CSRF-Token": csrf_token} + ) + + assert response.status_code == 200 + + def test_safe_methods_without_csrf_succeed(self, app_with_csrf): + """Test that GET, HEAD, OPTIONS work without CSRF token.""" + client = TestClient(app_with_csrf) + + response = client.get("/test") + assert response.status_code == 200 + + def test_invalid_csrf_token_fails(self, app_with_csrf): + """Test that invalid CSRF token returns 403.""" + client = TestClient(app_with_csrf) + + response = client.post( + "/test", + headers={"X-CSRF-Token": "invalid-token"} + ) + + assert response.status_code == 403 + + def test_csrf_token_persists_across_requests(self, app_with_csrf): + """Test that CSRF token persists across requests.""" + client = TestClient(app_with_csrf) + + # First request + response1 = client.get("/test") + token1 = response1.cookies["csrf_token"] + + # Second request + response2 = client.get("/test") + token2 = response2.cookies["csrf_token"] + + # Tokens should be the same + assert token1 == token2 + + +class TestCSRFTokenGeneration: + """Test CSRF token generation.""" + + def test_token_has_sufficient_entropy(self): + """Test that generated tokens have sufficient entropy.""" + from openrouter_monitor.middleware.csrf import CSRFMiddleware + + app = FastAPI() + middleware = CSRFMiddleware(app) + + # Create a mock request without cookie + class MockRequest: + def __init__(self): + self.cookies = {} + + request = MockRequest() + token = middleware._get_or_create_token(request) + + # Token should be at least 32 characters (urlsafe base64 of 24 bytes) + assert len(token) >= 32 + + def test_token_is_unique(self): + """Test that generated tokens are unique.""" + from openrouter_monitor.middleware.csrf import CSRFMiddleware + + app = FastAPI() + middleware = CSRFMiddleware(app) + + class MockRequest: + def __init__(self): + self.cookies = {} + + tokens = set() + for _ in range(10): + request = MockRequest() + token = middleware._get_or_create_token(request) + tokens.add(token) + + # All tokens should be unique + assert len(tokens) == 10