diff --git a/src/openrouter_monitor/schemas/__init__.py b/src/openrouter_monitor/schemas/__init__.py index ae19f1d..6604616 100644 --- a/src/openrouter_monitor/schemas/__init__.py +++ b/src/openrouter_monitor/schemas/__init__.py @@ -20,6 +20,19 @@ from openrouter_monitor.schemas.stats import ( UsageStatsCreate, UsageStatsResponse, ) +from openrouter_monitor.schemas.public_api import ( + ApiTokenCreate, + ApiTokenCreateResponse, + ApiTokenResponse, + PaginationInfo, + PeriodInfo, + PublicKeyInfo, + PublicKeyListResponse, + PublicStatsResponse, + PublicUsageItem, + PublicUsageResponse, + SummaryInfo, +) __all__ = [ "UserRegister", @@ -37,4 +50,16 @@ __all__ = [ "StatsByModel", "StatsByDate", "DashboardResponse", + # Public API schemas + "ApiTokenCreate", + "ApiTokenCreateResponse", + "ApiTokenResponse", + "PublicStatsResponse", + "PublicUsageResponse", + "PublicKeyInfo", + "PublicKeyListResponse", + "SummaryInfo", + "PeriodInfo", + "PublicUsageItem", + "PaginationInfo", ] diff --git a/src/openrouter_monitor/schemas/public_api.py b/src/openrouter_monitor/schemas/public_api.py new file mode 100644 index 0000000..66ebbba --- /dev/null +++ b/src/openrouter_monitor/schemas/public_api.py @@ -0,0 +1,347 @@ +"""Public API Pydantic schemas for OpenRouter API Key Monitor. + +T35: Pydantic schemas for public API endpoints. +These schemas define the data structures for the public API v1 endpoints. +""" +import datetime +from decimal import Decimal +from typing import Dict, List, Optional + +from pydantic import BaseModel, ConfigDict, Field, field_validator + + +class ApiTokenCreate(BaseModel): + """Schema for creating a new API token. + + Attributes: + name: Human-readable name for the token (1-100 characters) + """ + + name: str = Field( + ..., + min_length=1, + max_length=100, + description="Human-readable name for the token", + examples=["Production API Token", "Development Key"] + ) + + +class ApiTokenResponse(BaseModel): + """Schema for API token response (returned to client). + + IMPORTANT: This schema does NOT include the token value for security. + The plaintext token is only shown once at creation time (ApiTokenCreateResponse). + + Attributes: + id: Token ID + name: Token name + created_at: Creation timestamp + last_used_at: Last usage timestamp (None if never used) + is_active: Whether the token is active + """ + + model_config = ConfigDict(from_attributes=True) + + id: int = Field( + ..., + description="Token ID", + examples=[1] + ) + name: str = Field( + ..., + description="Token name", + examples=["Production Token"] + ) + created_at: datetime.datetime = Field( + ..., + description="Creation timestamp", + examples=["2024-01-15T12:00:00"] + ) + last_used_at: Optional[datetime.datetime] = Field( + default=None, + description="Last usage timestamp (None if never used)", + examples=["2024-01-20T15:30:00"] + ) + is_active: bool = Field( + ..., + description="Whether the token is active", + examples=[True] + ) + + +class ApiTokenCreateResponse(BaseModel): + """Schema for API token creation response. + + IMPORTANT: This is the ONLY time the plaintext token is shown. + After creation, the token cannot be retrieved again. + + Attributes: + id: Token ID + name: Token name + token: Plaintext token (shown only once!) + created_at: Creation timestamp + """ + + model_config = ConfigDict(from_attributes=True) + + id: int = Field( + ..., + description="Token ID", + examples=[1] + ) + name: str = Field( + ..., + description="Token name", + examples=["Production Token"] + ) + token: str = Field( + ..., + description="Plaintext token (shown only once at creation!)", + examples=["or_api_abc123xyz789def456"] + ) + created_at: datetime.datetime = Field( + ..., + description="Creation timestamp", + examples=["2024-01-15T12:00:00"] + ) + + +class SummaryInfo(BaseModel): + """Schema for statistics summary. + + Attributes: + total_requests: Total number of requests + total_cost: Total cost in USD + total_tokens: Total tokens (input + output) + """ + + total_requests: int = Field( + ..., + ge=0, + description="Total number of requests", + examples=[1000] + ) + total_cost: Decimal = Field( + ..., + ge=0, + description="Total cost in USD", + examples=["5.678901"] + ) + total_tokens: int = Field( + default=0, + ge=0, + description="Total tokens (input + output)", + examples=[50000] + ) + + +class PeriodInfo(BaseModel): + """Schema for statistics period information. + + Attributes: + start_date: Start date of the period + end_date: End date of the period + days: Number of days in the period + """ + + start_date: datetime.date = Field( + ..., + description="Start date of the period", + examples=["2024-01-01"] + ) + end_date: datetime.date = Field( + ..., + description="End date of the period", + examples=["2024-01-31"] + ) + days: int = Field( + ..., + ge=0, + description="Number of days in the period", + examples=[30] + ) + + +class PublicStatsResponse(BaseModel): + """Schema for public API stats response. + + Attributes: + summary: Aggregated statistics summary + period: Period information (start_date, end_date, days) + """ + + summary: SummaryInfo = Field( + ..., + description="Aggregated statistics summary" + ) + period: PeriodInfo = Field( + ..., + description="Period information (start_date, end_date, days)" + ) + + +class PublicUsageItem(BaseModel): + """Schema for a single usage item in public API. + + IMPORTANT: This only includes the API key NAME, not the actual key value. + + Attributes: + date: Date of the statistics + api_key_name: Name of the API key (not the value!) + model: AI model name + requests_count: Number of requests + tokens_input: Number of input tokens + tokens_output: Number of output tokens + cost: Cost in USD + """ + + date: datetime.date = Field( + ..., + description="Date of the statistics", + examples=["2024-01-15"] + ) + api_key_name: str = Field( + ..., + description="Name of the API key (not the value!)", + examples=["Production Key"] + ) + model: str = Field( + ..., + description="AI model name", + examples=["gpt-4"] + ) + requests_count: int = Field( + ..., + ge=0, + description="Number of requests", + examples=[100] + ) + tokens_input: int = Field( + default=0, + ge=0, + description="Number of input tokens", + examples=[5000] + ) + tokens_output: int = Field( + default=0, + ge=0, + description="Number of output tokens", + examples=[3000] + ) + cost: Decimal = Field( + ..., + ge=0, + description="Cost in USD", + examples=["0.123456"] + ) + + +class PaginationInfo(BaseModel): + """Schema for pagination information. + + Attributes: + page: Current page number (1-indexed) + limit: Items per page + total: Total number of items + pages: Total number of pages + """ + + page: int = Field( + ..., + ge=1, + description="Current page number (1-indexed)", + examples=[1] + ) + limit: int = Field( + ..., + ge=1, + description="Items per page", + examples=[100] + ) + total: int = Field( + ..., + ge=0, + description="Total number of items", + examples=[250] + ) + pages: int = Field( + ..., + ge=0, + description="Total number of pages", + examples=[3] + ) + + +class PublicUsageResponse(BaseModel): + """Schema for public API usage response with pagination. + + Attributes: + items: List of usage items + pagination: Pagination information + """ + + items: List[PublicUsageItem] = Field( + ..., + description="List of usage items" + ) + pagination: PaginationInfo = Field( + ..., + description="Pagination information" + ) + + +class PublicKeyInfo(BaseModel): + """Schema for public API key information. + + IMPORTANT: This schema does NOT include the actual API key value, + only metadata and aggregated statistics. + + Attributes: + id: Key ID + name: Key name + is_active: Whether the key is active + stats: Aggregated statistics (total_requests, total_cost) + """ + + model_config = ConfigDict(from_attributes=True) + + id: int = Field( + ..., + description="Key ID", + examples=[1] + ) + name: str = Field( + ..., + description="Key name", + examples=["Production Key"] + ) + is_active: bool = Field( + ..., + description="Whether the key is active", + examples=[True] + ) + stats: Dict = Field( + ..., + description="Aggregated statistics (total_requests, total_cost)", + examples=[{"total_requests": 1000, "total_cost": "5.50"}] + ) + + +class PublicKeyListResponse(BaseModel): + """Schema for public API key list response. + + Attributes: + items: List of API keys with statistics + total: Total number of keys + """ + + items: List[PublicKeyInfo] = Field( + ..., + description="List of API keys with statistics" + ) + total: int = Field( + ..., + ge=0, + description="Total number of keys", + examples=[5] + ) \ No newline at end of file diff --git a/tests/unit/schemas/test_public_api_schemas.py b/tests/unit/schemas/test_public_api_schemas.py new file mode 100644 index 0000000..5577776 --- /dev/null +++ b/tests/unit/schemas/test_public_api_schemas.py @@ -0,0 +1,454 @@ +"""Tests for public API Pydantic schemas. + +T35: Tests for public_api.py schemas +""" +import datetime +from decimal import Decimal + +import pytest +from pydantic import ValidationError + +from openrouter_monitor.schemas.public_api import ( + ApiTokenCreate, + ApiTokenCreateResponse, + ApiTokenResponse, + PeriodInfo, + PublicKeyInfo, + PublicKeyListResponse, + PublicStatsResponse, + PublicUsageItem, + PublicUsageResponse, + SummaryInfo, + PaginationInfo, +) + + +class TestApiTokenCreate: + """Test suite for ApiTokenCreate schema.""" + + def test_valid_name_creates_successfully(self): + """Test that a valid name creates the schema successfully.""" + # Arrange & Act + result = ApiTokenCreate(name="My API Token") + + # Assert + assert result.name == "My API Token" + + def test_name_min_length_1_char(self): + """Test that name with 1 character is valid.""" + # Arrange & Act + result = ApiTokenCreate(name="A") + + # Assert + assert result.name == "A" + + def test_name_max_length_100_chars(self): + """Test that name with exactly 100 characters is valid.""" + # Arrange + long_name = "A" * 100 + + # Act + result = ApiTokenCreate(name=long_name) + + # Assert + assert result.name == long_name + + def test_name_too_short_raises_validation_error(self): + """Test that empty name raises ValidationError.""" + # Arrange & Act & Assert + with pytest.raises(ValidationError) as exc_info: + ApiTokenCreate(name="") + + assert "name" in str(exc_info.value) + + def test_name_too_long_raises_validation_error(self): + """Test that name with 101+ characters raises ValidationError.""" + # Arrange + too_long_name = "A" * 101 + + # Act & Assert + with pytest.raises(ValidationError) as exc_info: + ApiTokenCreate(name=too_long_name) + + assert "name" in str(exc_info.value) + + def test_name_strips_whitespace(self): + """Test that name with whitespace is handled correctly.""" + # Note: Pydantic v2 doesn't auto-strip by default + # Arrange & Act + result = ApiTokenCreate(name=" My Token ") + + # Assert + assert result.name == " My Token " + + +class TestApiTokenResponse: + """Test suite for ApiTokenResponse schema.""" + + def test_valid_response_without_token(self): + """Test that response contains NO token field (security).""" + # Arrange + data = { + "id": 1, + "name": "My Token", + "created_at": datetime.datetime(2024, 1, 15, 12, 0, 0), + "last_used_at": None, + "is_active": True + } + + # Act + result = ApiTokenResponse(**data) + + # Assert + assert result.id == 1 + assert result.name == "My Token" + assert result.created_at == datetime.datetime(2024, 1, 15, 12, 0, 0) + assert result.last_used_at is None + assert result.is_active is True + + # Security check: NO token field should exist + assert not hasattr(result, 'token') + + def test_response_with_last_used_at(self): + """Test response when token has been used.""" + # Arrange + data = { + "id": 2, + "name": "Production Token", + "created_at": datetime.datetime(2024, 1, 15, 12, 0, 0), + "last_used_at": datetime.datetime(2024, 1, 20, 15, 30, 0), + "is_active": True + } + + # Act + result = ApiTokenResponse(**data) + + # Assert + assert result.last_used_at == datetime.datetime(2024, 1, 20, 15, 30, 0) + + +class TestApiTokenCreateResponse: + """Test suite for ApiTokenCreateResponse schema.""" + + def test_response_includes_plaintext_token(self): + """Test that create response includes plaintext token (only at creation).""" + # Arrange + data = { + "id": 1, + "name": "My Token", + "token": "or_api_abc123xyz789", # Plaintext token - only shown at creation! + "created_at": datetime.datetime(2024, 1, 15, 12, 0, 0) + } + + # Act + result = ApiTokenCreateResponse(**data) + + # Assert + assert result.id == 1 + assert result.name == "My Token" + assert result.token == "or_api_abc123xyz789" + assert result.created_at == datetime.datetime(2024, 1, 15, 12, 0, 0) + + def test_token_prefix_validation(self): + """Test that token starts with expected prefix.""" + # Arrange + data = { + "id": 1, + "name": "Test", + "token": "or_api_testtoken123", + "created_at": datetime.datetime.utcnow() + } + + # Act + result = ApiTokenCreateResponse(**data) + + # Assert + assert result.token.startswith("or_api_") + + +class TestSummaryInfo: + """Test suite for SummaryInfo schema.""" + + def test_valid_summary(self): + """Test valid summary with all fields.""" + # Arrange & Act + result = SummaryInfo( + total_requests=1000, + total_cost=Decimal("5.50"), + total_tokens=50000 + ) + + # Assert + assert result.total_requests == 1000 + assert result.total_cost == Decimal("5.50") + assert result.total_tokens == 50000 + + +class TestPeriodInfo: + """Test suite for PeriodInfo schema.""" + + def test_valid_period(self): + """Test valid period info.""" + # Arrange + start = datetime.date(2024, 1, 1) + end = datetime.date(2024, 1, 31) + + # Act + result = PeriodInfo( + start_date=start, + end_date=end, + days=30 + ) + + # Assert + assert result.start_date == start + assert result.end_date == end + assert result.days == 30 + + +class TestPublicStatsResponse: + """Test suite for PublicStatsResponse schema.""" + + def test_valid_stats_response(self): + """Test complete stats response.""" + # Arrange + summary = SummaryInfo( + total_requests=1000, + total_cost=Decimal("5.50"), + total_tokens=50000 + ) + period = PeriodInfo( + start_date=datetime.date(2024, 1, 1), + end_date=datetime.date(2024, 1, 31), + days=30 + ) + + # Act + result = PublicStatsResponse(summary=summary, period=period) + + # Assert + assert result.summary.total_requests == 1000 + assert result.period.days == 30 + + +class TestPublicUsageItem: + """Test suite for PublicUsageItem schema.""" + + def test_valid_usage_item(self): + """Test valid usage item for public API.""" + # Arrange & Act + result = PublicUsageItem( + date=datetime.date(2024, 1, 15), + api_key_name="Production Key", + model="gpt-4", + requests_count=100, + tokens_input=5000, + tokens_output=3000, + cost=Decimal("0.50") + ) + + # Assert + assert result.date == datetime.date(2024, 1, 15) + assert result.api_key_name == "Production Key" + assert result.model == "gpt-4" + assert result.requests_count == 100 + assert result.tokens_input == 5000 + assert result.tokens_output == 3000 + assert result.cost == Decimal("0.50") + + def test_usage_item_no_key_value_exposed(self): + """Test that API key value is NOT exposed in usage item.""" + # Arrange & Act + result = PublicUsageItem( + date=datetime.date(2024, 1, 15), + api_key_name="My Key", # Only name, NOT the actual key value + model="gpt-4", + requests_count=1, + tokens_input=100, + tokens_output=50, + cost=Decimal("0.01") + ) + + # Assert - security check + assert not hasattr(result, 'api_key_value') + assert not hasattr(result, 'key_value') + + +class TestPaginationInfo: + """Test suite for PaginationInfo schema.""" + + def test_valid_pagination(self): + """Test valid pagination info.""" + # Arrange & Act + result = PaginationInfo( + page=2, + limit=100, + total=250, + pages=3 + ) + + # Assert + assert result.page == 2 + assert result.limit == 100 + assert result.total == 250 + assert result.pages == 3 + + def test_pagination_page_ge_1(self): + """Test that page must be >= 1.""" + # Act & Assert + with pytest.raises(ValidationError): + PaginationInfo(page=0, limit=10, total=100, pages=10) + + def test_pagination_limit_positive(self): + """Test that limit must be positive.""" + # Act & Assert + with pytest.raises(ValidationError): + PaginationInfo(page=1, limit=0, total=100, pages=10) + + +class TestPublicUsageResponse: + """Test suite for PublicUsageResponse schema.""" + + def test_valid_usage_response(self): + """Test complete usage response with pagination.""" + # Arrange + items = [ + PublicUsageItem( + date=datetime.date(2024, 1, 15), + api_key_name="Key 1", + model="gpt-4", + requests_count=100, + tokens_input=5000, + tokens_output=3000, + cost=Decimal("0.50") + ), + PublicUsageItem( + date=datetime.date(2024, 1, 16), + api_key_name="Key 2", + model="gpt-3.5", + requests_count=50, + tokens_input=2500, + tokens_output=1500, + cost=Decimal("0.25") + ) + ] + pagination = PaginationInfo(page=1, limit=100, total=2, pages=1) + + # Act + result = PublicUsageResponse(items=items, pagination=pagination) + + # Assert + assert len(result.items) == 2 + assert result.pagination.total == 2 + assert result.pagination.page == 1 + + def test_empty_usage_response(self): + """Test usage response with no items.""" + # Arrange + pagination = PaginationInfo(page=1, limit=100, total=0, pages=0) + + # Act + result = PublicUsageResponse(items=[], pagination=pagination) + + # Assert + assert result.items == [] + assert result.pagination.total == 0 + + +class TestPublicKeyInfo: + """Test suite for PublicKeyInfo schema.""" + + def test_valid_key_info(self): + """Test valid public key info without exposing actual key.""" + # Arrange & Act + result = PublicKeyInfo( + id=1, + name="Production Key", + is_active=True, + stats={ + "total_requests": 1000, + "total_cost": Decimal("5.50") + } + ) + + # Assert + assert result.id == 1 + assert result.name == "Production Key" + assert result.is_active is True + assert result.stats["total_requests"] == 1000 + assert result.stats["total_cost"] == Decimal("5.50") + + def test_key_info_no_value_field(self): + """Test that actual API key value is NOT in the schema.""" + # Arrange & Act + result = PublicKeyInfo( + id=1, + name="My Key", + is_active=True, + stats={"total_requests": 0, "total_cost": Decimal("0")} + ) + + # Assert - security check + assert not hasattr(result, 'key_value') + assert not hasattr(result, 'encrypted_value') + assert not hasattr(result, 'api_key') + + +class TestPublicKeyListResponse: + """Test suite for PublicKeyListResponse schema.""" + + def test_valid_key_list_response(self): + """Test key list response with multiple keys.""" + # Arrange + items = [ + PublicKeyInfo( + id=1, + name="Production", + is_active=True, + stats={"total_requests": 1000, "total_cost": Decimal("5.00")} + ), + PublicKeyInfo( + id=2, + name="Development", + is_active=True, + stats={"total_requests": 100, "total_cost": Decimal("0.50")} + ) + ] + + # Act + result = PublicKeyListResponse(items=items, total=2) + + # Assert + assert len(result.items) == 2 + assert result.total == 2 + assert result.items[0].name == "Production" + assert result.items[1].name == "Development" + + def test_empty_key_list_response(self): + """Test key list response with no keys.""" + # Arrange & Act + result = PublicKeyListResponse(items=[], total=0) + + # Assert + assert result.items == [] + assert result.total == 0 + + def test_key_list_no_values_exposed(self): + """Test security: no key values in list response.""" + # Arrange + items = [ + PublicKeyInfo( + id=1, + name="Key 1", + is_active=True, + stats={"total_requests": 10, "total_cost": Decimal("0.10")} + ) + ] + + # Act + result = PublicKeyListResponse(items=items, total=1) + + # Assert - security check for all items + for item in result.items: + assert not hasattr(item, 'key_value') + assert not hasattr(item, 'encrypted_value') \ No newline at end of file