"""API v2 scenarios endpoints with enhanced features.""" from uuid import UUID from datetime import datetime from typing import Optional, List from fastapi import APIRouter, Depends, Query, status, Request, Header from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from src.api.deps import get_db from src.api.v2.rate_limiter import RateLimiter, TieredRateLimit from src.repositories.scenario import scenario_repository, ScenarioStatus from src.schemas.scenario import ( ScenarioCreate, ScenarioUpdate, ScenarioResponse, ScenarioList, ) from src.core.exceptions import NotFoundException, ValidationException from src.core.config import settings from src.core.cache import cache_manager, cached from src.core.monitoring import track_db_query, metrics from src.core.audit_logger import audit_logger, AuditEventType from src.core.logging_config import get_logger, set_correlation_id logger = get_logger(__name__) router = APIRouter() # Rate limiter rate_limiter = TieredRateLimit() @router.get( "", response_model=ScenarioList, summary="List scenarios", description="List all scenarios with advanced filtering and pagination.", responses={ 200: {"description": "List of scenarios"}, 429: {"description": "Rate limit exceeded"}, }, ) async def list_scenarios( request: Request, status: Optional[str] = Query(None, description="Filter by status"), region: Optional[str] = Query(None, description="Filter by region"), search: Optional[str] = Query(None, description="Search in name/description"), sort_by: str = Query("created_at", description="Sort field"), sort_order: str = Query("desc", description="Sort order (asc/desc)"), page: int = Query(1, ge=1, description="Page number"), page_size: int = Query( settings.default_page_size, ge=1, le=settings.max_page_size, description="Items per page", ), include_archived: bool = Query(False, description="Include archived scenarios"), db: AsyncSession = Depends(get_db), x_api_key: Optional[str] = Header(None, alias="X-API-Key"), ): """List scenarios with filtering and pagination. - **status**: Filter by scenario status (draft, running, completed, archived) - **region**: Filter by AWS region - **search**: Search in name and description - **sort_by**: Sort field (name, created_at, updated_at, status) - **sort_order**: Sort order (asc, desc) - **page**: Page number (1-based) - **page_size**: Number of items per page - **include_archived**: Include archived scenarios in results """ # Rate limiting await rate_limiter.check_rate_limit(request, x_api_key, tier="free") # Check cache for common queries cache_key = f"scenarios:list:{status}:{region}:{page}:{page_size}" cached_result = await cache_manager.get(cache_key) if cached_result and not search: # Don't cache search results metrics.track_cache_hit("l1") return ScenarioList(**cached_result) metrics.track_cache_miss("l1") skip = (page - 1) * page_size # Build filters filters = {} if status: filters["status"] = status if region: filters["region"] = region if not include_archived: filters["status__ne"] = "archived" # Get scenarios start_time = datetime.utcnow() scenarios = await scenario_repository.get_multi( db, skip=skip, limit=page_size, **filters ) total = await scenario_repository.count(db, **filters) # Track query time duration = (datetime.utcnow() - start_time).total_seconds() track_db_query("SELECT", "scenarios", duration) result = ScenarioList( items=scenarios, total=total, page=page, page_size=page_size, ) # Cache result if not search: await cache_manager.set( cache_key, result.model_dump(), ttl=cache_manager.TTL_L1_QUERIES, ) return result @router.post( "", response_model=ScenarioResponse, status_code=status.HTTP_201_CREATED, summary="Create scenario", description="Create a new scenario.", responses={ 201: {"description": "Scenario created successfully"}, 400: {"description": "Validation error"}, 409: {"description": "Scenario with name already exists"}, 429: {"description": "Rate limit exceeded"}, }, ) async def create_scenario( request: Request, scenario_in: ScenarioCreate, db: AsyncSession = Depends(get_db), x_api_key: Optional[str] = Header(None, alias="X-API-Key"), x_user_id: Optional[str] = Header(None, alias="X-User-ID"), ): """Create a new scenario. Creates a new cost simulation scenario with the specified configuration. """ # Rate limiting (stricter for writes) await rate_limiter.check_rate_limit(request, x_api_key, tier="free") # Check for duplicate name existing = await scenario_repository.get_by_name(db, scenario_in.name) if existing: raise ValidationException( f"Scenario with name '{scenario_in.name}' already exists" ) # Create scenario scenario = await scenario_repository.create(db, obj_in=scenario_in.model_dump()) # Track metrics metrics.increment_counter( "scenarios_created_total", labels={"region": scenario.region, "status": scenario.status}, ) # Audit log audit_logger.log_scenario_event( event_type=AuditEventType.SCENARIO_CREATED, scenario_id=scenario.id, user_id=UUID(x_user_id) if x_user_id else None, ip_address=request.client.host if request.client else None, details={"name": scenario.name, "region": scenario.region}, ) # Invalidate cache await cache_manager.invalidate_l1("list_scenarios") return scenario @router.get( "/{scenario_id}", response_model=ScenarioResponse, summary="Get scenario", description="Get a specific scenario by ID.", responses={ 200: {"description": "Scenario found"}, 404: {"description": "Scenario not found"}, 429: {"description": "Rate limit exceeded"}, }, ) async def get_scenario( request: Request, scenario_id: UUID, db: AsyncSession = Depends(get_db), x_api_key: Optional[str] = Header(None, alias="X-API-Key"), ): """Get a specific scenario by ID.""" # Rate limiting await rate_limiter.check_rate_limit(request, x_api_key, tier="free") # Check cache cache_key = f"scenario:{scenario_id}" cached = await cache_manager.get(cache_key) if cached: metrics.track_cache_hit("l1") return ScenarioResponse(**cached) metrics.track_cache_miss("l1") # Get from database scenario = await scenario_repository.get(db, scenario_id) if not scenario: raise NotFoundException("Scenario") # Cache result await cache_manager.set( cache_key, scenario.model_dump(), ttl=cache_manager.TTL_L1_QUERIES, ) return scenario @router.put( "/{scenario_id}", response_model=ScenarioResponse, summary="Update scenario", description="Update a scenario.", responses={ 200: {"description": "Scenario updated"}, 400: {"description": "Validation error"}, 404: {"description": "Scenario not found"}, 409: {"description": "Name conflict"}, 429: {"description": "Rate limit exceeded"}, }, ) async def update_scenario( request: Request, scenario_id: UUID, scenario_in: ScenarioUpdate, db: AsyncSession = Depends(get_db), x_api_key: Optional[str] = Header(None, alias="X-API-Key"), x_user_id: Optional[str] = Header(None, alias="X-User-ID"), ): """Update a scenario.""" # Rate limiting await rate_limiter.check_rate_limit(request, x_api_key, tier="free") scenario = await scenario_repository.get(db, scenario_id) if not scenario: raise NotFoundException("Scenario") # Check name conflict if scenario_in.name and scenario_in.name != scenario.name: existing = await scenario_repository.get_by_name(db, scenario_in.name) if existing: raise ValidationException( f"Scenario with name '{scenario_in.name}' already exists" ) # Update updated = await scenario_repository.update( db, db_obj=scenario, obj_in=scenario_in.model_dump(exclude_unset=True) ) # Audit log audit_logger.log_scenario_event( event_type=AuditEventType.SCENARIO_UPDATED, scenario_id=scenario_id, user_id=UUID(x_user_id) if x_user_id else None, ip_address=request.client.host if request.client else None, details={ "updated_fields": list(scenario_in.model_dump(exclude_unset=True).keys()) }, ) # Invalidate cache await cache_manager.delete(f"scenario:{scenario_id}") await cache_manager.invalidate_l1("list_scenarios") return updated @router.delete( "/{scenario_id}", status_code=status.HTTP_204_NO_CONTENT, summary="Delete scenario", description="Delete a scenario permanently.", responses={ 204: {"description": "Scenario deleted"}, 404: {"description": "Scenario not found"}, 429: {"description": "Rate limit exceeded"}, }, ) async def delete_scenario( request: Request, scenario_id: UUID, db: AsyncSession = Depends(get_db), x_api_key: Optional[str] = Header(None, alias="X-API-Key"), x_user_id: Optional[str] = Header(None, alias="X-User-ID"), ): """Delete a scenario permanently.""" # Rate limiting (stricter for deletes) await rate_limiter.check_rate_limit(request, x_api_key, tier="free", burst=5) scenario = await scenario_repository.get(db, scenario_id) if not scenario: raise NotFoundException("Scenario") await scenario_repository.delete(db, id=scenario_id) # Audit log audit_logger.log_scenario_event( event_type=AuditEventType.SCENARIO_DELETED, scenario_id=scenario_id, user_id=UUID(x_user_id) if x_user_id else None, ip_address=request.client.host if request.client else None, details={"name": scenario.name}, ) # Invalidate cache await cache_manager.delete(f"scenario:{scenario_id}") await cache_manager.invalidate_l1("list_scenarios") return None @router.post( "/bulk/delete", summary="Bulk delete scenarios", description="Delete multiple scenarios at once.", responses={ 200: {"description": "Bulk delete completed"}, 429: {"description": "Rate limit exceeded"}, }, ) async def bulk_delete_scenarios( request: Request, scenario_ids: List[UUID], db: AsyncSession = Depends(get_db), x_api_key: Optional[str] = Header(None, alias="X-API-Key"), x_user_id: Optional[str] = Header(None, alias="X-User-ID"), ): """Delete multiple scenarios at once. - **scenario_ids**: List of scenario IDs to delete """ # Rate limiting (strict for bulk operations) await rate_limiter.check_rate_limit(request, x_api_key, tier="premium", burst=1) deleted = [] failed = [] for scenario_id in scenario_ids: try: scenario = await scenario_repository.get(db, scenario_id) if scenario: await scenario_repository.delete(db, id=scenario_id) deleted.append(str(scenario_id)) # Invalidate cache await cache_manager.delete(f"scenario:{scenario_id}") else: failed.append({"id": str(scenario_id), "reason": "Not found"}) except Exception as e: failed.append({"id": str(scenario_id), "reason": str(e)}) # Invalidate list cache await cache_manager.invalidate_l1("list_scenarios") # Audit log audit_logger.log( event_type=AuditEventType.SCENARIO_DELETED, action="bulk_delete", user_id=UUID(x_user_id) if x_user_id else None, ip_address=request.client.host if request.client else None, details={"deleted_count": len(deleted), "failed_count": len(failed)}, ) return { "deleted": deleted, "failed": failed, "total_requested": len(scenario_ids), "total_deleted": len(deleted), }