""" Locust load testing suite for mockupAWS v1.0.0 Alternative to k6 for Python-based performance testing """ import json import random from datetime import datetime from locust import HttpUser, task, between, events from locust.runners import MasterRunner # Test data test_scenarios = [] test_users = [] class BaseUser(HttpUser): """Base user class with common functionality""" wait_time = between(1, 3) abstract = True def on_start(self): """Setup before test starts""" self.headers = { "Content-Type": "application/json", } self.scenario_id = None class RegularUser(BaseUser): """Simulates a regular user browsing and creating scenarios""" weight = 3 @task(5) def view_dashboard(self): """View dashboard with scenarios list""" with self.client.get( "/api/v1/scenarios?page=1&page_size=20", headers=self.headers, catch_response=True, name="/api/v1/scenarios", ) as response: if response.status_code == 200: response.success() elif response.status_code == 401: response.success() # Expected for unauthenticated else: response.failure(f"Unexpected status: {response.status_code}") @task(3) def view_metrics(self): """View dashboard metrics""" self.client.get( "/api/v1/metrics/dashboard", headers=self.headers, name="/api/v1/metrics/dashboard", ) @task(2) def view_reports(self): """View reports list""" self.client.get( "/api/v1/reports?page=1&page_size=10", headers=self.headers, name="/api/v1/reports", ) @task(1) def create_scenario(self): """Create a new scenario""" scenario_data = { "name": f"LocustTest_{random.randint(1, 100000)}", "description": "Scenario created during load test", "region": random.choice(["us-east-1", "eu-west-1", "ap-south-1"]), "tags": ["load-test", "locust"], } with self.client.post( "/api/v1/scenarios", json=scenario_data, headers=self.headers, catch_response=True, name="/api/v1/scenarios (POST)", ) as response: if response.status_code == 201: response.success() # Store scenario ID for future requests try: self.scenario_id = response.json().get("id") except: pass elif response.status_code == 401: response.success() else: response.failure(f"Create failed: {response.status_code}") class IngestUser(BaseUser): """Simulates high-volume log ingestion""" weight = 5 wait_time = between(0.1, 0.5) # Higher frequency @task(10) def ingest_log(self): """Send a single log entry""" log_data = { "message": f"Test log message {random.randint(1, 1000000)}", "source": "locust-test", "level": random.choice(["INFO", "WARN", "ERROR", "DEBUG"]), "timestamp": datetime.utcnow().isoformat(), "metadata": { "test_id": f"test_{random.randint(1, 10000)}", "request_id": f"req_{random.randint(1, 1000000)}", }, } headers = { **self.headers, "X-Scenario-ID": f"scenario_{random.randint(1, 100)}", } with self.client.post( "/ingest", json=log_data, headers=headers, catch_response=True, name="/ingest", ) as response: if response.status_code in [200, 202]: response.success() elif response.status_code == 429: response.success() # Rate limited - expected under load else: response.failure(f"Ingest failed: {response.status_code}") @task(2) def ingest_batch(self): """Send batch of logs""" logs = [] for _ in range(random.randint(5, 20)): logs.append( { "message": f"Batch log {random.randint(1, 1000000)}", "source": "locust-batch-test", "level": "INFO", } ) headers = { **self.headers, "X-Scenario-ID": f"batch_scenario_{random.randint(1, 50)}", } self.client.post( "/ingest/batch", json={"logs": logs}, headers=headers, name="/ingest/batch" ) class AuthUser(BaseUser): """Simulates authentication operations""" weight = 1 @task(3) def login(self): """Attempt login""" login_data = { "username": f"user_{random.randint(1, 1000)}@test.com", "password": "testpassword123", } with self.client.post( "/api/v1/auth/login", json=login_data, headers=self.headers, catch_response=True, name="/api/v1/auth/login", ) as response: if response.status_code == 200: response.success() # Store token try: token = response.json().get("access_token") if token: self.headers["Authorization"] = f"Bearer {token}" except: pass elif response.status_code == 401: response.success() # Invalid credentials - expected else: response.failure(f"Login error: {response.status_code}") @task(1) def register(self): """Attempt registration""" register_data = { "email": f"newuser_{random.randint(1, 100000)}@test.com", "password": "NewUserPass123!", "full_name": "Test User", } self.client.post( "/api/v1/auth/register", json=register_data, headers=self.headers, name="/api/v1/auth/register", ) class AdminUser(BaseUser): """Simulates admin operations""" weight = 1 @task(2) def view_all_scenarios(self): """View all scenarios with pagination""" self.client.get( f"/api/v1/scenarios?page=1&page_size=50", headers=self.headers, name="/api/v1/scenarios (admin)", ) @task(1) def generate_report(self): """Generate a report""" report_data = { "format": random.choice(["pdf", "csv"]), "include_logs": random.choice([True, False]), "date_range": "last_7_days", } scenario_id = f"scenario_{random.randint(1, 100)}" with self.client.post( f"/api/v1/scenarios/{scenario_id}/reports", json=report_data, headers=self.headers, catch_response=True, name="/api/v1/scenarios/[id]/reports", ) as response: if response.status_code in [200, 201, 202]: response.success() elif response.status_code == 401: response.success() else: response.failure(f"Report failed: {response.status_code}") @task(1) def view_comparison(self): """View scenario comparison""" scenario_ids = [f"scenario_{random.randint(1, 100)}" for _ in range(3)] ids_param = ",".join(scenario_ids) self.client.get( f"/api/v1/scenarios/compare?ids={ids_param}", headers=self.headers, name="/api/v1/scenarios/compare", ) # Event hooks @events.test_start.add_listener def on_test_start(environment, **kwargs): """Called when the test starts""" print(f"\n{'=' * 50}") print(f"Starting mockupAWS Load Test") print(f"Target: {environment.host}") print(f"{'=' * 50}\n") @events.test_stop.add_listener def on_test_stop(environment, **kwargs): """Called when the test stops""" print(f"\n{'=' * 50}") print(f"Load Test Completed") # Print statistics stats = environment.runner.stats print(f"\nTotal Requests: {stats.total.num_requests}") print(f"Failed Requests: {stats.total.num_failures}") print( f"Error Rate: {(stats.total.num_failures / max(stats.total.num_requests, 1) * 100):.2f}%" ) if stats.total.num_requests > 0: print(f"\nResponse Times:") print(f" Average: {stats.total.avg_response_time:.2f}ms") print(f" Min: {stats.total.min_response_time:.2f}ms") print(f" Max: {stats.total.max_response_time:.2f}ms") print(f" P50: {stats.total.get_response_time_percentile(0.5):.2f}ms") print(f" P95: {stats.total.get_response_time_percentile(0.95):.2f}ms") print(f"{'=' * 50}\n") @events.request.add_listener def on_request( request_type, name, response_time, response_length, response, context, exception, **kwargs, ): """Called on each request""" # Log slow requests if response_time > 1000: print(f"SLOW REQUEST: {name} took {response_time}ms") # Log errors if exception: print(f"ERROR: {name} - {exception}")