#!/usr/bin/env python3
"""
Database Performance Benchmark Tool for mockupAWS v1.0.0

Usage:
    python scripts/benchmark_db.py --before   # Run before optimization
    python scripts/benchmark_db.py --after    # Run after optimization
    python scripts/benchmark_db.py --compare  # Compare before/after
"""

import asyncio
import argparse
import json
import time
import statistics
from datetime import datetime, timedelta
from typing import List, Dict, Any
from contextlib import asynccontextmanager

import asyncpg
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import select, func, text
from sqlalchemy.orm import selectinload

from src.core.database import DATABASE_URL
from src.models.scenario import Scenario
from src.models.scenario_log import ScenarioLog
from src.models.scenario_metric import ScenarioMetric
from src.models.report import Report


class DatabaseBenchmark:
    """Benchmark database query performance.

    Runs a fixed suite of representative queries against the configured
    database, timing each one over several iterations, and collects
    min/max/avg/median/p95/p99 latency statistics per query.
    """

    def __init__(self, database_url: str):
        self.database_url = database_url
        # Raw per-iteration timings (ms), keyed by query name.
        self.results: Dict[str, List[float]] = {}
        self.engine = create_async_engine(
            database_url,
            pool_size=10,
            max_overflow=20,
            echo=False,
        )

    @asynccontextmanager
    async def get_session(self):
        """Yield an ``AsyncSession`` bound to this benchmark's engine."""
        async with AsyncSession(self.engine) as session:
            yield session

    @staticmethod
    def _percentile(samples: List[float], q: float) -> float:
        """Return the nearest-rank *q*-th percentile (0 < q < 1) of *samples*.

        The rank index is clamped to the last element so the lookup is safe
        for any non-empty sample size (the previous inline
        ``sorted(times)[int(len(times) * q)]`` relied on float rounding to
        stay in range).
        """
        ordered = sorted(samples)
        index = min(int(len(ordered) * q), len(ordered) - 1)
        return ordered[index]

    async def run_query_benchmark(
        self, name: str, query_func, iterations: int = 10
    ) -> Dict[str, Any]:
        """Benchmark *query_func* over *iterations* runs.

        Returns a summary dict (min/max/avg/median/p95/p99 in ms) and stores
        the raw timings in ``self.results[name]``. Failed iterations are
        reported but still timed, so the sample set always has *iterations*
        entries.
        """
        times = []
        for i in range(iterations):
            start = time.perf_counter()
            try:
                await query_func()
            except Exception as e:
                # Best-effort: log and keep going so one bad query doesn't
                # abort the whole suite.
                print(f"  Error in {name} (iter {i}): {e}")
            end = time.perf_counter()
            times.append((end - start) * 1000)  # Convert to ms

        result = {
            "query_name": name,
            "iterations": iterations,
            "min_ms": round(min(times), 2),
            "max_ms": round(max(times), 2),
            "avg_ms": round(statistics.mean(times), 2),
            "median_ms": round(statistics.median(times), 2),
            "p95_ms": round(self._percentile(times, 0.95), 2),
            "p99_ms": round(self._percentile(times, 0.99), 2),
        }
        self.results[name] = times
        return result

    # =========================================================================
    # BENCHMARK QUERIES
    # =========================================================================

    async def benchmark_scenario_list(self):
        """Benchmark: List scenarios with pagination."""
        async with self.get_session() as db:
            result = await db.execute(
                select(Scenario).order_by(Scenario.created_at.desc()).limit(100)
            )
            scenarios = result.scalars().all()
            _ = [s.id for s in scenarios]  # Force evaluation

    async def benchmark_scenario_by_status(self):
        """Benchmark: List scenarios filtered by status."""
        async with self.get_session() as db:
            result = await db.execute(
                select(Scenario)
                .where(Scenario.status == "running")
                .order_by(Scenario.created_at.desc())
                .limit(50)
            )
            scenarios = result.scalars().all()
            _ = [s.id for s in scenarios]

    async def benchmark_scenario_with_relations(self):
        """Benchmark: Load scenario with logs and metrics (N+1 test)."""
        async with self.get_session() as db:
            result = await db.execute(
                select(Scenario)
                .options(selectinload(Scenario.logs), selectinload(Scenario.metrics))
                .limit(10)
            )
            scenarios = result.scalars().all()
            for s in scenarios:
                _ = len(s.logs)
                _ = len(s.metrics)

    async def benchmark_logs_by_scenario(self):
        """Benchmark: Get logs for a scenario."""
        async with self.get_session() as db:
            # Get first scenario
            result = await db.execute(select(Scenario).limit(1))
            scenario = result.scalar_one_or_none()
            if scenario:
                result = await db.execute(
                    select(ScenarioLog)
                    .where(ScenarioLog.scenario_id == scenario.id)
                    .order_by(ScenarioLog.received_at.desc())
                    .limit(100)
                )
                logs = result.scalars().all()
                _ = [l.id for l in logs]

    async def benchmark_logs_by_scenario_and_date(self):
        """Benchmark: Get logs filtered by scenario and date range."""
        async with self.get_session() as db:
            result = await db.execute(select(Scenario).limit(1))
            scenario = result.scalar_one_or_none()
            if scenario:
                # NOTE(review): naive UTC timestamp — assumes the DB column is
                # timestamp-without-timezone in UTC; confirm before switching
                # to timezone-aware datetimes.
                date_from = datetime.utcnow() - timedelta(days=7)
                result = await db.execute(
                    select(ScenarioLog)
                    .where(
                        (ScenarioLog.scenario_id == scenario.id)
                        & (ScenarioLog.received_at >= date_from)
                    )
                    .order_by(ScenarioLog.received_at.desc())
                    .limit(100)
                )
                logs = result.scalars().all()
                _ = [l.id for l in logs]

    async def benchmark_logs_aggregate(self):
        """Benchmark: Aggregate log statistics."""
        async with self.get_session() as db:
            result = await db.execute(
                select(
                    ScenarioLog.scenario_id,
                    func.count(ScenarioLog.id).label("count"),
                    func.sum(ScenarioLog.size_bytes).label("total_size"),
                    func.avg(ScenarioLog.size_bytes).label("avg_size"),
                )
                .group_by(ScenarioLog.scenario_id)
                .limit(100)
            )
            _ = result.all()

    async def benchmark_metrics_time_series(self):
        """Benchmark: Time-series metrics query."""
        async with self.get_session() as db:
            result = await db.execute(select(Scenario).limit(1))
            scenario = result.scalar_one_or_none()
            if scenario:
                date_from = datetime.utcnow() - timedelta(days=30)
                result = await db.execute(
                    select(ScenarioMetric)
                    .where(
                        (ScenarioMetric.scenario_id == scenario.id)
                        & (ScenarioMetric.timestamp >= date_from)
                        & (ScenarioMetric.metric_type == "lambda")
                    )
                    .order_by(ScenarioMetric.timestamp)
                    .limit(1000)
                )
                metrics = result.scalars().all()
                _ = [m.id for m in metrics]

    async def benchmark_pii_detection_query(self):
        """Benchmark: Query logs with PII."""
        async with self.get_session() as db:
            result = await db.execute(
                select(ScenarioLog)
                # .is_(True) emits the same SQL as == True without the E712
                # lint noise.
                .where(ScenarioLog.has_pii.is_(True))
                .order_by(ScenarioLog.received_at.desc())
                .limit(100)
            )
            logs = result.scalars().all()
            _ = [l.id for l in logs]

    async def benchmark_reports_by_scenario(self):
        """Benchmark: Get reports for scenario."""
        async with self.get_session() as db:
            result = await db.execute(select(Scenario).limit(1))
            scenario = result.scalar_one_or_none()
            if scenario:
                result = await db.execute(
                    select(Report)
                    .where(Report.scenario_id == scenario.id)
                    .order_by(Report.created_at.desc())
                    .limit(50)
                )
                reports = result.scalars().all()
                _ = [r.id for r in reports]

    async def benchmark_materialized_view(self):
        """Benchmark: Query materialized view."""
        async with self.get_session() as db:
            result = await db.execute(
                text(
                    """
                    SELECT * FROM mv_scenario_daily_stats
                    WHERE log_date > NOW() - INTERVAL '7 days'
                    LIMIT 100
                    """
                )
            )
            _ = result.all()

    async def benchmark_count_by_status(self):
        """Benchmark: Count scenarios by status."""
        async with self.get_session() as db:
            result = await db.execute(
                select(Scenario.status, func.count(Scenario.id)).group_by(
                    Scenario.status
                )
            )
            _ = result.all()

    # =========================================================================
    # MAIN BENCHMARK RUNNER
    # =========================================================================

    async def run_all_benchmarks(self, iterations: int = 10) -> List[Dict[str, Any]]:
        """Run all benchmark queries and return their summary dicts.

        Disposes the engine when done, so a ``DatabaseBenchmark`` instance is
        single-use.
        """
        benchmarks = [
            ("scenario_list", self.benchmark_scenario_list),
            ("scenario_by_status", self.benchmark_scenario_by_status),
            ("scenario_with_relations", self.benchmark_scenario_with_relations),
            ("logs_by_scenario", self.benchmark_logs_by_scenario),
            ("logs_by_scenario_and_date", self.benchmark_logs_by_scenario_and_date),
            ("logs_aggregate", self.benchmark_logs_aggregate),
            ("metrics_time_series", self.benchmark_metrics_time_series),
            ("pii_detection_query", self.benchmark_pii_detection_query),
            ("reports_by_scenario", self.benchmark_reports_by_scenario),
            ("materialized_view", self.benchmark_materialized_view),
            ("count_by_status", self.benchmark_count_by_status),
        ]

        results = []
        print(
            f"\nRunning {len(benchmarks)} benchmarks with {iterations} iterations each..."
        )
        print("=" * 80)

        for name, func in benchmarks:
            print(f"\nBenchmarking: {name}")
            result = await self.run_query_benchmark(name, func, iterations)
            results.append(result)
            print(
                f"  Avg: {result['avg_ms']}ms | P95: {result['p95_ms']}ms | P99: {result['p99_ms']}ms"
            )

        await self.engine.dispose()
        return results


