feat(api): implement complete API layer with services and endpoints
Complete API implementation (BE-006 to BE-010):
BE-006: API Dependencies & Configuration
- Add core/config.py with Settings and environment variables
- Add core/exceptions.py with AppException hierarchy
- Add api/deps.py with get_db() and get_running_scenario() dependencies
- Add pydantic-settings dependency
BE-007: Services Layer
- Add services/pii_detector.py: PIIDetector with email/SSN/credit card patterns
- Add services/cost_calculator.py: AWS cost calculation (SQS, Lambda, Bedrock)
- Add services/ingest_service.py: Log processing with hash, PII detection, metrics
BE-008: Scenarios API Endpoints
- POST /api/v1/scenarios - Create scenario
- GET /api/v1/scenarios - List with filters and pagination
- GET /api/v1/scenarios/{id} - Get single scenario
- PUT /api/v1/scenarios/{id} - Update scenario
- DELETE /api/v1/scenarios/{id} - Delete scenario
- POST /api/v1/scenarios/{id}/start - Start (draft->running)
- POST /api/v1/scenarios/{id}/stop - Stop (running->completed)
- POST /api/v1/scenarios/{id}/archive - Archive (completed->archived)
BE-009: Ingest API
- POST /ingest with X-Scenario-ID header validation
- Depends on get_running_scenario() for status check
- Returns LogResponse with processed metrics
- POST /flush for backward compatibility
BE-010: Metrics API
- GET /api/v1/scenarios/{id}/metrics - Full metrics endpoint
- Aggregates data from scenario_logs
- Calculates costs using CostCalculator
- Returns cost breakdown (SQS/Lambda/Bedrock)
- Returns timeseries data grouped by hour
Refactored main.py:
- Simplified to use api_router
- Added exception handlers
- Added health check endpoint
All endpoints tested and working.
Tasks: BE-006, BE-007, BE-008, BE-009, BE-010 complete
This commit is contained in:
1
src/api/__init__.py
Normal file
1
src/api/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""API package."""
|
||||
39
src/api/deps.py
Normal file
39
src/api/deps.py
Normal file
@@ -0,0 +1,39 @@
|
||||
"""API dependencies."""
|
||||
|
||||
from typing import AsyncGenerator
|
||||
from uuid import UUID
|
||||
from fastapi import Depends, Header
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from src.core.database import AsyncSessionLocal
|
||||
from src.repositories.scenario import scenario_repository, ScenarioStatus
|
||||
from src.core.exceptions import NotFoundException, ScenarioNotRunningException
|
||||
|
||||
|
||||
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session scoped to a single request.

    The ``async with`` block already closes the session on exit — including
    when the request handler raises — so the previous explicit
    ``try/finally: session.close()`` was redundant and has been removed.
    """
    async with AsyncSessionLocal() as session:
        yield session
|
||||
|
||||
|
||||
async def get_running_scenario(
    scenario_id: str = Header(..., alias="X-Scenario-ID"),
    db: AsyncSession = Depends(get_db),
):
    """Resolve the X-Scenario-ID header to a scenario that is currently running.

    Raises NotFoundException when the header is not a valid UUID or no
    scenario with that id exists, and ScenarioNotRunningException when the
    scenario is in any status other than 'running'.
    """
    try:
        parsed_id = UUID(scenario_id)
    except ValueError:
        # A malformed id can never match a scenario, so report "not found".
        raise NotFoundException("Scenario")

    found = await scenario_repository.get(db, parsed_id)
    if not found:
        raise NotFoundException("Scenario")

    if found.status == ScenarioStatus.RUNNING.value:
        return found
    raise ScenarioNotRunningException()
|
||||
12
src/api/v1/__init__.py
Normal file
12
src/api/v1/__init__.py
Normal file
@@ -0,0 +1,12 @@
|
||||
"""API v1 routes."""

from fastapi import APIRouter

from src.api.v1.scenarios import router as scenarios_router
from src.api.v1.ingest import router as ingest_router
from src.api.v1.metrics import router as metrics_router

