Files
openrouter-watcher/tests/unit/models/test_user_model.py
Luca Sacchi Ricciardi ea198e8b0d feat(models): T07-T10 create SQLAlchemy models for User, ApiKey, UsageStats, ApiToken
- Add User model with email unique constraint and relationships
- Add ApiKey model with encrypted key storage and user relationship
- Add UsageStats model with unique constraint (api_key_id, date, model)
- Add ApiToken model with token_hash indexing
- Configure all cascade delete relationships
- Add 49 comprehensive tests with 95% coverage

Models:
- User: id, email, password_hash, created_at, updated_at, is_active
- ApiKey: id, user_id, name, key_encrypted, is_active, created_at, last_used_at
- UsageStats: id, api_key_id, date, model, requests_count, tokens_input, tokens_output, cost
- ApiToken: id, user_id, token_hash, name, created_at, last_used_at, is_active

Tests: 49 passed, coverage 95%
2026-04-07 11:09:12 +02:00

281 lines
9.3 KiB
Python

"""Tests for User model (T07).
T07: Create the User model (SQLAlchemy)
"""
import pytest
from datetime import datetime
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
# Import models to register them with Base
from openrouter_monitor.models import User, ApiKey, UsageStats, ApiToken
from openrouter_monitor.database import Base
@pytest.mark.unit
class TestUserModelBasics:
    """Test User model basic attributes and creation."""

    @staticmethod
    def _open_session(tmp_path, db_name):
        """Create the schema in a SQLite file under *tmp_path* and open a session.

        Args:
            tmp_path: Directory in which the SQLite file is created.
            db_name: File name of the SQLite database.

        Returns:
            An ``(engine, session)`` tuple; the caller must close the session
            and dispose the engine.
        """
        engine = create_engine(f'sqlite:///{tmp_path}/{db_name}')
        Base.metadata.create_all(bind=engine)
        session = sessionmaker(bind=engine)()
        return engine, session

    def test_user_model_exists(self):
        """Test that User model can be imported and maps to the 'users' table."""
        assert User is not None
        assert hasattr(User, '__tablename__')
        assert User.__tablename__ == 'users'

    def test_user_has_required_fields(self):
        """Test that User model exposes every required attribute."""
        required_fields = (
            'id',
            'email',
            'password_hash',
            'created_at',
            'updated_at',
            'is_active',
        )
        for field in required_fields:
            assert hasattr(User, field), f"User is missing field {field!r}"

    def test_user_create_with_valid_data(self, tmp_path):
        """Test creating User with valid data."""
        # Arrange
        engine, session = self._open_session(tmp_path, 'test_user_data.db')
        try:
            # Act
            user = User(
                email="test@example.com",
                password_hash="hashed_password_here",
            )
            session.add(user)
            session.flush()  # Apply defaults without committing

            # Assert
            assert user.email == "test@example.com"
            assert user.password_hash == "hashed_password_here"
            assert user.is_active is True
            assert user.created_at is not None
        finally:
            # Release the connection even when an assertion above fails.
            session.close()
            engine.dispose()

    def test_user_default_is_active_true(self, tmp_path):
        """Test that is_active defaults to True."""
        # Arrange
        engine, session = self._open_session(tmp_path, 'test_active_default.db')
        try:
            # Act
            user = User(email="test@example.com", password_hash="hash")
            session.add(user)
            session.flush()

            # Assert
            assert user.is_active is True
        finally:
            session.close()
            engine.dispose()

    def test_user_timestamps_auto_set(self, tmp_path):
        """Test that created_at is automatically set between two clock reads."""
        # Arrange
        engine, session = self._open_session(tmp_path, 'test_timestamps.db')
        try:
            # Act
            # NOTE(review): the bounds are naive UTC values, so this assumes
            # the model stores naive UTC timestamps - confirm against the model.
            before = datetime.utcnow()
            user = User(email="test@example.com", password_hash="hash")
            session.add(user)
            session.flush()
            after = datetime.utcnow()

            # Assert
            assert user.created_at is not None
            assert before <= user.created_at <= after
        finally:
            session.close()
            engine.dispose()
@pytest.mark.unit
class TestUserConstraints:
    """Test User model constraints and validations."""

    def test_user_email_unique_constraint(self, tmp_path):
        """Test that committing a second user with the same email fails."""
        # Arrange
        engine = create_engine(f'sqlite:///{tmp_path}/test_unique.db')
        Base.metadata.create_all(bind=engine)
        session = sessionmaker(bind=engine)()
        try:
            # Act - Create first user
            session.add(User(email="unique@example.com", password_hash="hash1"))
            session.commit()

            # Act - Try to create second user with same email
            session.add(User(email="unique@example.com", password_hash="hash2"))

            # Assert - Should raise IntegrityError
            with pytest.raises(IntegrityError):
                session.commit()

            # The failed commit leaves the transaction unusable until it is
            # rolled back.
            session.rollback()
        finally:
            # Release the connection even when an assertion above fails.
            session.close()
            engine.dispose()

    def test_user_email_index_exists(self):
        """Test that email has an index.

        An index qualifies when its name mentions 'email' or when it covers
        the ``email`` column; unnamed indexes are handled without error.
        """
        # Act - Collect the indexes declared on the users table
        indexes = User.__table__.indexes

        # Assert
        assert any(
            'email' in (idx.name or '')
            or any(column.name == 'email' for column in idx.columns)
            for idx in indexes
        )