def save_results(results: List[Dict[str, Any]], filename: str):
    """Save benchmark results to JSON file."""
    output = {
        "timestamp": datetime.utcnow().isoformat(),
        "version": "1.0.0",
        "results": results,
        "summary": {
            "total_queries": len(results),
            "avg_response_ms": round(
                statistics.mean([r["avg_ms"] for r in results]), 2
            ),
            "max_response_ms": max([r["max_ms"] for r in results]),
            "min_response_ms": min([r["min_ms"] for r in results]),
        },
    }
    with open(filename, "w") as f:
        json.dump(output, f, indent=2)
    # BUGFIX: was a placeholder-less f-string that printed literal text
    # instead of the output path.
    print(f"\nResults saved to: {filename}")


def compare_results(before_file: str, after_file: str):
    """Compare before and after benchmark results.

    Prints a per-query latency table plus aggregate improvement figures.
    """
    with open(before_file) as f:
        before = json.load(f)
    with open(after_file) as f:
        after = json.load(f)

    print("\n" + "=" * 100)
    print("PERFORMANCE COMPARISON: BEFORE vs AFTER OPTIMIZATION")
    print("=" * 100)
    print(
        f"{'Query':<40} {'Before':>12} {'After':>12} {'Improvement':>15} {'Change':>10}"
    )
    print("-" * 100)

    before_results = {r["query_name"]: r for r in before["results"]}
    after_results = {r["query_name"]: r for r in after["results"]}

    improvements = []
    for name in before_results:
        if name in after_results:
            before_avg = before_results[name]["avg_ms"]
            after_avg = after_results[name]["avg_ms"]
            improvement = before_avg - after_avg
            pct_change = (
                ((before_avg - after_avg) / before_avg * 100) if before_avg > 0 else 0
            )
            improvements.append(
                {
                    "query": name,
                    "before": before_avg,
                    "after": after_avg,
                    "improvement_ms": improvement,
                    "pct_change": pct_change,
                }
            )
            status = "✓ FASTER" if improvement > 0 else "✗ SLOWER"
            print(
                f"{name:<40} {before_avg:>10}ms {after_avg:>10}ms {improvement:>12}ms {status:>10}"
            )

    print("-" * 100)

    # BUGFIX: statistics.mean raises StatisticsError on an empty sequence,
    # which happened when the two files shared no query names.
    if not improvements:
        print("\nNo overlapping queries found between the two result files.")
        return

    avg_improvement = statistics.mean([i["pct_change"] for i in improvements])
    total_improvement_ms = sum([i["improvement_ms"] for i in improvements])

    print(f"\nAverage improvement: {avg_improvement:.1f}%")
    print(f"Total time saved: {total_improvement_ms:.2f}ms across all queries")
    print(
        f"Overall status: {'✓ OPTIMIZATION SUCCESSFUL' if avg_improvement > 10 else '⚠ MODERATE IMPROVEMENT'}"
    )


