"""Web routes for HTML interface. Provides HTML pages for the web interface using Jinja2 templates and HTMX. """ from typing import Optional from fastapi import APIRouter, Depends, Form, HTTPException, Request, Response from fastapi.responses import HTMLResponse, RedirectResponse from sqlalchemy.orm import Session from openrouter_monitor.database import get_db from openrouter_monitor.models import ApiKey, ApiToken, User from openrouter_monitor.services.password import verify_password from openrouter_monitor.templates_config import templates from openrouter_monitor.services.jwt import create_access_token from openrouter_monitor.services.stats import get_dashboard_data router = APIRouter(tags=["web"]) # Helper function to handle authentication check def require_auth(request: Request, db: Session = Depends(get_db)) -> Optional[User]: """Get current user or return None.""" from openrouter_monitor.dependencies.auth import get_current_user_optional return get_current_user_optional(request, db) def get_auth_user(request: Request, db: Session = Depends(get_db)) -> User: """Get authenticated user or redirect to login.""" from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: raise HTTPException(status_code=302, headers={"Location": "/login"}) return user # ============================================================================ # Authentication Routes # ============================================================================ @router.get("/login", response_class=HTMLResponse) async def login_page( request: Request, user: Optional[User] = Depends(require_auth), ): """Render login page.""" # If already logged in, redirect to dashboard if user: return RedirectResponse(url="/dashboard", status_code=302) return templates.TemplateResponse( request, "auth/login.html", {"user": None, "error": None} ) @router.post("/login", response_class=HTMLResponse) async def login_submit( request: Request, response: Response, email: str = Form(...), password: str = Form(...), remember: bool = Form(False), db: Session = Depends(get_db), ): """Handle login form submission.""" # Find user by email user = db.query(User).filter(User.email == email).first() # Verify credentials if not user or not verify_password(password, user.hashed_password): return templates.TemplateResponse( request, "auth/login.html", { "user": None, "error": "Invalid email or password" }, status_code=401 ) # Create JWT token access_token = create_access_token( data={"sub": str(user.id)}, expires_delta=None if remember else 30 # 30 minutes if not remembered ) # Set cookie and redirect redirect_response = RedirectResponse(url="/dashboard", status_code=302) redirect_response.set_cookie( key="access_token", value=f"Bearer {access_token}", httponly=True, max_age=60 * 60 * 24 * 30 if remember else 60 * 30, # 30 days or 30 min samesite="lax" ) return redirect_response @router.get("/register", response_class=HTMLResponse) async def register_page( request: Request, user: Optional[User] = Depends(require_auth), ): """Render registration page.""" # If already logged in, redirect to dashboard if user: return RedirectResponse(url="/dashboard", status_code=302) return templates.TemplateResponse( request, "auth/register.html", { "user": None, "error": None} ) @router.post("/register", response_class=HTMLResponse) async def register_submit( request: Request, email: str = Form(...), password: str = Form(...), password_confirm: str = Form(...), db: Session = Depends(get_db), ): """Handle registration form submission.""" # Validate passwords match if password != password_confirm: return templates.TemplateResponse( request, "auth/register.html", { "user": None, "error": "Passwords do not match" }, status_code=400 ) # Check if user already exists existing_user = db.query(User).filter(User.email == email).first() if existing_user: return templates.TemplateResponse( request, "auth/register.html", { "user": None, "error": "Email already registered" }, status_code=400 ) # Create new user from openrouter_monitor.services.password import hash_password new_user = User( email=email, hashed_password=hash_password(password) ) db.add(new_user) db.commit() db.refresh(new_user) # Redirect to login return RedirectResponse(url="/login", status_code=302) @router.post("/logout") async def logout(): """Handle logout.""" response = RedirectResponse(url="/login", status_code=302) response.delete_cookie(key="access_token") return response # ============================================================================ # Protected Routes (Require Authentication) # ============================================================================ @router.get("/dashboard", response_class=HTMLResponse) async def dashboard( request: Request, db: Session = Depends(get_db), ): """Render dashboard page.""" # Get authenticated user from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: return RedirectResponse(url="/login", status_code=302) # Get dashboard data dashboard_data = get_dashboard_data(db, user.id, days=30) return templates.TemplateResponse( request, "dashboard/index.html", { "user": user, "stats": dashboard_data.get("summary", {}), "recent_usage": dashboard_data.get("recent_usage", []), "chart_data": dashboard_data.get("chart_data", {"labels": [], "data": []}), "models_data": dashboard_data.get("models_data", {"labels": [], "data": []}), } ) @router.get("/keys", response_class=HTMLResponse) async def api_keys_page( request: Request, db: Session = Depends(get_db), ): """Render API keys management page.""" from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: return RedirectResponse(url="/login", status_code=302) # Get user's API keys (metadata only, no key values) api_keys = db.query(ApiKey).filter( ApiKey.user_id == user.id, ApiKey.is_active == True ).all() return templates.TemplateResponse( request, "keys/index.html", { "user": user, "api_keys": api_keys } ) @router.post("/keys", response_class=HTMLResponse) async def create_api_key( request: Request, name: str = Form(...), key_value: str = Form(...), db: Session = Depends(get_db), ): """Handle API key creation.""" from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: if request.headers.get("HX-Request"): return HTMLResponse("
Please log in
") return RedirectResponse(url="/login", status_code=302) # Encrypt and save key from openrouter_monitor.services.encryption import EncryptionService encryption_service = EncryptionService() encrypted_key = encryption_service.encrypt(key_value) new_key = ApiKey( user_id=user.id, name=name, encrypted_key=encrypted_key, is_active=True ) db.add(new_key) db.commit() db.refresh(new_key) # Return row for HTMX or redirect if request.headers.get("HX-Request"): # Return just the row HTML for HTMX return templates.TemplateResponse( request, "keys/index.html", { "user": user, "api_keys": [new_key] } ) return RedirectResponse(url="/keys", status_code=302) @router.delete("/keys/{key_id}") async def delete_api_key( request: Request, key_id: int, db: Session = Depends(get_db), ): """Handle API key deletion (soft delete).""" from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: raise HTTPException(status_code=401, detail="Not authenticated") # Find key and verify ownership api_key = db.query(ApiKey).filter( ApiKey.id == key_id, ApiKey.user_id == user.id ).first() if not api_key: raise HTTPException(status_code=404, detail="Key not found") # Soft delete api_key.is_active = False db.commit() if request.headers.get("HX-Request"): return HTMLResponse("") # Empty response removes the row return RedirectResponse(url="/keys", status_code=302) @router.get("/stats", response_class=HTMLResponse) async def stats_page( request: Request, start_date: Optional[str] = None, end_date: Optional[str] = None, api_key_id: Optional[int] = None, model: Optional[str] = None, page: int = 1, db: Session = Depends(get_db), ): """Render detailed stats page.""" from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: return RedirectResponse(url="/login", status_code=302) # Get user's API keys for filter dropdown api_keys = db.query(ApiKey).filter( ApiKey.user_id == user.id, ApiKey.is_active == True ).all() # TODO: Implement stats query with filters # For now, return empty data return templates.TemplateResponse( request, "stats/index.html", { "user": user, "api_keys": api_keys, "filters": { "start_date": start_date, "end_date": end_date, "api_key_id": api_key_id, "model": model }, "stats": [], "summary": { "total_requests": 0, "total_tokens": 0, "total_cost": 0.0 }, "page": page, "total_pages": 1, "query_string": "" } ) @router.get("/tokens", response_class=HTMLResponse) async def tokens_page( request: Request, new_token: Optional[str] = None, db: Session = Depends(get_db), ): """Render API tokens management page.""" from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: return RedirectResponse(url="/login", status_code=302) # Get user's API tokens api_tokens = db.query(ApiToken).filter( ApiToken.user_id == user.id ).order_by(ApiToken.created_at.desc()).all() return templates.TemplateResponse( request, "tokens/index.html", { "user": user, "api_tokens": api_tokens, "new_token": new_token } ) @router.post("/tokens") async def create_token( request: Request, name: str = Form(...), db: Session = Depends(get_db), ): """Handle API token creation.""" from openrouter_monitor.dependencies.auth import get_current_user_optional from openrouter_monitor.services.token import generate_api_token, hash_api_token from openrouter_monitor.config import get_settings user = get_current_user_optional(request, db) if not user: raise HTTPException(status_code=401, detail="Not authenticated") settings = get_settings() # Check token limit existing_tokens = db.query(ApiToken).filter( ApiToken.user_id == user.id, ApiToken.is_active == True ).count() if existing_tokens >= settings.MAX_API_TOKENS_PER_USER: raise HTTPException( status_code=400, detail=f"Maximum number of tokens ({settings.MAX_API_TOKENS_PER_USER}) reached" ) # Generate token token_plaintext = generate_api_token() token_hash = hash_api_token(token_plaintext) # Save to database new_token = ApiToken( user_id=user.id, name=name, token_hash=token_hash, is_active=True ) db.add(new_token) db.commit() # Redirect with token in query param (shown only once) return RedirectResponse( url=f"/tokens?new_token={token_plaintext}", status_code=302 ) @router.delete("/tokens/{token_id}") async def revoke_token( request: Request, token_id: int, db: Session = Depends(get_db), ): """Handle API token revocation (soft delete).""" from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: raise HTTPException(status_code=401, detail="Not authenticated") # Find token and verify ownership api_token = db.query(ApiToken).filter( ApiToken.id == token_id, ApiToken.user_id == user.id ).first() if not api_token: raise HTTPException(status_code=404, detail="Token not found") # Soft delete (revoke) api_token.is_active = False db.commit() if request.headers.get("HX-Request"): return HTMLResponse("") return RedirectResponse(url="/tokens", status_code=302) @router.get("/profile", response_class=HTMLResponse) async def profile_page( request: Request, password_message: Optional[str] = None, password_success: bool = False, db: Session = Depends(get_db), ): """Render user profile page.""" from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: return RedirectResponse(url="/login", status_code=302) return templates.TemplateResponse( request, "profile/index.html", { "user": user, "password_message": password_message, "password_success": password_success } ) @router.post("/profile/password") async def change_password( request: Request, current_password: str = Form(...), new_password: str = Form(...), new_password_confirm: str = Form(...), db: Session = Depends(get_db), ): """Handle password change.""" from openrouter_monitor.dependencies.auth import get_current_user_optional from openrouter_monitor.services.password import verify_password, hash_password user = get_current_user_optional(request, db) if not user: raise HTTPException(status_code=401, detail="Not authenticated") # Verify current password if not verify_password(current_password, user.hashed_password): return templates.TemplateResponse( request, "profile/index.html", { "user": user, "password_message": "Current password is incorrect", "password_success": False }, status_code=400 ) # Verify passwords match if new_password != new_password_confirm: return templates.TemplateResponse( request, "profile/index.html", { "user": user, "password_message": "New passwords do not match", "password_success": False }, status_code=400 ) # Update password user.hashed_password = hash_password(new_password) db.commit() return templates.TemplateResponse( request, "profile/index.html", { "user": user, "password_message": "Password updated successfully", "password_success": True } ) @router.delete("/profile") async def delete_account( request: Request, db: Session = Depends(get_db), ): """Handle account deletion.""" from openrouter_monitor.dependencies.auth import get_current_user_optional user = get_current_user_optional(request, db) if not user: raise HTTPException(status_code=401, detail="Not authenticated") # Delete user and all associated data db.delete(user) db.commit() # Clear cookie and redirect response = RedirectResponse(url="/", status_code=302) response.delete_cookie(key="access_token") return response