- Add User model with email unique constraint and relationships - Add ApiKey model with encrypted key storage and user relationship - Add UsageStats model with unique constraint (api_key_id, date, model) - Add ApiToken model with token_hash indexing - Configure all cascade delete relationships - Add 49 comprehensive tests with 95% coverage Models: - User: id, email, password_hash, created_at, updated_at, is_active - ApiKey: id, user_id, name, key_encrypted, is_active, created_at, last_used_at - UsageStats: id, api_key_id, date, model, requests_count, tokens_input, tokens_output, cost - ApiToken: id, user_id, token_hash, name, created_at, last_used_at, is_active Tests: 49 passed, coverage 95%
217 lines
7.0 KiB
Python
217 lines
7.0 KiB
Python
"""Tests for ApiToken model (T10).
|
|
|
|
T10: Creare model ApiToken (SQLAlchemy)
|
|
"""
|
|
import pytest
|
|
from datetime import datetime
|
|
from sqlalchemy import create_engine, inspect
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
# Import models to register them with Base
|
|
from openrouter_monitor.models import User, ApiKey, UsageStats, ApiToken
|
|
from openrouter_monitor.database import Base
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestApiTokenModelBasics:
|
|
"""Test ApiToken model basic attributes and creation."""
|
|
|
|
def test_api_token_model_exists(self):
|
|
"""Test that ApiToken model can be imported."""
|
|
# Assert
|
|
assert ApiToken is not None
|
|
assert hasattr(ApiToken, '__tablename__')
|
|
assert ApiToken.__tablename__ == 'api_tokens'
|
|
|
|
def test_api_token_has_required_fields(self):
|
|
"""Test that ApiToken model has all required fields."""
|
|
# Assert
|
|
assert hasattr(ApiToken, 'id')
|
|
assert hasattr(ApiToken, 'user_id')
|
|
assert hasattr(ApiToken, 'token_hash')
|
|
assert hasattr(ApiToken, 'name')
|
|
assert hasattr(ApiToken, 'created_at')
|
|
assert hasattr(ApiToken, 'last_used_at')
|
|
assert hasattr(ApiToken, 'is_active')
|
|
|
|
def test_api_token_create_with_valid_data(self, tmp_path):
|
|
"""Test creating ApiToken with valid data."""
|
|
# Arrange
|
|
engine = create_engine(f'sqlite:///{tmp_path}/test_api_token.db')
|
|
Session = sessionmaker(bind=engine)
|
|
Base.metadata.create_all(bind=engine)
|
|
session = Session()
|
|
|
|
# Act
|
|
token = ApiToken(
|
|
user_id=1,
|
|
token_hash="sha256_hash_here",
|
|
name="Integration Token"
|
|
)
|
|
session.add(token)
|
|
session.flush()
|
|
|
|
# Assert
|
|
assert token.token_hash == "sha256_hash_here"
|
|
assert token.name == "Integration Token"
|
|
assert token.is_active is True
|
|
assert token.created_at is not None
|
|
assert token.last_used_at is None
|
|
|
|
session.close()
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestApiTokenConstraints:
|
|
"""Test ApiToken model constraints."""
|
|
|
|
def test_api_token_user_id_index_exists(self):
|
|
"""Test that user_id has an index."""
|
|
# Act
|
|
inspector = inspect(ApiToken.__table__)
|
|
indexes = inspector.indexes
|
|
|
|
# Assert
|
|
index_names = [idx.name for idx in indexes]
|
|
assert any('user' in name for name in index_names)
|
|
|
|
def test_api_token_token_hash_index_exists(self):
|
|
"""Test that token_hash has an index."""
|
|
# Act
|
|
inspector = inspect(ApiToken.__table__)
|
|
indexes = inspector.indexes
|
|
|
|
# Assert
|
|
index_names = [idx.name for idx in indexes]
|
|
assert any('token' in name or 'hash' in name for name in index_names)
|
|
|
|
def test_api_token_is_active_index_exists(self):
|
|
"""Test that is_active has an index."""
|
|
# Act
|
|
inspector = inspect(ApiToken.__table__)
|
|
indexes = inspector.indexes
|
|
|
|
# Assert
|
|
index_names = [idx.name for idx in indexes]
|
|
assert any('active' in name for name in index_names)
|
|
|
|
|
|
@pytest.mark.unit
|
|
class TestApiTokenRelationships:
|
|
"""Test ApiToken model relationships."""
|
|
|
|
def test_api_token_has_user_relationship(self):
|
|
"""Test that ApiToken has user relationship."""
|
|
# Assert
|
|
assert hasattr(ApiToken, 'user')
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestApiTokenDatabaseIntegration:
|
|
"""Integration tests for ApiToken model with database."""
|
|
|
|
def test_api_token_persist_and_retrieve(self, tmp_path):
|
|
"""Test persisting and retrieving API token from database."""
|
|
# Arrange
|
|
engine = create_engine(f'sqlite:///{tmp_path}/test_token_persist.db')
|
|
Session = sessionmaker(bind=engine)
|
|
Base.metadata.create_all(bind=engine)
|
|
session = Session()
|
|
|
|
# Act
|
|
token = ApiToken(
|
|
user_id=1,
|
|
token_hash="abc123hash456",
|
|
name="My Token"
|
|
)
|
|
session.add(token)
|
|
session.commit()
|
|
|
|
# Retrieve by hash
|
|
retrieved = session.query(ApiToken).filter_by(token_hash="abc123hash456").first()
|
|
|
|
# Assert
|
|
assert retrieved is not None
|
|
assert retrieved.name == "My Token"
|
|
assert retrieved.id is not None
|
|
|
|
session.close()
|
|
|
|
def test_api_token_lookup_by_hash(self, tmp_path):
|
|
"""Test looking up token by hash."""
|
|
# Arrange
|
|
engine = create_engine(f'sqlite:///{tmp_path}/test_lookup.db')
|
|
Session = sessionmaker(bind=engine)
|
|
Base.metadata.create_all(bind=engine)
|
|
session = Session()
|
|
|
|
# Create multiple tokens
|
|
token1 = ApiToken(user_id=1, token_hash="hash1", name="Token 1")
|
|
token2 = ApiToken(user_id=1, token_hash="hash2", name="Token 2")
|
|
session.add_all([token1, token2])
|
|
session.commit()
|
|
|
|
# Act - Look up by specific hash
|
|
result = session.query(ApiToken).filter_by(token_hash="hash2").first()
|
|
|
|
# Assert
|
|
assert result is not None
|
|
assert result.name == "Token 2"
|
|
|
|
session.close()
|
|
|
|
def test_api_token_last_used_at_can_be_set(self, tmp_path):
|
|
"""Test that last_used_at can be set."""
|
|
# Arrange
|
|
engine = create_engine(f'sqlite:///{tmp_path}/test_last_used.db')
|
|
Session = sessionmaker(bind=engine)
|
|
Base.metadata.create_all(bind=engine)
|
|
session = Session()
|
|
|
|
now = datetime.utcnow()
|
|
|
|
# Act
|
|
token = ApiToken(
|
|
user_id=1,
|
|
token_hash="test_hash",
|
|
name="Test Token",
|
|
last_used_at=now
|
|
)
|
|
session.add(token)
|
|
session.commit()
|
|
|
|
# Retrieve
|
|
retrieved = session.query(ApiToken).first()
|
|
|
|
# Assert
|
|
assert retrieved.last_used_at is not None
|
|
|
|
session.close()
|
|
|
|
def test_api_token_is_active_filtering(self, tmp_path):
|
|
"""Test filtering tokens by is_active status."""
|
|
# Arrange
|
|
engine = create_engine(f'sqlite:///{tmp_path}/test_active_filter.db')
|
|
Session = sessionmaker(bind=engine)
|
|
Base.metadata.create_all(bind=engine)
|
|
session = Session()
|
|
|
|
# Create tokens
|
|
active = ApiToken(user_id=1, token_hash="active_hash", name="Active", is_active=True)
|
|
inactive = ApiToken(user_id=1, token_hash="inactive_hash", name="Inactive", is_active=False)
|
|
session.add_all([active, inactive])
|
|
session.commit()
|
|
|
|
# Act
|
|
active_tokens = session.query(ApiToken).filter_by(is_active=True).all()
|
|
inactive_tokens = session.query(ApiToken).filter_by(is_active=False).all()
|
|
|
|
# Assert
|
|
assert len(active_tokens) == 1
|
|
assert len(inactive_tokens) == 1
|
|
assert active_tokens[0].name == "Active"
|
|
assert inactive_tokens[0].name == "Inactive"
|
|
|
|
session.close()
|