async def main():
    parser = argparse.ArgumentParser(description="Database Performance Benchmark")
    parser.add_argument("--before", action="store_true", help="Run before optimization")
    parser.add_argument("--after", action="store_true", help="Run after optimization")
    parser.add_argument("--compare", action="store_true", help="Compare before/after")
    parser.add_argument(
        "--iterations", type=int, default=10, help="Number of iterations"
    )
    parser.add_argument("--database-url", default=DATABASE_URL, help="Database URL")
    args = parser.parse_args()

    if args.compare:
        compare_results("benchmark_before.json", "benchmark_after.json")
        return

    benchmark = DatabaseBenchmark(args.database_url)
    results = await benchmark.run_all_benchmarks(iterations=args.iterations)

    if args.before:
        save_results(results, "benchmark_before.json")
    elif args.after:
        save_results(results, "benchmark_after.json")
    else:
        save_results(results, "benchmark_results.json")

    # Print summary
    print("\n" + "=" * 80)
    print("BENCHMARK SUMMARY")
    print("=" * 80)
    print(f"Total queries tested: {len(results)}")
    print(
        f"Average response time: {statistics.mean([r['avg_ms'] for r in results]):.2f}ms"
    )
    print(f"Slowest query: {max([r['avg_ms'] for r in results]):.2f}ms")
    print(f"Fastest query: {min([r['avg_ms'] for r in results]):.2f}ms")

    # Find queries > 200ms (SLA target)
    slow_queries = [r for r in results if r["avg_ms"] > 200]
    if slow_queries:
        print(f"\n⚠ Queries exceeding 200ms SLA target: {len(slow_queries)}")
        for q in slow_queries:
            print(f"  - {q['query_name']}: {q['avg_ms']}ms")
    else:
        print("\n✓ All queries meet <200ms SLA target")


if __name__ == "__main__":
    asyncio.run(main())