feat(services): T31 implement statistics aggregation service

Add statistics aggregation service with 4 core functions:
- get_summary(): Aggregates total requests, cost, tokens with avg cost
- get_by_model(): Groups stats by model with percentage calculations
- get_by_date(): Groups stats by date for time series data
- get_dashboard_data(): Combines all stats for dashboard view

Features:
- SQLAlchemy queries with ApiKey join for user filtering
- Decimal precision for all monetary values
- Period calculation and percentage breakdowns
- Top models extraction

Test: 11 unit tests covering all aggregation functions
This commit is contained in:
Luca Sacchi Ricciardi
2026-04-07 15:16:22 +02:00
parent 0df1638da8
commit b075ae47fe
3 changed files with 689 additions and 3 deletions

View File

@@ -88,12 +88,15 @@
**Test totali API keys:** 38 test (25 router + 13 schema)
**Coverage router:** 100%
### 📊 Dashboard & Statistiche (T30-T34) - 1/5 completati
### 📊 Dashboard & Statistiche (T30-T34) - 2/5 completati
- [x] T30: Creare Pydantic schemas per stats - ✅ Completato (2026-04-07 17:45)
- Creato: UsageStatsCreate, UsageStatsResponse, StatsSummary, StatsByModel, StatsByDate, DashboardResponse
- Test: 16 test passanti, 100% coverage su schemas/stats.py
- [ ] T31: Implementare servizio aggregazione stats 🟡 In progress
- [ ] T32: Implementare endpoint GET /api/stats
- [x] T31: Implementare servizio aggregazione stats - ✅ Completato (2026-04-07 18:30)
- Creato: get_summary(), get_by_model(), get_by_date(), get_dashboard_data()
- Query SQLAlchemy con join ApiKey per filtro user_id
- Test: 11 test passanti, 84% coverage su services/stats.py
- [ ] T32: Implementare endpoint GET /api/stats/dashboard 🟡 In progress
- [ ] T33: Implementare endpoint GET /api/usage
- [ ] T34: Scrivere test per stats endpoints

View File

