feat: initial project setup with scenarios, database and web ui

Add complete mockupAWS platform for AWS cost estimation:
- FastAPI backend with scenario management
- PostgreSQL database schema for scenarios, metrics, logs
- AWS pricing table with real pricing data
- React frontend dashboard (planned)
- PII detection and token counting
- Report generation (PDF/CSV)
- Complete test suite with pytest
- Docker Compose setup
- Documentation: README, PRD, Architecture
- OpenCode configuration (.opencode/)
This commit is contained in:
Luca Sacchi Ricciardi
2026-04-07 12:52:18 +02:00
parent b539134280
commit 59e5cf48f0
23 changed files with 2982 additions and 29 deletions

121
src/main.py Normal file
View File

@@ -0,0 +1,121 @@
from fastapi import FastAPI
from pydantic import BaseModel
from src.profiler import count_tokens, calculate_sqs_blocks
import asyncio
from typing import Set
app = FastAPI(title="LogWhispererAI Mockup AWS")
# Stato in memoria per le metriche (valido per la simulazione locale)
class Metrics(BaseModel):
total_requests: int = 0
sqs_billing_blocks: int = 0
safety_violations_detected: int = 0
llm_estimated_input_tokens: int = 0
lambda_simulated_invocations: int = 0
state_metrics = Metrics()
# Coda asincrona per simulare il batching Lambda
message_queue: list[dict] = []
queue_lock = asyncio.Lock()
processed_messages: Set[str] = set() # Per deduplicazione
# Struttura attesa del log inviato da Logstash
class LogPayload(BaseModel):
message: str
source: str = "unknown"
async def process_batch():
"""
Worker asincrono che simula l'invocazione Lambda.
Processa i messaggi in batch con deduplicazione.
"""
global message_queue, processed_messages
async with queue_lock:
if not message_queue:
return
# Prendiamo tutti i messaggi dalla coda
batch = message_queue.copy()
message_queue = []
# Deduplicazione: processiamo solo messaggi unici
unique_messages = {}
for msg in batch:
msg_key = msg["message"]
if msg_key not in unique_messages:
unique_messages[msg_key] = msg
# Simuliamo l'invocazione Lambda
state_metrics.lambda_simulated_invocations += 1
# Calcoliamo i token solo per i messaggi unici (deduplicazione)
for msg in unique_messages.values():
tokens = count_tokens(msg["message"])
state_metrics.llm_estimated_input_tokens += tokens
async def schedule_batch_processing():
"""Schedula il processamento batch dopo un breve delay."""
await asyncio.sleep(0.1) # Piccolo delay per accumulare messaggi
await process_batch()
@app.post("/ingest")
async def ingest_log(payload: LogPayload):
"""
Riceve i log da Logstash, esegue le validazioni di sicurezza
e calcola le metriche per la stima dei costi cloud.
"""
global message_queue
# 1. Calcolo richieste in ingresso
state_metrics.total_requests += 1
# 2. Calcolo blocchi fatturabili SQS
payload_json_str = payload.model_dump_json()
state_metrics.sqs_billing_blocks += calculate_sqs_blocks(payload_json_str)
# 3. Safety First: Controllo base per dati non offuscati (es. email)
if "@" in payload.message and ".com" in payload.message:
state_metrics.safety_violations_detected += 1
# 4. Aggiungiamo il messaggio alla coda per processamento batch (Little Often)
async with queue_lock:
message_queue.append({"message": payload.message, "source": payload.source})
# Scheduliamo il processamento asincrono
asyncio.create_task(schedule_batch_processing())
return {"status": "accepted", "message": "Log accodato per processamento"}
@app.get("/metrics")
async def get_metrics():
"""Restituisce le metriche correnti di profilazione."""
return state_metrics.model_dump()
@app.post("/reset")
async def reset_metrics():
"""Resetta tutte le metriche (utile per i test)."""
global state_metrics, message_queue, processed_messages
state_metrics = Metrics()
message_queue = []
processed_messages = set()
return {"status": "reset"}
@app.post("/flush")
async def flush_queue():
"""Forza il processamento immediato della coda (utile per i test)."""
await process_batch()
return {"status": "flushed"}

26
src/profiler.py Normal file
View File

@@ -0,0 +1,26 @@
import sys
import tiktoken
# Inizializziamo l'encoder globalmente.
# cl100k_base è lo standard di fatto per le misurazioni token moderne.
_encoder = tiktoken.get_encoding("cl100k_base")
def count_tokens(text: str) -> int:
"""
Calcola il numero esatto di token in ingresso per una data stringa.
Fondamentale per il calcolo accurato dei costi LLM.
"""
if not text:
return 0
return len(_encoder.encode(text))
def calculate_sqs_blocks(payload_json: str) -> int:
"""
Calcola i blocchi fatturabili per Amazon SQS.
AWS addebita 1 richiesta per ogni payload (o frammento) fino a 64 KB (65536 bytes).
"""
# sys.getsizeof restituisce la dimensione in byte della stringa in memoria
payload_size_bytes = sys.getsizeof(payload_json)
# Calcolo dei blocchi (divisione intera + 1)
return (payload_size_bytes // 65536) + 1