Engagement Prompt: Background Tasks (T55-T58)
🎯 MISSION
Implement the background tasks that automatically sync data from OpenRouter, periodically validate API keys, and clean up historical data.
Tasks to complete: T55, T56, T57, T58
📋 CONTEXT
AGENT: @tdd-developer
Repository: /home/google/Sources/LucaSacchiNet/openrouter-watcher
Current status:
- ✅ MVP backend complete: 43/74 tasks (58%)
- ✅ 418+ passing tests, ~98% coverage
- ✅ All REST APIs implemented
- ✅ Docker support ready
- 🎯 Missing: automatic data sync from OpenRouter
Why this phase is critical:
The application currently exposes APIs for viewing statistics, but UsageStats is empty (populated only manually). Background tasks are needed to:
- Periodically call the OpenRouter API
- Retrieve usage stats (requests, tokens, costs)
- Save the data to the database
- Keep statistics up to date automatically
Services already available:
- validate_api_key() in services/openrouter.py - already implemented
- UsageStats model - ready
- EncryptionService - to decrypt API keys
- get_db() - for database sessions
OpenRouter documentation:
- Usage endpoint: GET https://openrouter.ai/api/v1/usage
- Authentication: Authorization: Bearer {api_key}
- Query params: start_date, end_date
- Rate limit: 20 requests/minute
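The request described above can be sketched with the standard library alone (no network call happens here; the URL, header format, and query params are taken from the list above, while the helper name is ours):

```python
from datetime import date, timedelta

def build_usage_request(api_key: str, days: int = 7) -> dict:
    """Build URL, headers, and query params for the OpenRouter usage endpoint."""
    end_date = date.today()
    start_date = end_date - timedelta(days=days)
    return {
        "url": "https://openrouter.ai/api/v1/usage",
        "headers": {"Authorization": f"Bearer {api_key}"},
        "params": {
            "start_date": start_date.isoformat(),  # ISO dates, e.g. "2024-01-01"
            "end_date": end_date.isoformat(),
        },
    }

req = build_usage_request("sk-or-...")
```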
🔧 TASKS TO IMPLEMENT
T55: Set Up APScheduler for Periodic Tasks
File: src/openrouter_monitor/tasks/scheduler.py, src/openrouter_monitor/tasks/__init__.py
Requirements:
- Install APScheduler (pip install apscheduler)
- Create a scheduler singleton with AsyncIOScheduler
- Configure job stores (in-memory for the MVP, optionally Redis later)
- Handle FastAPI application startup/shutdown
- Use the UTC timezone
Implementation:

```python
# src/openrouter_monitor/tasks/scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
import logging

logger = logging.getLogger(__name__)

# Singleton scheduler
_scheduler: AsyncIOScheduler | None = None


def get_scheduler() -> AsyncIOScheduler:
    """Get or create scheduler singleton."""
    global _scheduler
    if _scheduler is None:
        _scheduler = AsyncIOScheduler(timezone='UTC')
    return _scheduler


def init_scheduler():
    """Initialize and start scheduler."""
    scheduler = get_scheduler()
    # Add event listeners
    scheduler.add_listener(_job_error_listener, EVENT_JOB_ERROR)
    if not scheduler.running:
        scheduler.start()
        logger.info("Scheduler started")


def shutdown_scheduler():
    """Shutdown scheduler gracefully."""
    global _scheduler
    if _scheduler and _scheduler.running:
        _scheduler.shutdown()
        logger.info("Scheduler shutdown")


def _job_error_listener(event):
    """Handle job execution errors."""
    logger.error(f"Job {event.job_id} crashed: {event.exception}")


# Convenience decorator for tasks
def scheduled_job(trigger, **trigger_args):
    """Decorator to register scheduled jobs."""
    def decorator(func):
        scheduler = get_scheduler()
        scheduler.add_job(
            func,
            trigger=trigger,
            id=func.__name__,
            replace_existing=True,
            **trigger_args,
        )
        return func
    return decorator
```
Integration with FastAPI:

```python
# In main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI

from openrouter_monitor.tasks.scheduler import init_scheduler, shutdown_scheduler


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    init_scheduler()
    yield
    # Shutdown
    shutdown_scheduler()


app = FastAPI(lifespan=lifespan)
```
Test: tests/unit/tasks/test_scheduler.py
- Test singleton scheduler
- Test init/shutdown
- Test job registration
- Test event listeners
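A minimal sketch of the singleton test, using a stub in place of AsyncIOScheduler so it runs without APScheduler installed (the real test would import get_scheduler from tasks/scheduler.py; StubScheduler and this local get_scheduler are stand-ins):

```python
# Stub standing in for apscheduler's AsyncIOScheduler in this sketch.
class StubScheduler:
    def __init__(self, timezone):
        self.timezone = timezone
        self.running = False

_scheduler = None

def get_scheduler():
    """Same lazy-singleton pattern as tasks/scheduler.py."""
    global _scheduler
    if _scheduler is None:
        _scheduler = StubScheduler(timezone="UTC")
    return _scheduler

def test_get_scheduler_is_singleton():
    a = get_scheduler()
    b = get_scheduler()
    assert a is b            # same instance on every call
    assert a.timezone == "UTC"

test_get_scheduler_is_singleton()
```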
T56: OpenRouter Sync Task
File: src/openrouter_monitor/tasks/sync.py
Requirements:
- Task runs every hour (IntervalTrigger(hours=1))
- For each active API key:
  - Decrypt the key with EncryptionService
  - Call the OpenRouter /usage API
  - Retrieve data: date, model, requests, tokens, cost
  - Save to UsageStats (upsert to avoid duplicates)
- Handle rate limiting (max 20 req/min)
- Handle errors (API down, invalid key)
- Detailed logging
Implementation:

```python
# src/openrouter_monitor/tasks/sync.py
import httpx
import asyncio
from datetime import date, timedelta
from typing import List, Dict
import logging

from apscheduler.triggers.interval import IntervalTrigger

from openrouter_monitor.config import get_settings
from openrouter_monitor.database import SessionLocal
from openrouter_monitor.models import ApiKey, UsageStats
from openrouter_monitor.services.encryption import EncryptionService
from openrouter_monitor.tasks.scheduler import get_scheduler

logger = logging.getLogger(__name__)
settings = get_settings()
encryption_service = EncryptionService(settings.encryption_key)


async def fetch_usage_for_key(
    api_key: ApiKey,
    start_date: date,
    end_date: date
) -> List[Dict]:
    """Fetch usage data from OpenRouter for a specific API key."""
    # Decrypt API key
    plaintext_key = encryption_service.decrypt(api_key.key_encrypted)
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.get(
                f"{settings.openrouter_api_url}/usage",
                headers={"Authorization": f"Bearer {plaintext_key}"},
                params={
                    "start_date": start_date.isoformat(),
                    "end_date": end_date.isoformat(),
                },
            )
            response.raise_for_status()
            return response.json().get("data", [])
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error for key {api_key.id}: {e}")
            return []
        except Exception as e:
            logger.error(f"Error fetching usage for key {api_key.id}: {e}")
            return []


async def sync_usage_stats():
    """Sync usage stats from OpenRouter for all active API keys."""
    logger.info("Starting usage stats sync")
    db = SessionLocal()
    try:
        # Get all active API keys
        api_keys = db.query(ApiKey).filter(ApiKey.is_active == True).all()
        if not api_keys:
            logger.info("No active API keys to sync")
            return
        # Date range: last 7 days (configurable)
        end_date = date.today()
        start_date = end_date - timedelta(days=7)
        total_records = 0
        for api_key in api_keys:
            # Rate limiting: stay under 20 requests/minute
            await asyncio.sleep(3.1)
            usage_data = await fetch_usage_for_key(api_key, start_date, end_date)
            for item in usage_data:
                # Upsert usage stats
                existing = db.query(UsageStats).filter(
                    UsageStats.api_key_id == api_key.id,
                    UsageStats.date == item["date"],
                    UsageStats.model == item["model"],
                ).first()
                if existing:
                    # Update existing
                    existing.requests_count = item["requests_count"]
                    existing.tokens_input = item["tokens_input"]
                    existing.tokens_output = item["tokens_output"]
                    existing.cost = item["cost"]
                else:
                    # Create new
                    usage_stat = UsageStats(
                        api_key_id=api_key.id,
                        date=item["date"],
                        model=item["model"],
                        requests_count=item["requests_count"],
                        tokens_input=item["tokens_input"],
                        tokens_output=item["tokens_output"],
                        cost=item["cost"],
                    )
                    db.add(usage_stat)
                total_records += 1
            logger.info(f"Synced {len(usage_data)} records for key {api_key.id}")
        db.commit()
        logger.info(f"Sync completed. Total records: {total_records}")
    except Exception as e:
        logger.error(f"Sync failed: {e}")
        db.rollback()
        raise
    finally:
        db.close()


# Register scheduled job
def register_sync_job():
    """Register sync job with scheduler."""
    scheduler = get_scheduler()
    scheduler.add_job(
        sync_usage_stats,
        trigger=IntervalTrigger(hours=1),
        id='sync_usage_stats',
        replace_existing=True,
        name='Sync OpenRouter Usage Stats',
    )
    logger.info("Registered sync_usage_stats job (every 1 hour)")
```
Tests: tests/unit/tasks/test_sync.py
- Test fetch_usage_for_key success
- Test fetch_usage_for_key error handling
- Test sync_usage_stats with mocked data
- Test upsert logic
- Test rate limiting
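The upsert logic can be unit-tested without a database by factoring it over a plain dict keyed on (api_key_id, date, model) — the same uniqueness key the sync task queries on. A sketch (the field names mirror UsageStats above; the dict is a stand-in for the DB session):

```python
def upsert_usage(store: dict, api_key_id: int, item: dict) -> None:
    """Insert or update a usage record keyed on (api_key_id, date, model)."""
    key = (api_key_id, item["date"], item["model"])
    store[key] = {
        "requests_count": item["requests_count"],
        "tokens_input": item["tokens_input"],
        "tokens_output": item["tokens_output"],
        "cost": item["cost"],
    }

store = {}
item = {"date": "2024-01-01", "model": "gpt-4o", "requests_count": 5,
        "tokens_input": 100, "tokens_output": 50, "cost": 0.01}
upsert_usage(store, 1, item)
# Same (key_id, date, model): must update in place, never duplicate.
upsert_usage(store, 1, {**item, "requests_count": 9})
assert len(store) == 1
assert store[(1, "2024-01-01", "gpt-4o")]["requests_count"] == 9
```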
T57: API Key Validation Task
File: src/openrouter_monitor/tasks/sync.py (add a function)
Requirements:
- Task runs daily (CronTrigger(hour=2, minute=0))
- For each API key:
  - Decrypt the key
  - Call OpenRouter /auth/key to validate it
  - If invalid: set is_active=False
- Log invalidated keys
- Optional notification (logging only for the MVP)
Implementation:

```python
# Added to src/openrouter_monitor/tasks/sync.py
from apscheduler.triggers.cron import CronTrigger


async def validate_api_keys():
    """Validate all API keys and mark invalid ones."""
    logger.info("Starting API keys validation")
    db = SessionLocal()
    try:
        api_keys = db.query(ApiKey).filter(ApiKey.is_active == True).all()
        invalid_count = 0
        for api_key in api_keys:
            await asyncio.sleep(3.1)  # Rate limiting: stay under 20 requests/minute
            try:
                plaintext_key = encryption_service.decrypt(api_key.key_encrypted)
                async with httpx.AsyncClient(timeout=10.0) as client:
                    response = await client.get(
                        f"{settings.openrouter_api_url}/auth/key",
                        headers={"Authorization": f"Bearer {plaintext_key}"},
                    )
                if response.status_code != 200:
                    # Key is invalid
                    api_key.is_active = False
                    invalid_count += 1
                    logger.warning(f"API key {api_key.id} marked as invalid")
            except Exception as e:
                logger.error(f"Error validating key {api_key.id}: {e}")
        db.commit()
        logger.info(f"Validation completed. Invalid keys found: {invalid_count}")
    finally:
        db.close()


def register_validation_job():
    """Register validation job with scheduler."""
    scheduler = get_scheduler()
    scheduler.add_job(
        validate_api_keys,
        trigger=CronTrigger(hour=2, minute=0),  # Every day at 2 AM UTC
        id='validate_api_keys',
        replace_existing=True,
        name='Validate API Keys',
    )
    logger.info("Registered validate_api_keys job (daily at 2 AM)")
```
Tests:
- Test validation of a valid key
- Test validation of an invalid key
- Test update of the is_active flag
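The pass/fail decision is easiest to test when isolated in a pure function. A sketch (the function name is ours; the 200-means-valid rule is taken from the job above, which marks a key invalid on any non-200 status):

```python
def is_key_valid(status_code: int) -> bool:
    """Mirror the job's rule: OpenRouter's /auth/key must return 200."""
    return status_code == 200

assert is_key_valid(200) is True
assert is_key_valid(401) is False
assert is_key_valid(500) is False  # per the job above, even server errors deactivate the key
```

Note the design choice this makes explicit: a transient 5xx outage on OpenRouter's side would deactivate every key. A stricter implementation might deactivate only on 401/403 and retry on 5xx; for the MVP the simpler rule above matches the code.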
T58: Old Data Cleanup Task
File: src/openrouter_monitor/tasks/cleanup.py
Requirements:
- Task runs weekly (CronTrigger(day_of_week='sun', hour=3, minute=0))
- Removes UsageStats older than X days (configurable, default 365)
- Keeps aggregated data (optional for the MVP)
- Logs the number of deleted records
Implementation:

```python
# src/openrouter_monitor/tasks/cleanup.py
from datetime import date, timedelta
from sqlalchemy import delete
import logging

from apscheduler.triggers.cron import CronTrigger

from openrouter_monitor.config import get_settings
from openrouter_monitor.database import SessionLocal
from openrouter_monitor.models import UsageStats
from openrouter_monitor.tasks.scheduler import get_scheduler

logger = logging.getLogger(__name__)
settings = get_settings()


async def cleanup_old_usage_stats():
    """Remove usage stats older than retention period."""
    retention_days = getattr(settings, 'usage_stats_retention_days', 365)
    cutoff_date = date.today() - timedelta(days=retention_days)
    logger.info(f"Starting cleanup of usage stats older than {cutoff_date}")
    db = SessionLocal()
    try:
        result = db.execute(
            delete(UsageStats).where(UsageStats.date < cutoff_date)
        )
        deleted_count = result.rowcount
        db.commit()
        logger.info(f"Cleanup completed. Deleted {deleted_count} old records")
    except Exception as e:
        logger.error(f"Cleanup failed: {e}")
        db.rollback()
        raise
    finally:
        db.close()


def register_cleanup_job():
    """Register cleanup job with scheduler."""
    scheduler = get_scheduler()
    scheduler.add_job(
        cleanup_old_usage_stats,
        trigger=CronTrigger(day_of_week='sun', hour=3, minute=0),  # Sundays at 3 AM UTC
        id='cleanup_old_usage_stats',
        replace_existing=True,
        name='Cleanup Old Usage Stats',
    )
    logger.info("Registered cleanup_old_usage_stats job (weekly on Sunday)")
```
Tests: tests/unit/tasks/test_cleanup.py
- Test deletion of old data
- Test retention of recent data
- Test retention_days configuration
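The retention cutoff is pure date arithmetic, so it can be covered without a database. A sketch (retention_days mirrors the config option above; the helper name is ours):

```python
from datetime import date, timedelta

def retention_cutoff(today: date, retention_days: int = 365) -> date:
    """Records with a date strictly before this cutoff get deleted."""
    return today - timedelta(days=retention_days)

# 2024 is a leap year, so 365 days back from 2024-12-31 is 2024-01-01.
cutoff = retention_cutoff(date(2024, 12, 31), retention_days=365)
assert cutoff == date(2024, 1, 1)
assert date(2023, 12, 31) < cutoff   # older than cutoff: would be deleted
assert date(2024, 6, 1) >= cutoff    # within retention: would be kept
```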
🔄 TDD WORKFLOW
For EACH task:
- RED: write a failing test (before the code!)
- GREEN: implement the minimum code to pass the test
- REFACTOR: improve the code while the tests stay green
📁 FILE STRUCTURE TO CREATE
src/openrouter_monitor/
├── tasks/
│   ├── __init__.py       # Exports scheduler, jobs
│   ├── scheduler.py      # T55 - APScheduler setup
│   ├── sync.py           # T56, T57 - Sync and validation
│   └── cleanup.py        # T58 - Cleanup
├── main.py               # Add lifespan for scheduler
└── config.py             # Add usage_stats_retention_days
tests/unit/tasks/
├── __init__.py
├── test_scheduler.py     # T55
├── test_sync.py          # T56 + T57
└── test_cleanup.py       # T58
📦 REQUIREMENTS UPDATE
Add to requirements.txt:
apscheduler==3.10.4
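The usage_stats_retention_days option also needs to land in config.py. The project's actual Settings class isn't shown in this prompt, so this is only a sketch of the field and its default using a dataclass stand-in:

```python
from dataclasses import dataclass

@dataclass
class Settings:
    # Days of UsageStats history to keep; cleanup deletes anything older (default 365).
    usage_stats_retention_days: int = 365

assert Settings().usage_stats_retention_days == 365
```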
✅ ACCEPTANCE CRITERIA
- T55: APScheduler configured and working
- T56: Hourly sync task
  - Retrieves data from OpenRouter
  - Saves to UsageStats (upsert)
  - Handles rate limiting
  - Detailed logging
- T57: Daily validation task
  - Marks invalid keys
  - Logging
- T58: Weekly cleanup task
  - Removes old data (>365 days)
  - Configurable
- All tasks registered at app startup
- Complete tests, coverage >= 90%
- 4 atomic commits using conventional commits
- progress.md updated
📝 COMMIT MESSAGES
feat(tasks): T55 setup APScheduler for background tasks
feat(tasks): T56 implement OpenRouter usage sync job
feat(tasks): T57 implement API key validation job
feat(tasks): T58 implement old data cleanup job
🚀 FINAL VERIFICATION

```bash
cd /home/google/Sources/LucaSacchiNet/openrouter-watcher
# Update dependencies
pip install apscheduler
# Scheduler tests
pytest tests/unit/tasks/test_scheduler.py -v
# Sync tests
pytest tests/unit/tasks/test_sync.py -v
# Cleanup tests
pytest tests/unit/tasks/test_cleanup.py -v
# Full test run
pytest tests/unit/ -v --cov=src/openrouter_monitor
# Start the app and check the logs
uvicorn src.openrouter_monitor.main:app --reload
# You should see: "Scheduler started", "Registered sync_usage_stats job"
```
📊 SCHEDULE SUMMARY
| Task | Frequency | Time (UTC) | Description |
|---|---|---|---|
| sync_usage_stats | Hourly | - | Fetch data from OpenRouter |
| validate_api_keys | Daily | 02:00 | Check API key validity |
| cleanup_old_usage_stats | Weekly | Sun 03:00 | Remove old data |
⚠️ IMPORTANT NOTES
- Rate limiting: OpenRouter enforces limits. Use asyncio.sleep() between requests
- Error handling: tasks must never crash the application
- Logging: every operation must be logged
- Database: each task creates its own session (never share sessions across threads)
- Timezone: always use UTC
- Idempotency: the sync task must upsert (no duplicates)
🔍 MANUAL TESTING
After implementation:
- Add an API key via POST /api/keys
- Check the logs to see the sync task start (or wait 1 hour)
- Force an execution for testing:

```python
import asyncio
from openrouter_monitor.tasks.sync import sync_usage_stats
asyncio.run(sync_usage_stats())
```

- Check the data in GET /api/usage (data should now be present)
AGENT: @tdd-developer
START WITH: T55 - Setup APScheduler
WHEN DONE: Data will sync automatically from OpenRouter! 🚀