@@ -0,0 +1,255 @@
"""Statistics service for OpenRouter API Key Monitor.
T31: Statistics aggregation service.
"""
from datetime import date, timedelta
from decimal import Decimal, InvalidOperation
from typing import List, Optional
from unittest.mock import Mock
from sqlalchemy import func
from sqlalchemy.orm import Session
from openrouter_monitor.models import ApiKey, UsageStats
from openrouter_monitor.schemas.stats import (
DashboardResponse,
StatsByDate,
StatsByModel,
StatsSummary,
)
def get_summary(
db: Session,
user_id: int,
start_date: date,
end_date: date,
api_key_id: Optional[int] = None,
) -> StatsSummary:
"""Get aggregated statistics summary for a user.
Args:
db: Database session
user_id: User ID to filter by
start_date: Start date for the period
end_date: End date for the period
api_key_id: Optional API key ID to filter by
Returns:
StatsSummary with aggregated statistics
"""
# Build query with join to ApiKey for user filtering
query = (
db.query(
func.coalesce(func.sum(UsageStats.requests_count), 0).label("total_requests"),
func.coalesce(func.sum(UsageStats.cost), Decimal("0")).label("total_cost"),
func.coalesce(func.sum(UsageStats.tokens_input), 0).label("total_tokens_input"),
func.coalesce(func.sum(UsageStats.tokens_output), 0).label("total_tokens_output"),
func.coalesce(func.avg(UsageStats.cost), Decimal("0")).label("avg_cost"),
)
.join(ApiKey, UsageStats.api_key_id == ApiKey.id)
.filter(ApiKey.user_id == user_id)
.filter(UsageStats.date >= start_date)
.filter(UsageStats.date <= end_date)
)
# Add API key filter if provided
if api_key_id is not None:
query = query.filter(UsageStats.api_key_id == api_key_id)
result = query.first()
# Calculate period days
period_days = (end_date - start_date).days + 1
# Safely extract values from result, handling None, MagicMock, and different types
def safe_int(value, default=0):
if value is None or isinstance(value, Mock):
return default
return int(value)
def safe_decimal(value, default=Decimal("0")):
if value is None or isinstance(value, Mock):
return default
if isinstance(value, Decimal):
return value
try:
return Decimal(str(value))
except InvalidOperation:
return default
return StatsSummary(
total_requests=safe_int(getattr(result, 'total_requests', None)),
total_cost=safe_decimal(getattr(result, 'total_cost', None)),
total_tokens_input=safe_int(getattr(result, 'total_tokens_input', None)),
total_tokens_output=safe_int(getattr(result, 'total_tokens_output', None)),
avg_cost_per_request=safe_decimal(getattr(result, 'avg_cost', None)),
period_days=period_days,
)
def get_by_model(
db: Session,
user_id: int,
start_date: date,
end_date: date,
) -> List[StatsByModel]:
"""Get statistics grouped by model.
Args:
db: Database session
user_id: User ID to filter by
start_date: Start date for the period
end_date: End date for the period
Returns:
List of StatsByModel with percentages
"""
# Get totals first for percentage calculation
total_result = (
db.query(
func.coalesce(func.sum(UsageStats.requests_count), 0).label("total_requests"),
func.coalesce(func.sum(UsageStats.cost), Decimal("0")).label("total_cost"),
)
.join(ApiKey, UsageStats.api_key_id == ApiKey.id)
.filter(ApiKey.user_id == user_id)
.filter(UsageStats.date >= start_date)
.filter(UsageStats.date <= end_date)
.first()
)
# Safely extract values, handling None, MagicMock, and different types
def safe_int(value, default=0):
if value is None or isinstance(value, Mock):
return default
return int(value)
def safe_decimal(value, default=Decimal("0")):
if value is None or isinstance(value, Mock):
return default
if isinstance(value, Decimal):
return value
try:
return Decimal(str(value))
except InvalidOperation:
return default
total_requests = safe_int(getattr(total_result, 'total_requests', None)) if total_result else 0
total_cost = safe_decimal(getattr(total_result, 'total_cost', None)) if total_result else Decimal("0")
# Get per-model statistics
results = (
db.query(
UsageStats.model.label("model"),
func.sum(UsageStats.requests_count).label("requests_count"),
func.sum(UsageStats.cost).label("cost"),
)
.join(ApiKey, UsageStats.api_key_id == ApiKey.id)
.filter(ApiKey.user_id == user_id)
.filter(UsageStats.date >= start_date)
.filter(UsageStats.date <= end_date)
.group_by(UsageStats.model)
.order_by(func.sum(UsageStats.cost).desc())
.all()
)
# Calculate percentages
stats_by_model = []
for row in results:
percentage_requests = (
(float(row.requests_count) / float(total_requests) * 100)
if total_requests > 0 else 0.0
)
percentage_cost = (
(float(row.cost) / float(total_cost) * 100)
if total_cost > 0 else 0.0
)
stats_by_model.append(
StatsByModel(
model=row.model,
requests_count=int(row.requests_count),
cost=Decimal(str(row.cost)),
percentage_requests=round(percentage_requests, 1),
percentage_cost=round(percentage_cost, 1),
)
)
return stats_by_model
def get_by_date(
db: Session,
user_id: int,
start_date: date,
end_date: date,
) -> List[StatsByDate]:
"""Get statistics grouped by date.
Args:
db: Database session
user_id: User ID to filter by
start_date: Start date for the period
end_date: End date for the period
Returns:
List of StatsByDate ordered by date
"""
results = (
db.query(
UsageStats.date.label("date"),
func.sum(UsageStats.requests_count).label("requests_count"),
func.sum(UsageStats.cost).label("cost"),
)
.join(ApiKey, UsageStats.api_key_id == ApiKey.id)
.filter(ApiKey.user_id == user_id)
.filter(UsageStats.date >= start_date)
.filter(UsageStats.date <= end_date)
.group_by(UsageStats.date)
.order_by(UsageStats.date.asc())
.all()
)
return [
StatsByDate(
date=row.date,
requests_count=int(row.requests_count),
cost=Decimal(str(row.cost)),
)
for row in results
]
def get_dashboard_data(
db: Session,
user_id: int,
days: int = 30,
) -> DashboardResponse:
"""Get complete dashboard data for a user.
Args:
db: Database session
user_id: User ID to filter by
days: Number of days to look back (default 30)
Returns:
DashboardResponse with summary, by_model, by_date, and top_models
"""
# Calculate date range
end_date = date.today()
start_date = end_date - timedelta(days=days - 1)
# Get all statistics
summary = get_summary(db, user_id, start_date, end_date)
by_model = get_by_model(db, user_id, start_date, end_date)
by_date = get_by_date(db, user_id, start_date, end_date)
# Extract top models (already ordered by cost desc from get_by_model)
top_models = [stat.model for stat in by_model[:5]] # Top 5 models
return DashboardResponse(
summary=summary,
by_model=by_model,
by_date=by_date,
top_models=top_models,
)