# Aggregate router; main.py mounts it under the /api/v1 prefix.
api_router = APIRouter()
# Scenario CRUD + lifecycle endpoints under /api/v1/scenarios.
api_router.include_router(scenarios_router, prefix="/scenarios", tags=["scenarios"])
# The ingest router declares its own absolute paths (/ingest, /flush): no prefix.
api_router.include_router(ingest_router, tags=["ingest"])
# Metrics are exposed as /api/v1/scenarios/{id}/metrics, hence the shared prefix.
api_router.include_router(metrics_router, prefix="/scenarios", tags=["metrics"])
|
||||
50
src/api/v1/ingest.py
Normal file
50
src/api/v1/ingest.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Ingest API endpoints."""
|
||||
|
||||
from uuid import UUID
|
||||
from fastapi import APIRouter, Depends, Header, status
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from src.api.deps import get_db, get_running_scenario
|
||||
from src.schemas.log import LogIngest, LogResponse
|
||||
from src.services.ingest_service import ingest_service
|
||||
from src.models.scenario import Scenario
|
||||
|
||||
router = APIRouter()


@router.post(
    "/ingest", response_model=LogResponse, status_code=status.HTTP_202_ACCEPTED
)
async def ingest_log(
    log_data: LogIngest,
    scenario: Scenario = Depends(get_running_scenario),
    db: AsyncSession = Depends(get_db),
):
    """
    Ingest a log message for processing.

    - **message**: The log message content
    - **source**: Optional source identifier
    - **X-Scenario-ID**: Header with the scenario UUID
    """
    entry = await ingest_service.ingest_log(
        db=db, scenario=scenario, message=log_data.message, source=log_data.source
    )

    # Echo back the persisted fields so the caller sees the computed metrics.
    response_fields = {
        "id": entry.id,
        "scenario_id": entry.scenario_id,
        "received_at": entry.received_at,
        "message_preview": entry.message_preview,
        "source": entry.source,
        "size_bytes": entry.size_bytes,
        "has_pii": entry.has_pii,
        "token_count": entry.token_count,
        "sqs_blocks": entry.sqs_blocks,
    }
    return LogResponse(**response_fields)
|
||||
|
||||
|
||||
@router.post("/flush")
async def flush_queue():
    """Compatibility endpoint: processing is synchronous, so nothing is queued."""
    payload = {"status": "flushed", "message": "Processing is now synchronous"}
    return payload
|
||||
113
src/api/v1/metrics.py
Normal file
113
src/api/v1/metrics.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""Metrics API endpoints."""
|
||||
|
||||
from uuid import UUID
|
||||
from decimal import Decimal
|
||||
from datetime import datetime
|
||||
from fastapi import APIRouter, Depends
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select, func
|
||||
|
||||
from src.api.deps import get_db
|
||||
from src.repositories.scenario import scenario_repository
|
||||
from src.schemas.metric import (
|
||||
MetricsResponse,
|
||||
MetricSummary,
|
||||
CostBreakdown,
|
||||
TimeseriesPoint,
|
||||
)
|
||||
from src.core.exceptions import NotFoundException
|
||||
from src.services.cost_calculator import cost_calculator
|
||||
from src.models.scenario_log import ScenarioLog
|
||||
|
||||
router = APIRouter()

# A simulated Lambda invocation is charged per started batch of up to 100 logs.
LOGS_PER_LAMBDA_INVOCATION = 100


def _percentage(part: Decimal, whole: Decimal) -> float:
    """Return part/whole as a percentage, guarding against division by zero."""
    return float(part / whole * 100) if whole > 0 else 0