@pytest.mark.unit
class TestUserRelationships:
    """Test User model relationships."""

    @staticmethod
    def _relationship_cascade(attribute):
        """Return the cascade options of a mapped relationship attribute.

        Fails the calling test when *attribute* is not backed by a
        relationship property, so a missing relationship cannot pass silently.
        """
        from sqlalchemy.orm import RelationshipProperty

        rel = getattr(attribute, 'property', None)
        assert isinstance(rel, RelationshipProperty), (
            f"{attribute!r} is not a relationship"
        )
        return rel.cascade

    def test_user_has_api_keys_relationship(self):
        """Test that User has api_keys relationship."""
        assert hasattr(User, 'api_keys')

    def test_user_has_api_tokens_relationship(self):
        """Test that User has api_tokens relationship."""
        assert hasattr(User, 'api_tokens')

    def test_api_keys_cascade_delete(self):
        """Test that api_keys have cascade delete."""
        # Exact membership: a substring check would also match
        # 'delete-orphan' without 'delete' being configured.
        assert 'delete' in self._relationship_cascade(User.api_keys)

    def test_api_tokens_cascade_delete(self):
        """Test that api_tokens have cascade delete."""
        assert 'delete' in self._relationship_cascade(User.api_tokens)
@pytest.mark.integration
class TestUserDatabaseIntegration:
    """Integration tests for User model with database."""

    @staticmethod
    def _open_session(tmp_path, db_name):
        """Create the schema in a SQLite file under *tmp_path* and open a session.

        Args:
            tmp_path: Directory in which the SQLite file is created.
            db_name: File name of the SQLite database.

        Returns:
            An ``(engine, session)`` tuple; the caller must close the session
            and dispose the engine.
        """
        engine = create_engine(f'sqlite:///{tmp_path}/{db_name}')
        Base.metadata.create_all(bind=engine)
        session = sessionmaker(bind=engine)()
        return engine, session

    def test_user_persist_and_retrieve(self, tmp_path):
        """Test persisting and retrieving user from database."""
        # Arrange
        engine, session = self._open_session(tmp_path, 'test_user.db')
        try:
            # Act
            session.add(User(email="persist@example.com", password_hash="hashed123"))
            session.commit()

            # Retrieve
            retrieved = (
                session.query(User).filter_by(email="persist@example.com").first()
            )

            # Assert
            assert retrieved is not None
            assert retrieved.email == "persist@example.com"
            assert retrieved.password_hash == "hashed123"
            assert retrieved.id is not None
        finally:
            # Release the connection even when an assertion above fails.
            session.close()
            engine.dispose()

    def test_user_email_filtering(self, tmp_path):
        """Test filtering users by email."""
        # Arrange
        engine, session = self._open_session(tmp_path, 'test_filter.db')
        try:
            # Create multiple users
            session.add_all([
                User(email="alice@example.com", password_hash="hash1"),
                User(email="bob@example.com", password_hash="hash2"),
            ])
            session.commit()

            # Act
            result = session.query(User).filter_by(email="alice@example.com").first()

            # Assert
            assert result is not None
            assert result.email == "alice@example.com"
        finally:
            session.close()
            engine.dispose()

    def test_user_is_active_filtering(self, tmp_path):
        """Test filtering users by is_active status."""
        # Arrange
        engine, session = self._open_session(tmp_path, 'test_active.db')
        try:
            # Create users
            session.add_all([
                User(
                    email="active@example.com",
                    password_hash="hash1",
                    is_active=True,
                ),
                User(
                    email="inactive@example.com",
                    password_hash="hash2",
                    is_active=False,
                ),
            ])
            session.commit()

            # Act
            active_users = session.query(User).filter_by(is_active=True).all()
            inactive_users = session.query(User).filter_by(is_active=False).all()

            # Assert
            assert len(active_users) == 1
            assert len(inactive_users) == 1
            assert active_users[0].email == "active@example.com"
            assert inactive_users[0].email == "inactive@example.com"
        finally:
            session.close()
            engine.dispose()

    def test_user_update_timestamp(self, tmp_path):
        """Test that updated_at is populated when read back after an update."""
        # Arrange
        engine, session = self._open_session(tmp_path, 'test_update.db')
        try:
            # Create user
            user = User(email="update@example.com", password_hash="hash")
            session.add(user)
            session.commit()

            # Act - Update
            user.password_hash = "new_hash"
            session.commit()

            # Assert
            retrieved = (
                session.query(User).filter_by(email="update@example.com").first()
            )
            assert retrieved is not None
            assert retrieved.updated_at is not None
        finally:
            session.close()
            engine.dispose()