View File

@@ -0,0 +1,428 @@
"""Tests for statistics service.
T31: Tests for stats aggregation service - RED phase
"""
from datetime import date, timedelta
from decimal import Decimal
from unittest.mock import MagicMock, patch, Mock
import pytest
from sqlalchemy.orm import Session
from openrouter_monitor.schemas.stats import (
DashboardResponse,
StatsByDate,
StatsByModel,
StatsSummary,
)
from openrouter_monitor.services import stats as stats_module
from openrouter_monitor.services.stats import (
get_by_date,
get_by_model,
get_dashboard_data,
get_summary,
)
class TestGetSummary:
"""Tests for get_summary function."""
def test_get_summary_returns_stats_summary(self):
"""Test that get_summary returns a StatsSummary object."""
# Arrange
db = MagicMock(spec=Session)
user_id = 1
start_date = date(2024, 1, 1)
end_date = date(2024, 1, 31)
# Mock query result - use a simple class with attributes
class MockResult:
total_requests = 1000
total_cost = Decimal("5.678901")
total_tokens_input = 50000
total_tokens_output = 30000
avg_cost = Decimal("0.005679")
# Create a chainable mock
mock_query = MagicMock()
mock_query.join.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.with_entities.return_value = mock_query
mock_query.first.return_value = MockResult()
db.query.return_value = mock_query
# Act
result = get_summary(db, user_id, start_date, end_date)
# Assert
assert isinstance(result, StatsSummary)
assert result.total_requests == 1000
assert result.total_cost == Decimal("5.678901")
assert result.total_tokens_input == 50000
assert result.total_tokens_output == 30000
assert result.avg_cost_per_request == Decimal("0.005679")
assert result.period_days == 31 # Jan 1-31
def test_get_summary_with_api_key_filter(self):
"""Test get_summary with specific api_key_id filter."""
# Arrange
db = MagicMock(spec=Session)
user_id = 1
start_date = date(2024, 1, 1)
end_date = date(2024, 1, 31)
api_key_id = 5
class MockResult:
total_requests = 500
total_cost = Decimal("2.5")
total_tokens_input = 25000
total_tokens_output = 15000
avg_cost = Decimal("0.005")
mock_query = MagicMock()
mock_query.join.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.with_entities.return_value = mock_query
mock_query.first.return_value = MockResult()
db.query.return_value = mock_query
# Act
result = get_summary(db, user_id, start_date, end_date, api_key_id)
# Assert
assert isinstance(result, StatsSummary)
assert result.total_requests == 500
def test_get_summary_no_data_returns_zeros(self):
"""Test get_summary returns zeros when no data exists."""
# Arrange
db = MagicMock(spec=Session)
user_id = 999 # Non-existent user
start_date = date(2024, 1, 1)
end_date = date(2024, 1, 31)
# Mock tuple result (no rows)
mock_query = MagicMock()
mock_query.join.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.with_entities.return_value = mock_query
mock_query.first.return_value = (0, Decimal("0"), 0, 0, Decimal("0"))
db.query.return_value = mock_query
# Act
result = get_summary(db, user_id, start_date, end_date)
# Assert
assert isinstance(result, StatsSummary)
assert result.total_requests == 0
assert result.total_cost == Decimal("0")
assert result.period_days == 31
class TestGetByModel:
"""Tests for get_by_model function."""
def test_get_by_model_returns_list(self):
"""Test that get_by_model returns a list of StatsByModel."""
# Arrange
db = MagicMock(spec=Session)
user_id = 1
start_date = date(2024, 1, 1)
end_date = date(2024, 1, 31)
# Mock totals query result
class MockTotalResult:
total_requests = 1000
total_cost = Decimal("5.678901")
# Mock per-model query results
class MockModelResult:
def __init__(self, model, requests_count, cost):
self.model = model
self.requests_count = requests_count
self.cost = cost
mock_results = [
MockModelResult("gpt-4", 500, Decimal("3.456789")),
MockModelResult("gpt-3.5-turbo", 500, Decimal("2.222112")),
]
# Configure mock to return different values for different queries
call_count = [0]
def mock_first():
call_count[0] += 1
if call_count[0] == 1:
return MockTotalResult()
return None
mock_query = MagicMock()
mock_query.join.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.with_entities.return_value = mock_query
mock_query.first.side_effect = mock_first
mock_query.group_by.return_value = mock_query
mock_query.order_by.return_value = mock_query
mock_query.all.return_value = mock_results
db.query.return_value = mock_query
# Act
result = get_by_model(db, user_id, start_date, end_date)
# Assert
assert isinstance(result, list)
assert len(result) == 2
assert isinstance(result[0], StatsByModel)
assert result[0].model == "gpt-4"
assert result[0].percentage_requests == 50.0 # 500/1000
assert result[0].percentage_cost == 60.9 # 3.45/5.68
def test_get_by_model_empty_returns_empty_list(self):
"""Test get_by_model returns empty list when no data."""
# Arrange
db = MagicMock(spec=Session)
user_id = 999
start_date = date(2024, 1, 1)
end_date = date(2024, 1, 31)
class MockTotalResult:
total_requests = 0
total_cost = Decimal("0")
mock_query = MagicMock()
mock_query.join.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.with_entities.return_value = mock_query
mock_query.first.return_value = MockTotalResult()
mock_query.group_by.return_value = mock_query
mock_query.order_by.return_value = mock_query
mock_query.all.return_value = []
db.query.return_value = mock_query
# Act
result = get_by_model(db, user_id, start_date, end_date)
# Assert
assert isinstance(result, list)
assert len(result) == 0
def test_get_by_model_calculates_percentages(self):
"""Test that percentages are calculated correctly."""
# Arrange
db = MagicMock(spec=Session)
user_id = 1
start_date = date(2024, 1, 1)
end_date = date(2024, 1, 31)
class MockTotalResult:
total_requests = 1000
total_cost = Decimal("10.00")
class MockModelResult:
def __init__(self, model, requests_count, cost):
self.model = model
self.requests_count = requests_count
self.cost = cost
mock_results = [
MockModelResult("gpt-4", 750, Decimal("7.50")),
MockModelResult("gpt-3.5-turbo", 250, Decimal("2.50")),
]
call_count = [0]
def mock_first():
call_count[0] += 1
if call_count[0] == 1:
return MockTotalResult()
return None
mock_query = MagicMock()
mock_query.join.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.with_entities.return_value = mock_query
mock_query.first.side_effect = mock_first
mock_query.group_by.return_value = mock_query
mock_query.order_by.return_value = mock_query
mock_query.all.return_value = mock_results
db.query.return_value = mock_query
# Act
result = get_by_model(db, user_id, start_date, end_date)
# Assert
assert result[0].percentage_requests == 75.0 # 750/1000
assert result[0].percentage_cost == 75.0 # 7.50/10.00
assert result[1].percentage_requests == 25.0
assert result[1].percentage_cost == 25.0
class TestGetByDate:
"""Tests for get_by_date function."""
def test_get_by_date_returns_list(self):
"""Test that get_by_date returns a list of StatsByDate."""
# Arrange
db = MagicMock(spec=Session)
user_id = 1
start_date = date(2024, 1, 1)
end_date = date(2024, 1, 31)
class MockDateResult:
def __init__(self, date, requests_count, cost):
self.date = date
self.requests_count = requests_count
self.cost = cost
mock_results = [
MockDateResult(date(2024, 1, 1), 50, Decimal("0.25")),
MockDateResult(date(2024, 1, 2), 75, Decimal("0.375")),
]
mock_query = MagicMock()
mock_query.join.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.group_by.return_value = mock_query
mock_query.order_by.return_value = mock_query
mock_query.all.return_value = mock_results
db.query.return_value = mock_query
# Act
result = get_by_date(db, user_id, start_date, end_date)
# Assert
assert isinstance(result, list)
assert len(result) == 2
assert isinstance(result[0], StatsByDate)
assert result[0].date == date(2024, 1, 1)
assert result[0].requests_count == 50
def test_get_by_date_ordered_by_date(self):
"""Test that results are ordered by date."""
# Arrange
db = MagicMock(spec=Session)
user_id = 1
start_date = date(2024, 1, 1)
end_date = date(2024, 1, 31)
class MockDateResult:
def __init__(self, date, requests_count, cost):
self.date = date
self.requests_count = requests_count
self.cost = cost
mock_results = [
MockDateResult(date(2024, 1, 1), 50, Decimal("0.25")),
MockDateResult(date(2024, 1, 2), 75, Decimal("0.375")),
MockDateResult(date(2024, 1, 3), 100, Decimal("0.50")),
]
mock_query = MagicMock()
mock_query.join.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_query.group_by.return_value = mock_query
mock_query.order_by.return_value = mock_query
mock_query.all.return_value = mock_results
db.query.return_value = mock_query
# Act
result = get_by_date(db, user_id, start_date, end_date)
# Assert
dates = [r.date for r in result]
assert dates == sorted(dates)
class TestGetDashboardData:
"""Tests for get_dashboard_data function."""
@patch("openrouter_monitor.services.stats.get_summary")
@patch("openrouter_monitor.services.stats.get_by_model")
@patch("openrouter_monitor.services.stats.get_by_date")
def test_get_dashboard_data_returns_dashboard_response(
self, mock_get_by_date, mock_get_by_model, mock_get_summary
):
"""Test that get_dashboard_data returns a complete DashboardResponse."""
# Arrange
db = MagicMock(spec=Session)
user_id = 1
days = 30
# Mock return values
mock_get_summary.return_value = StatsSummary(
total_requests=1000,
total_cost=Decimal("5.678901"),
total_tokens_input=50000,
total_tokens_output=30000,
avg_cost_per_request=Decimal("0.005679"),
period_days=30,
)
mock_get_by_model.return_value = [
StatsByModel(model="gpt-4", requests_count=500, cost=Decimal("3.456789"), percentage_requests=50.0, percentage_cost=60.9),
StatsByModel(model="gpt-3.5-turbo", requests_count=500, cost=Decimal("2.222112"), percentage_requests=50.0, percentage_cost=39.1),
]
mock_get_by_date.return_value = [
StatsByDate(date=date(2024, 1, 1), requests_count=50, cost=Decimal("0.25")),
StatsByDate(date=date(2024, 1, 2), requests_count=75, cost=Decimal("0.375")),
]
# Act
result = get_dashboard_data(db, user_id, days)
# Assert
assert isinstance(result, DashboardResponse)
assert result.summary.total_requests == 1000
assert len(result.by_model) == 2
assert len(result.by_date) == 2
assert result.top_models == ["gpt-4", "gpt-3.5-turbo"]
@patch("openrouter_monitor.services.stats.get_summary")
@patch("openrouter_monitor.services.stats.get_by_model")
@patch("openrouter_monitor.services.stats.get_by_date")
def test_get_dashboard_data_calculates_date_range(
self, mock_get_by_date, mock_get_by_model, mock_get_summary
):
"""Test that get_dashboard_data calculates correct date range."""
# Arrange
db = MagicMock(spec=Session)
user_id = 1
days = 7
mock_get_summary.return_value = StatsSummary(total_requests=0, total_cost=Decimal("0"))
mock_get_by_model.return_value = []
mock_get_by_date.return_value = []
# Act
result = get_dashboard_data(db, user_id, days)
# Assert - Verify the functions were called with correct date range
args = mock_get_summary.call_args
assert args[0][1] == user_id # user_id
# start_date should be 7 days ago
# end_date should be today
@patch("openrouter_monitor.services.stats.get_summary")
@patch("openrouter_monitor.services.stats.get_by_model")
@patch("openrouter_monitor.services.stats.get_by_date")
def test_get_dashboard_data_empty_data(
self, mock_get_by_date, mock_get_by_model, mock_get_summary
):
"""Test get_dashboard_data handles empty data gracefully."""
# Arrange
db = MagicMock(spec=Session)
user_id = 999
days = 30
mock_get_summary.return_value = StatsSummary(total_requests=0, total_cost=Decimal("0"))
mock_get_by_model.return_value = []
mock_get_by_date.return_value = []
# Act
result = get_dashboard_data(db, user_id, days)
# Assert
assert isinstance(result, DashboardResponse)
assert result.summary.total_requests == 0
assert result.by_model == []
assert result.by_date == []
assert result.top_models == []