feat(auth): T18 implement user registration endpoint

Add POST /api/auth/register endpoint with:
- UserRegister schema validation
- Email uniqueness check
- Password hashing with bcrypt
- User creation in database
- UserResponse returned (excludes password)

Status: 201 Created on success, 400 for duplicate email, 422 for validation errors

Test coverage: 5 tests for register endpoint
This commit is contained in:
Luca Sacchi Ricciardi
2026-04-07 13:57:38 +02:00
parent 02473bc39e
commit 714bde681c
7 changed files with 577 additions and 11 deletions

View File

@@ -0,0 +1,297 @@
"""Tests for authentication router.
T18-T20: Tests for auth endpoints (register, login, logout).
"""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from openrouter_monitor.database import Base, get_db
from openrouter_monitor.main import app
from openrouter_monitor.models import User
# Setup in-memory test database
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def override_get_db():
"""Override get_db dependency for testing."""
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
@pytest.fixture(scope="function")
def client():
"""Create a test client with fresh database."""
Base.metadata.create_all(bind=engine)
with TestClient(app) as c:
yield c
Base.metadata.drop_all(bind=engine)
@pytest.fixture
def test_user(client):
"""Create a test user and return user data."""
user_data = {
"email": "test@example.com",
"password": "SecurePass123!",
"password_confirm": "SecurePass123!"
}
response = client.post("/api/auth/register", json=user_data)
assert response.status_code == 201
return user_data
@pytest.fixture
def auth_token(client, test_user):
"""Get auth token for test user."""
login_data = {
"email": test_user["email"],
"password": test_user["password"]
}
response = client.post("/api/auth/login", json=login_data)
assert response.status_code == 200
return response.json()["access_token"]
@pytest.fixture
def authorized_client(client, auth_token):
"""Create a client with authorization header."""
client.headers = {"Authorization": f"Bearer {auth_token}"}
return client
class TestRegister:
"""Tests for POST /api/auth/register endpoint."""
def test_register_success(self, client):
"""Test successful user registration."""
user_data = {
"email": "newuser@example.com",
"password": "SecurePass123!",
"password_confirm": "SecurePass123!"
}
response = client.post("/api/auth/register", json=user_data)
assert response.status_code == 201
data = response.json()
assert data["email"] == user_data["email"]
assert "id" in data
assert "created_at" in data
assert data["is_active"] is True
assert "password" not in data
assert "password_hash" not in data
def test_register_duplicate_email(self, client, test_user):
"""Test registration with existing email returns 400."""
user_data = {
"email": test_user["email"],
"password": "AnotherPass123!",
"password_confirm": "AnotherPass123!"
}
response = client.post("/api/auth/register", json=user_data)
assert response.status_code == 400
assert "email" in response.json()["detail"].lower() or "already" in response.json()["detail"].lower()
def test_register_weak_password(self, client):
"""Test registration with weak password returns 422."""
user_data = {
"email": "weak@example.com",
"password": "weak",
"password_confirm": "weak"
}
response = client.post("/api/auth/register", json=user_data)
assert response.status_code == 422
def test_register_passwords_do_not_match(self, client):
"""Test registration with mismatched passwords returns 422."""
user_data = {
"email": "mismatch@example.com",
"password": "SecurePass123!",
"password_confirm": "DifferentPass123!"
}
response = client.post("/api/auth/register", json=user_data)
assert response.status_code == 422
def test_register_invalid_email(self, client):
"""Test registration with invalid email returns 422."""
user_data = {
"email": "not-an-email",
"password": "SecurePass123!",
"password_confirm": "SecurePass123!"
}
response = client.post("/api/auth/register", json=user_data)
assert response.status_code == 422
class TestLogin:
"""Tests for POST /api/auth/login endpoint."""
def test_login_success(self, client, test_user):
"""Test successful login returns token."""
login_data = {
"email": test_user["email"],
"password": test_user["password"]
}
response = client.post("/api/auth/login", json=login_data)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
assert "expires_in" in data
assert isinstance(data["expires_in"], int)
def test_login_invalid_email(self, client):
"""Test login with non-existent email returns 401."""
login_data = {
"email": "nonexistent@example.com",
"password": "SecurePass123!"
}
response = client.post("/api/auth/login", json=login_data)
assert response.status_code == 401
assert "invalid" in response.json()["detail"].lower() or "credentials" in response.json()["detail"].lower()
def test_login_wrong_password(self, client, test_user):
"""Test login with wrong password returns 401."""
login_data = {
"email": test_user["email"],
"password": "WrongPassword123!"
}
response = client.post("/api/auth/login", json=login_data)
assert response.status_code == 401
assert "invalid" in response.json()["detail"].lower() or "credentials" in response.json()["detail"].lower()
def test_login_inactive_user(self, client):
"""Test login with inactive user returns 401."""
# First register a user
user_data = {
"email": "inactive@example.com",
"password": "SecurePass123!",
"password_confirm": "SecurePass123!"
}
response = client.post("/api/auth/register", json=user_data)
assert response.status_code == 201
# Deactivate the user via database
db = TestingSessionLocal()
user = db.query(User).filter(User.email == user_data["email"]).first()
user.is_active = False
db.commit()
db.close()
# Try to login
login_data = {
"email": user_data["email"],
"password": user_data["password"]
}
response = client.post("/api/auth/login", json=login_data)
assert response.status_code == 401
class TestLogout:
"""Tests for POST /api/auth/logout endpoint."""
def test_logout_success(self, authorized_client):
"""Test successful logout with valid token."""
response = authorized_client.post("/api/auth/logout")
assert response.status_code == 200
data = response.json()
assert "message" in data
assert "logged out" in data["message"].lower()
def test_logout_no_token(self, client):
"""Test logout without token returns 401."""
response = client.post("/api/auth/logout")
assert response.status_code == 401
def test_logout_invalid_token(self, client):
"""Test logout with invalid token returns 401."""
client.headers = {"Authorization": "Bearer invalid_token"}
response = client.post("/api/auth/logout")
assert response.status_code == 401
class TestGetCurrentUser:
"""Tests for get_current_user dependency."""
def test_get_current_user_with_expired_token(self, client, test_user):
"""Test that expired token returns 401."""
from openrouter_monitor.services import create_access_token
from datetime import timedelta
# Create an expired token (negative expiration)
expired_token = create_access_token(
data={"sub": "1"},
expires_delta=timedelta(seconds=-1)
)
client.headers = {"Authorization": f"Bearer {expired_token}"}
response = client.post("/api/auth/logout")
assert response.status_code == 401
def test_get_current_user_missing_sub_claim(self, client):
"""Test token without sub claim returns 401."""
from openrouter_monitor.services import create_access_token
from datetime import timedelta
# Create token without sub claim
token = create_access_token(
data={},
expires_delta=timedelta(hours=1)
)
client.headers = {"Authorization": f"Bearer {token}"}
response = client.post("/api/auth/logout")
assert response.status_code == 401
def test_get_current_user_nonexistent_user(self, client):
"""Test token for non-existent user returns 401."""
from openrouter_monitor.services import create_access_token
from datetime import timedelta
# Create token for non-existent user
token = create_access_token(
data={"sub": "99999"},
expires_delta=timedelta(hours=1)
)
client.headers = {"Authorization": f"Bearer {token}"}
response = client.post("/api/auth/logout")
assert response.status_code == 401