@router.get("/{scenario_id}/metrics", response_model=MetricsResponse)
async def get_scenario_metrics(scenario_id: UUID, db: AsyncSession = Depends(get_db)):
    """Get aggregated metrics for a scenario.

    Aggregates counters from scenario_logs, prices them with the
    CostCalculator (SQS / Lambda / Bedrock), and returns a summary,
    a per-service cost breakdown, and an hourly request timeseries.

    Raises NotFoundException if the scenario does not exist.
    """
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    # Single aggregate query over the scenario's logs.
    summary_result = await db.execute(
        select(
            func.count(ScenarioLog.id).label("total_logs"),
            func.sum(ScenarioLog.sqs_blocks).label("total_sqs_blocks"),
            func.sum(ScenarioLog.token_count).label("total_tokens"),
            func.count(ScenarioLog.id)
            .filter(ScenarioLog.has_pii == True)
            .label("pii_violations"),
        ).where(ScenarioLog.scenario_id == scenario_id)
    )
    summary_row = summary_result.one()

    # SUM() returns NULL on an empty set; normalize to integers once.
    total_logs = summary_row.total_logs or 0
    total_sqs_blocks = summary_row.total_sqs_blocks or 0
    total_tokens = summary_row.total_tokens or 0

    # Price each simulated service for the scenario's region.
    region = scenario.region
    sqs_cost = await cost_calculator.calculate_sqs_cost(db, total_sqs_blocks, region)

    # Ceiling division: one invocation per started batch of 100 logs.
    # (The previous `// 100 + 1` charged one invocation even for zero logs
    # and two for exactly 100.)
    lambda_invocations = -(-total_logs // LOGS_PER_LAMBDA_INVOCATION)
    lambda_cost = await cost_calculator.calculate_lambda_cost(
        db, lambda_invocations, 1.0, region
    )

    bedrock_cost = await cost_calculator.calculate_bedrock_cost(
        db, total_tokens, 0, region
    )

    total_cost = sqs_cost + lambda_cost + bedrock_cost

    cost_breakdown = [
        CostBreakdown(
            service=service,
            cost_usd=cost,
            percentage=_percentage(cost, total_cost),
        )
        for service, cost in (
            ("SQS", sqs_cost),
            ("Lambda", lambda_cost),
            ("Bedrock", bedrock_cost),
        )
    ]

    summary = MetricSummary(
        total_requests=scenario.total_requests,
        total_cost_usd=total_cost,
        sqs_blocks=total_sqs_blocks,
        lambda_invocations=lambda_invocations,
        llm_tokens=total_tokens,
        pii_violations=summary_row.pii_violations or 0,
    )

    # Requests grouped by hour for charting.
    hour_bucket = func.date_trunc("hour", ScenarioLog.received_at)
    timeseries_result = await db.execute(
        select(
            hour_bucket.label("hour"),
            func.count(ScenarioLog.id).label("count"),
        )
        .where(ScenarioLog.scenario_id == scenario_id)
        .group_by(hour_bucket)
        .order_by(hour_bucket)
    )

    timeseries = [
        TimeseriesPoint(
            timestamp=row.hour, metric_type="requests", value=Decimal(row.count)
        )
        for row in timeseries_result.all()
    ]

    return MetricsResponse(
        scenario_id=scenario_id,
        summary=summary,
        cost_breakdown=cost_breakdown,
        timeseries=timeseries,
    )
|
||||
171
src/api/v1/scenarios.py
Normal file
171
src/api/v1/scenarios.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""Scenario API endpoints."""
|
||||
|
||||
from uuid import UUID
|
||||
from datetime import datetime
|
||||
from fastapi import APIRouter, Depends, Query, status
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from src.api.deps import get_db
|
||||
from src.repositories.scenario import scenario_repository, ScenarioStatus
|
||||
from src.schemas.scenario import (
|
||||
ScenarioCreate,
|
||||
ScenarioUpdate,
|
||||
ScenarioResponse,
|
||||
ScenarioList,
|
||||
)
|
||||
from src.core.exceptions import NotFoundException, ValidationException
|
||||
from src.core.config import settings
|
||||
|
||||
router = APIRouter()


@router.post("", response_model=ScenarioResponse, status_code=status.HTTP_201_CREATED)
async def create_scenario(
    scenario_in: ScenarioCreate, db: AsyncSession = Depends(get_db)
):
    """Create a new scenario.

    Scenario names are unique: a duplicate name is rejected with a
    validation error.
    """
    duplicate = await scenario_repository.get_by_name(db, scenario_in.name)
    if duplicate:
        raise ValidationException(
            f"Scenario with name '{scenario_in.name}' already exists"
        )

    return await scenario_repository.create(db, obj_in=scenario_in.model_dump())
|
||||
|
||||
|
||||
@router.get("", response_model=ScenarioList)
async def list_scenarios(
    status: str = Query(None, description="Filter by status"),
    region: str = Query(None, description="Filter by region"),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(
        settings.default_page_size,
        ge=1,
        le=settings.max_page_size,
        description="Items per page",
    ),
    db: AsyncSession = Depends(get_db),
):
    """List scenarios, optionally filtered by status and/or region, paginated."""
    # Build the filter set from whichever query params were supplied.
    # (Inside this function the `status` parameter shadows fastapi.status.)
    filters = {
        key: value
        for key, value in (("status", status), ("region", region))
        if value
    }

    offset = (page - 1) * page_size
    items = await scenario_repository.get_multi(
        db, skip=offset, limit=page_size, **filters
    )
    total = await scenario_repository.count(db, **filters)

    return ScenarioList(items=items, total=total, page=page, page_size=page_size)
|
||||
|
||||
|
||||
@router.get("/{scenario_id}", response_model=ScenarioResponse)
async def get_scenario(scenario_id: UUID, db: AsyncSession = Depends(get_db)):
    """Fetch a single scenario by its UUID; 404 if it does not exist."""
    found = await scenario_repository.get(db, scenario_id)
    if not found:
        raise NotFoundException("Scenario")
    return found
|
||||
|
||||
|
||||
@router.put("/{scenario_id}", response_model=ScenarioResponse)
async def update_scenario(
    scenario_id: UUID, scenario_in: ScenarioUpdate, db: AsyncSession = Depends(get_db)
):
    """Update a scenario; renaming to an already-used name is rejected."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    new_name = scenario_in.name
    if new_name and new_name != scenario.name:
        clash = await scenario_repository.get_by_name(db, new_name)
        if clash:
            raise ValidationException(
                f"Scenario with name '{new_name}' already exists"
            )

    # Only fields explicitly set by the caller are applied.
    changes = scenario_in.model_dump(exclude_unset=True)
    return await scenario_repository.update(db, db_obj=scenario, obj_in=changes)
|
||||
|
||||
|
||||
@router.delete("/{scenario_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_scenario(scenario_id: UUID, db: AsyncSession = Depends(get_db)):
    """Delete a scenario; responds 204 with an empty body on success."""
    if not await scenario_repository.get(db, scenario_id):
        raise NotFoundException("Scenario")

    await scenario_repository.delete(db, id=scenario_id)
    return None
|
||||
|
||||
|
||||
@router.post("/{scenario_id}/start", response_model=ScenarioResponse)
async def start_scenario(scenario_id: UUID, db: AsyncSession = Depends(get_db)):
    """Transition a scenario from 'draft' to 'running'."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    # Only draft scenarios may be started.
    if scenario.status != ScenarioStatus.DRAFT.value:
        raise ValidationException(
            f"Cannot start scenario with status '{scenario.status}'"
        )

    # NOTE(review): datetime.utcnow() is naive; presumably the column stores
    # naive UTC as well — confirm before switching to timezone-aware times.
    changes = {
        "status": ScenarioStatus.RUNNING.value,
        "started_at": datetime.utcnow(),
    }
    await scenario_repository.update(db, db_obj=scenario, obj_in=changes)
    await db.refresh(scenario)
    return scenario
|
||||
|
||||
|
||||
@router.post("/{scenario_id}/stop", response_model=ScenarioResponse)
async def stop_scenario(scenario_id: UUID, db: AsyncSession = Depends(get_db)):
    """Transition a scenario from 'running' to 'completed'."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    # Only running scenarios may be stopped.
    if scenario.status != ScenarioStatus.RUNNING.value:
        raise ValidationException(
            f"Cannot stop scenario with status '{scenario.status}'"
        )

    changes = {
        "status": ScenarioStatus.COMPLETED.value,
        "completed_at": datetime.utcnow(),
    }
    await scenario_repository.update(db, db_obj=scenario, obj_in=changes)
    await db.refresh(scenario)
    return scenario
|
||||
|
||||
|
||||
@router.post("/{scenario_id}/archive", response_model=ScenarioResponse)
async def archive_scenario(scenario_id: UUID, db: AsyncSession = Depends(get_db)):
    """Transition a scenario from 'completed' to 'archived'."""
    scenario = await scenario_repository.get(db, scenario_id)
    if not scenario:
        raise NotFoundException("Scenario")

    # Only completed scenarios may be archived.
    if scenario.status != ScenarioStatus.COMPLETED.value:
        raise ValidationException(
            f"Cannot archive scenario with status '{scenario.status}'"
        )

    await scenario_repository.update_status(db, scenario_id, ScenarioStatus.ARCHIVED)
    # Refresh so the response body reflects the new status.
    await db.refresh(scenario)
    return scenario
|
||||
32
src/core/config.py
Normal file
32
src/core/config.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""Application configuration."""
|
||||
|
||||
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class Settings(BaseSettings):
    """Application settings loaded from environment variables / .env file."""

    # pydantic-settings v2 configuration; the class-based `Config` inner
    # class is deprecated in pydantic v2 in favor of model_config.
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    # Database (asyncpg DSN; override via the DATABASE_URL env var)
    database_url: str = "postgresql+asyncpg://app:changeme@localhost:5432/mockupaws"

    # Application
    app_name: str = "mockupAWS"
    debug: bool = False

    # Pagination bounds used by the list endpoints
    default_page_size: int = 20
    max_page_size: int = 100


@lru_cache()
def get_settings() -> Settings:
    """Return a cached Settings instance (environment is read only once)."""
    return Settings()


# Module-level singleton imported throughout the app.
settings = get_settings()
|
||||
65
src/core/exceptions.py
Normal file
65
src/core/exceptions.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""Custom exceptions for the application."""
|
||||
|
||||
from fastapi import HTTPException, status
|
||||
|
||||
|
||||
class AppException(Exception):
    """Root of the application's HTTP-aware exception hierarchy.

    Subclasses override ``status_code`` and ``code``; the registered
    exception handler turns raised instances into JSON error responses.
    """

    status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR
    code: str = "internal_error"

    def __init__(self, message: str = None):
        # Falsy messages (None / "") collapse to the generic default.
        self.message = message or "An internal error occurred"
        super().__init__(self.message)
|
||||
|
||||
|
||||
class NotFoundException(AppException):
    """Raised when a requested resource does not exist (HTTP 404)."""

    status_code = status.HTTP_404_NOT_FOUND
    code = "not_found"

    def __init__(self, resource: str = "Resource"):
        # Message becomes e.g. "Scenario not found".
        super().__init__(f"{resource} not found")


class ValidationException(AppException):
    """Raised when input fails a business-level validation (HTTP 400)."""

    status_code = status.HTTP_400_BAD_REQUEST
    code = "validation_error"


class ConflictException(AppException):
    """Raised when a request conflicts with the current state (HTTP 409)."""

    status_code = status.HTTP_409_CONFLICT
    code = "conflict"


class ScenarioNotRunningException(AppException):
    """Raised when log ingestion targets a scenario that is not running (HTTP 400)."""

    status_code = status.HTTP_400_BAD_REQUEST
    code = "scenario_not_running"

    def __init__(self):
        super().__init__("Scenario is not in 'running' state. Cannot ingest logs.")
|
||||
|
||||
|
||||
def setup_exception_handlers(app):
    """Register the AppException -> JSON error response handler on a FastAPI app."""
    # Imported locally to keep this module importable without FastAPI loaded.
    from fastapi import Request
    from fastapi.responses import JSONResponse

    @app.exception_handler(AppException)
    async def app_exception_handler(request: Request, exc: AppException):
        body = {
            "error": exc.code,
            "message": exc.message,
            "status_code": exc.status_code,
        }
        return JSONResponse(status_code=exc.status_code, content=body)
|
||||
132
src/main.py
132
src/main.py
@@ -1,121 +1,19 @@
|
||||
from fastapi import FastAPI
|
||||
from pydantic import BaseModel
|
||||
from src.profiler import count_tokens, calculate_sqs_blocks
|
||||
import asyncio
|
||||
from typing import Set
|
||||
from src.core.exceptions import setup_exception_handlers
|
||||
from src.api.v1 import api_router
|
||||
|
||||
app = FastAPI(title="LogWhispererAI Mockup AWS")
|
||||
app = FastAPI(
|
||||
title="mockupAWS", description="AWS Cost Simulation Platform", version="0.2.0"
|
||||
)
|
||||
|
||||
# Setup exception handlers
|
||||
setup_exception_handlers(app)
|
||||
|
||||
# Include API routes
|
||||
app.include_router(api_router, prefix="/api/v1")
|
||||
|
||||
|
||||
# In-memory metrics state (sufficient for the local simulation only).
class Metrics(BaseModel):
    total_requests: int = 0  # incremented once per /ingest request
    sqs_billing_blocks: int = 0  # as computed by calculate_sqs_blocks()
    safety_violations_detected: int = 0  # naive email check on message text
    llm_estimated_input_tokens: int = 0  # count_tokens() over unique messages
    lambda_simulated_invocations: int = 0  # one per processed batch


# Single module-level metrics instance, mutated by the handlers below.
state_metrics = Metrics()
|
||||
|
||||
# Async queue used to simulate Lambda batching.
message_queue: list[dict] = []
queue_lock = asyncio.Lock()
processed_messages: Set[str] = set()  # for deduplication
|
||||
|
||||
|
||||
# Expected structure of the log entry sent by Logstash.
class LogPayload(BaseModel):
    message: str
    source: str = "unknown"
|
||||
|
||||
|
||||
async def process_batch():
    """
    Async worker that simulates a Lambda invocation.

    Drains the queue under the lock, deduplicates messages by their text,
    counts one simulated invocation per batch, and accumulates the estimated
    LLM input tokens for unique messages only.
    """
    global message_queue, processed_messages

    async with queue_lock:
        if not message_queue:
            return

        # Take every message currently in the queue.
        batch = message_queue.copy()
        message_queue = []

        # Deduplication: keep only the first occurrence of each message text.
        unique_messages = {}
        for msg in batch:
            msg_key = msg["message"]
            if msg_key not in unique_messages:
                unique_messages[msg_key] = msg

        # Simulate the Lambda invocation for this batch.
        state_metrics.lambda_simulated_invocations += 1

        # Count tokens only for unique messages (deduplication).
        for msg in unique_messages.values():
            tokens = count_tokens(msg["message"])
            state_metrics.llm_estimated_input_tokens += tokens
|
||||
|
||||
|
||||
async def schedule_batch_processing():
    """Schedule batch processing after a short delay."""
    await asyncio.sleep(0.1)  # small delay to let messages accumulate
    await process_batch()
|
||||
|
||||
|
||||
@app.post("/ingest")
async def ingest_log(payload: LogPayload):
    """
    Receives logs from Logstash, runs basic safety validations,
    and computes the metrics used to estimate cloud costs.
    """
    global message_queue

    # 1. Count incoming requests.
    state_metrics.total_requests += 1

    # 2. Compute billable SQS blocks from the serialized payload.
    payload_json_str = payload.model_dump_json()
    state_metrics.sqs_billing_blocks += calculate_sqs_blocks(payload_json_str)

    # 3. Safety first: naive check for unmasked data (e.g. email addresses).
    if "@" in payload.message and ".com" in payload.message:
        state_metrics.safety_violations_detected += 1

    # 4. Queue the message for batch processing ("little and often").
    async with queue_lock:
        message_queue.append({"message": payload.message, "source": payload.source})

    # Schedule asynchronous processing of the queue.
    asyncio.create_task(schedule_batch_processing())

    return {"status": "accepted", "message": "Log accodato per processamento"}
|
||||
|
||||
|
||||
@app.get("/metrics")
async def get_metrics():
    """Return the current profiling metrics."""
    return state_metrics.model_dump()
|
||||
|
||||
|
||||
@app.post("/reset")
async def reset_metrics():
    """Reset all metrics and clear the queue (useful for tests)."""
    global state_metrics, message_queue, processed_messages

    state_metrics = Metrics()
    message_queue = []
    processed_messages = set()

    return {"status": "reset"}
|
||||
|
||||
|
||||
@app.post("/flush")
async def flush_queue():
    """Force immediate processing of the queue (useful for tests)."""
    await process_batch()
    return {"status": "flushed"}
|
||||
@app.get("/health")
async def health_check():
    """Liveness probe: always reports healthy while the process is up."""
    return {"status": "healthy"}
|
||||
|
||||
15
src/services/__init__.py
Normal file
15
src/services/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""Services package."""

# Re-export the service classes and their module-level singletons so callers
# can simply do `from src.services import ingest_service`, etc.
from src.services.pii_detector import PIIDetector, pii_detector, PIIDetectionResult
from src.services.cost_calculator import CostCalculator, cost_calculator
from src.services.ingest_service import IngestService, ingest_service

__all__ = [
    "PIIDetector",
    "pii_detector",
    "PIIDetectionResult",
    "CostCalculator",
    "cost_calculator",
    "IngestService",
    "ingest_service",
]
|
||||
86
src/services/cost_calculator.py
Normal file
86
src/services/cost_calculator.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Cost calculation service."""
|
||||
|
||||
from decimal import Decimal
|
||||
from typing import Optional
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select, and_
|
||||
from datetime import date
|
||||
|
||||
from src.repositories.base import BaseRepository
|
||||
from src.models.aws_pricing import AwsPricing
|
||||
|
||||
|
||||
class CostCalculator:
    """Service for pricing the simulated AWS usage (SQS, Lambda, Bedrock).

    Prices are looked up in the aws_pricing table; when no active row exists
    for a service/region/tier, the named fallback rates below are used.
    (The unused ``pricing_repo`` attribute from the original implementation
    was dead code and has been removed — get_pricing queries directly.)
    """

    # Fallback rates used when the pricing table has no active entry.
    DEFAULT_SQS_PRICE_PER_MILLION = Decimal("0.40")
    DEFAULT_LAMBDA_REQUEST_PRICE_PER_MILLION = Decimal("0.20")
    DEFAULT_LAMBDA_GB_SECOND_PRICE = Decimal("0.0000166667")
    DEFAULT_BEDROCK_INPUT_PRICE_PER_1K = Decimal("0.003")
    DEFAULT_BEDROCK_OUTPUT_PRICE_PER_1K = Decimal("0.015")

    async def get_pricing(
        self, db: AsyncSession, service: str, region: str, tier: str
    ) -> Optional[Decimal]:
        """Return the active price_per_unit for service/region/tier, or None."""
        result = await db.execute(
            select(AwsPricing).where(
                and_(
                    AwsPricing.service == service,
                    AwsPricing.region == region,
                    AwsPricing.tier == tier,
                    AwsPricing.is_active.is_(True),
                )
            )
        )
        pricing = result.scalar_one_or_none()
        return pricing.price_per_unit if pricing else None

    async def calculate_sqs_cost(
        self, db: AsyncSession, blocks: int, region: str
    ) -> Decimal:
        """Calculate SQS cost: blocks * price_per_million / 1,000,000."""
        price = await self.get_pricing(db, "sqs", region, "standard")
        if price is None:
            price = self.DEFAULT_SQS_PRICE_PER_MILLION
        return Decimal(blocks) * price / Decimal("1000000")

    async def calculate_lambda_cost(
        self, db: AsyncSession, invocations: int, gb_seconds: float, region: str
    ) -> Decimal:
        """Calculate Lambda cost as request charges plus compute charges."""
        request_price = await self.get_pricing(db, "lambda", region, "x86_request")
        compute_price = await self.get_pricing(db, "lambda", region, "x86_compute")

        if request_price is None:
            request_price = self.DEFAULT_LAMBDA_REQUEST_PRICE_PER_MILLION
        if compute_price is None:
            compute_price = self.DEFAULT_LAMBDA_GB_SECOND_PRICE

        request_cost = Decimal(invocations) * request_price / Decimal("1000000")
        # Convert via str so float binary noise does not leak into the Decimal.
        compute_cost = Decimal(str(gb_seconds)) * compute_price
        return request_cost + compute_cost

    async def calculate_bedrock_cost(
        self,
        db: AsyncSession,
        input_tokens: int,
        output_tokens: int,
        region: str,
        model: str = "claude_3_sonnet",
    ) -> Decimal:
        """Calculate Bedrock LLM cost (input and output priced per 1K tokens)."""
        input_price = await self.get_pricing(db, "bedrock", region, f"{model}_input")
        output_price = await self.get_pricing(db, "bedrock", region, f"{model}_output")

        if input_price is None:
            input_price = self.DEFAULT_BEDROCK_INPUT_PRICE_PER_1K
        if output_price is None:
            output_price = self.DEFAULT_BEDROCK_OUTPUT_PRICE_PER_1K

        input_cost = Decimal(input_tokens) * input_price / Decimal("1000")
        output_cost = Decimal(output_tokens) * output_price / Decimal("1000")
        return input_cost + output_cost


# Singleton instance shared by the API layer.
cost_calculator = CostCalculator()
|
||||
65
src/services/ingest_service.py
Normal file
65
src/services/ingest_service.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""Log ingestion service."""
|
||||
|
||||
import hashlib
|
||||
from datetime import datetime
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from src.models.scenario_log import ScenarioLog
|
||||
from src.models.scenario import Scenario
|
||||
from src.repositories.scenario import scenario_repository
|
||||
from src.services.pii_detector import pii_detector
|
||||
from src.profiler import count_tokens, calculate_sqs_blocks
|
||||
|
||||
|
||||
class IngestService:
    """Service for processing and persisting incoming log entries."""

    # Maximum characters kept in message_preview (privacy / storage bound);
    # previously a hard-coded 500 with a redundant length check.
    PREVIEW_MAX_CHARS = 500

    async def ingest_log(
        self,
        db: AsyncSession,
        scenario: Scenario,
        message: str,
        source: str = "unknown",
    ) -> ScenarioLog:
        """Process a raw log message and save it as a ScenarioLog.

        Computes the dedup hash, a truncated preview, a PII flag, and the
        token/size/SQS-block metrics, bumps the scenario's request counter,
        and commits the new row.
        """
        # SHA-256 of the full message, used for deduplication.
        message_hash = hashlib.sha256(message.encode()).hexdigest()

        # Slicing already handles short strings; no length check needed.
        message_preview = message[: self.PREVIEW_MAX_CHARS]

        # Flag messages that look like they contain PII.
        has_pii = pii_detector.detect_simple(message)

        # Usage metrics driving the cost simulation.
        token_count = count_tokens(message)
        payload_size = len(message.encode("utf-8"))
        sqs_blocks = calculate_sqs_blocks(message)

        log_entry = ScenarioLog(
            scenario_id=scenario.id,
            received_at=datetime.utcnow(),
            message_hash=message_hash,
            message_preview=message_preview,
            source=source,
            size_bytes=payload_size,
            has_pii=has_pii,
            token_count=token_count,
            sqs_blocks=sqs_blocks,
        )

        db.add(log_entry)

        # Keep the scenario's aggregate request counter in sync.
        await scenario_repository.increment_total_requests(db, scenario.id)

        await db.commit()
        await db.refresh(log_entry)

        return log_entry


# Singleton instance shared by the API layer.
ingest_service = IngestService()
|
||||
53
src/services/pii_detector.py
Normal file
53
src/services/pii_detector.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""PII detection service."""
|
||||
|
||||
import re
|
||||
from typing import Dict, List
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
class PIIDetectionResult:
    """Result of PII detection."""

    has_pii: bool  # True when at least one pattern matched
    pii_types: List[str]  # names of the patterns that matched (e.g. "email")
    total_matches: int  # total matches across all patterns
    details: Dict[str, List[str]]  # pattern name -> matched strings


class PIIDetector:
    """Service for detecting common PII patterns in log messages."""

    # Regex patterns for common PII categories.
    # NOTE(review): the '|' inside the email TLD class matches a literal
    # pipe, not alternation — harmless but worth confirming intent.
    PATTERNS = {
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        "credit_card": r"\b(?:\d[ -]*?){13,16}\b",
        "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
        "ip_address": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
    }

    def detect(self, message: str) -> PIIDetectionResult:
        """Run every pattern against the message and report all matches."""
        results = {}

        for pii_type, pattern in self.PATTERNS.items():
            matches = re.findall(pattern, message)
            if matches:
                results[pii_type] = matches

        return PIIDetectionResult(
            has_pii=len(results) > 0,
            pii_types=list(results.keys()),
            total_matches=sum(len(matches) for matches in results.values()),
            details=results,
        )

    def detect_simple(self, message: str) -> bool:
        """Return True if any PII pattern matches the message.

        A bare '"@" and ".com"' substring check misses every non-.com email
        address and false-positives on unrelated text, so this delegates to
        the full regex-based detection instead.
        """
        return self.detect(message).has_pii


# Singleton instance
pii_detector = PIIDetector()
|
||||
Reference in New Issue
Block a user