Files
2026-04-26 14:46:49 +02:00

464 lines
15 KiB
Python

"""Supabase pinger with a FastAPI API, circular-buffer history storage and a dashboard."""
from __future__ import annotations
import logging
import os
import sqlite3
import sys
import threading
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import psycopg2
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import HTMLResponse
# Send log records to stdout with a timestamped, level-aligned format.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(levelname)-8s %(message)s",
)
log = logging.getLogger("supabase-pinger")

# Environment variables that must be present and non-empty at startup.
REQUIRED_VARS = [
    "SUPABASE_DB_HOST",
    "SUPABASE_DB_PORT",
    "SUPABASE_DB_NAME",
    "SUPABASE_DB_USER",
    "SUPABASE_DB_PASSWORD",
]
@dataclass
class Settings:
    """Runtime configuration for the database ping, the web server and the history store."""

    # Connection parameters of the database that receives the keep-alive query.
    db_host: str
    db_port: int
    db_name: str
    db_user: str
    db_password: str
    # SQL statement executed on every ping.
    ping_query: str
    # Minutes between two pings. Annotated as float because load_config()
    # parses PING_INTERVAL_MINUTES with float(), so fractional values occur.
    ping_interval_minutes: float
    # Bind address and port of the web server.
    web_host: str
    web_port: int
    # Path of the SQLite file holding the sample history.
    rrd_db_path: str
    # Length of the retained history window, in hours.
    rrd_retention_hours: int

    @property
    def max_samples(self) -> int:
        """Return how many samples fit in the retention window.

        The result is truncated towards zero and is never lower than 1, so
        the store always keeps at least the most recent sample.
        """
        samples = int((self.rrd_retention_hours * 60) / self.ping_interval_minutes)
        return max(1, samples)
def load_config() -> Settings:
    """Load and validate the configuration from the environment / ``.env`` file.

    Returns:
        A populated ``Settings`` instance. Optional variables fall back to
        the defaults visible below.

    Raises:
        RuntimeError: If a required variable is missing or empty, or if one
            of the numeric variables is not a positive, finite number.
    """
    import math  # local import: only needed for the finiteness check below

    load_dotenv()
    missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
    if missing:
        raise RuntimeError(
            f"Variabili obbligatorie mancanti: {', '.join(missing)}"
        )
    try:
        db_port = int(os.environ["SUPABASE_DB_PORT"])
        ping_interval = float(os.environ.get("PING_INTERVAL_MINUTES", "4320"))
        web_port = int(os.environ.get("WEB_PORT", "8080"))
        retention_hours = int(os.environ.get("RRD_RETENTION_HOURS", "48"))
        # float() accepts "nan" and "inf"; neither is caught by the "<= 0"
        # comparison, so finiteness is checked explicitly. The port of the
        # database is validated as well, matching the error message below.
        if (
            db_port <= 0
            or web_port <= 0
            or retention_hours <= 0
            or not math.isfinite(ping_interval)
            or ping_interval <= 0
        ):
            raise ValueError
    except ValueError as exc:
        raise RuntimeError(
            "SUPABASE_DB_PORT, PING_INTERVAL_MINUTES, WEB_PORT e "
            "RRD_RETENTION_HOURS devono essere numeri positivi."
        ) from exc
    return Settings(
        db_host=os.environ["SUPABASE_DB_HOST"],
        db_port=db_port,
        db_name=os.environ["SUPABASE_DB_NAME"],
        db_user=os.environ["SUPABASE_DB_USER"],
        db_password=os.environ["SUPABASE_DB_PASSWORD"],
        ping_query=os.environ.get("PING_QUERY", "SELECT 1;"),
        ping_interval_minutes=ping_interval,
        web_host=os.environ.get("WEB_HOST", "0.0.0.0"),
        web_port=web_port,
        rrd_db_path=os.environ.get("RRD_DB_PATH", "data/connection_rrd.sqlite3"),
        rrd_retention_hours=retention_hours,
    )
class RRDStore:
    """SQLite-backed circular buffer holding the most recent ping samples.

    After every insert the table is trimmed to the newest ``max_samples``
    rows. Each operation opens its own connection and closes it when done.
    """

    _SCHEMA_SQL = (
        "\n"
        "CREATE TABLE IF NOT EXISTS samples (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "ts INTEGER NOT NULL,\n"
        "success INTEGER NOT NULL,\n"
        "latency_ms REAL,\n"
        "error_message TEXT\n"
        ");\n"
    )
    _INSERT_SQL = (
        "\n"
        "INSERT INTO samples (ts, success, latency_ms, error_message)\n"
        "VALUES (?, ?, ?, ?)\n"
    )
    _TRIM_SQL = (
        "\n"
        "DELETE FROM samples\n"
        "WHERE id NOT IN (\n"
        "SELECT id FROM samples ORDER BY id DESC LIMIT ?\n"
        ")\n"
    )
    _LATEST_SQL = (
        "\n"
        "SELECT ts, success, latency_ms, error_message\n"
        "FROM samples\n"
        "ORDER BY id DESC\n"
        "LIMIT 1\n"
    )
    _HISTORY_SQL = (
        "\n"
        "SELECT ts, success, latency_ms, error_message\n"
        "FROM samples\n"
        "WHERE ts >= ?\n"
        "ORDER BY ts ASC\n"
    )

    def __init__(self, path: str, max_samples: int) -> None:
        """Create the parent directory and the ``samples`` table if missing.

        Args:
            path: Filesystem path of the SQLite database file.
            max_samples: Maximum number of rows kept in the table.
        """
        self.path = path
        self.max_samples = max_samples
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows can be indexed by column name."""
        conn = sqlite3.connect(self.path)
        conn.row_factory = sqlite3.Row
        return conn

    def _query(self, sql: str, params: tuple[Any, ...] = ()) -> list[sqlite3.Row]:
        """Run a read-only statement and return all rows, closing the connection."""
        conn = self._connect()
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Create the ``samples`` table when it does not exist yet."""
        conn = self._connect()
        try:
            # "with conn" commits or rolls back but does NOT close the
            # connection, hence the explicit close() in the finally clause.
            with conn:
                conn.execute(self._SCHEMA_SQL)
        finally:
            conn.close()

    def add_sample(
        self, *, ts: int, success: bool, latency_ms: float | None, error_message: str | None
    ) -> None:
        """Insert one sample, then drop everything but the newest ``max_samples`` rows.

        Args:
            ts: Sample time as Unix seconds.
            success: Whether the ping succeeded; stored as 1 or 0.
            latency_ms: Measured latency in milliseconds, or ``None``.
            error_message: Error text for failed pings, or ``None``.
        """
        conn = self._connect()
        try:
            # Insert and trim share one transaction, committed on exit.
            with conn:
                conn.execute(
                    self._INSERT_SQL,
                    (ts, 1 if success else 0, latency_ms, error_message),
                )
                conn.execute(self._TRIM_SQL, (self.max_samples,))
        finally:
            conn.close()

    def latest(self) -> dict[str, Any] | None:
        """Return the most recently inserted sample, or ``None`` when the table is empty."""
        rows = self._query(self._LATEST_SQL)
        return self._row_to_dict(rows[0]) if rows else None

    def history(self, hours: int) -> list[dict[str, Any]]:
        """Return the samples of the last ``hours`` hours, oldest first."""
        min_ts = int(time.time()) - (hours * 3600)
        return [self._row_to_dict(row) for row in self._query(self._HISTORY_SQL, (min_ts,))]

    @staticmethod
    def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
        """Convert a database row into the dictionary shape returned by the API."""
        return {
            "timestamp": int(row["ts"]),
            "success": bool(row["success"]),
            "latency_ms": row["latency_ms"],
            "error_message": row["error_message"],
        }
def run_ping(settings: Settings) -> tuple[bool, float | None, str | None]:
    """Run the keep-alive query once and report the outcome.

    A new connection is opened for the ping and closed before returning.

    Args:
        settings: Provides the connection parameters and ``ping_query``.

    Returns:
        ``(True, latency_ms, None)`` on success, where the latency covers
        both connecting and executing the query, or
        ``(False, None, message)`` when psycopg2 raises an error.
    """
    conn = None
    started = time.perf_counter()
    try:
        conn = psycopg2.connect(
            host=settings.db_host,
            port=settings.db_port,
            dbname=settings.db_name,
            user=settings.db_user,
            password=settings.db_password,
            connect_timeout=10,
        )
        # NOTE(review): the connection is closed without commit(); a
        # data-modifying PING_QUERY would presumably be discarded. Confirm
        # that only read-only queries are expected here.
        with conn.cursor() as cur:
            cur.execute(settings.ping_query)
        latency_ms = (time.perf_counter() - started) * 1000
        return True, latency_ms, None
    except psycopg2.Error as exc:
        return False, None, str(exc)
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # Closing is best-effort and must not mask the ping result,
                # but leave a trace instead of swallowing the error silently.
                log.debug("Ignoring error while closing the ping connection", exc_info=True)
def collector_loop(settings: Settings, store: RRDStore, stop_event: threading.Event) -> None:
    """Ping periodically and store every result until ``stop_event`` is set.

    Args:
        settings: Supplies the ping interval and the values logged at startup.
        store: Receives one sample per ping.
        stop_event: Set by the owner to request termination; it is checked
            before each ping and interrupts the wait between pings.
    """
    interval_seconds = settings.ping_interval_minutes * 60
    log.info(
        "Collector keep-alive avviato (interval=%s min, retention=%s ore, max_samples=%s)",
        settings.ping_interval_minutes,
        settings.rrd_retention_hours,
        settings.max_samples,
    )
    while not stop_event.is_set():
        try:
            success, latency_ms, error_message = run_ping(settings)
            store.add_sample(
                ts=int(time.time()),
                success=success,
                latency_ms=latency_ms,
                error_message=error_message,
            )
        except Exception:
            # Thread boundary: an unexpected failure (e.g. the sample store
            # being unwritable) must not end the keep-alive for good.
            log.exception("Keep-alive iteration failed unexpectedly")
        else:
            if success:
                log.info("Keep-alive OK (latency_ms=%.2f)", latency_ms or 0.0)
            else:
                log.warning("Keep-alive failed: %s", error_message)
        # Event.wait() returns as soon as a stop is requested, so shutdown
        # is not delayed by a sleep in progress. The timeout is capped at
        # the largest value the platform accepts.
        stop_event.wait(min(interval_seconds, threading.TIMEOUT_MAX))
    log.info("Collector keep-alive arrestato")
def _build_dashboard_html() -> str:
    """Return the static HTML document served as the dashboard page.

    The markup contains no placeholders and is returned unchanged on every
    call. Its inline script loads Chart.js from the jsDelivr CDN, fetches
    ``/api/history?hours=48`` once on load, and then polls ``/api/status``
    every 10 seconds, appending samples whose timestamp is not yet in the
    local buffer. The chart shows at most the last 100 samples.

    NOTE(review): the page always requests a 48-hour window, while
    ``/api/history`` rejects windows larger than the configured retention;
    confirm the behavior for retention values below 48 hours.
    """
    return """
<!doctype html>
<html lang=\"it\">
<head>
<meta charset=\"utf-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
<title>Supabase Pinger Dashboard</title>
<style>
:root {
--bg: #101418;
--panel: #17202a;
--line: #3ecf8e;
--danger: #ff6b6b;
--text: #edf2f7;
--muted: #9fb3c8;
}
body { margin: 0; font-family: 'Segoe UI', sans-serif; background: radial-gradient(circle at 20% 20%, #1f2937, var(--bg)); color: var(--text); }
.wrap { max-width: 1100px; margin: 0 auto; padding: 24px; }
.panel { background: color-mix(in srgb, var(--panel), #000 15%); border: 1px solid #283544; border-radius: 14px; padding: 18px; }
.top { display: flex; justify-content: space-between; gap: 16px; flex-wrap: wrap; }
.badge { padding: 6px 10px; border-radius: 999px; font-weight: 700; font-size: 13px; }
.ok { background: rgba(62, 207, 142, .15); color: #7ff5bd; }
.ko { background: rgba(255, 107, 107, .15); color: #ff9f9f; }
.meta { color: var(--muted); font-size: 14px; }
canvas { margin-top: 18px; }
</style>
</head>
<body>
<div class=\"wrap\">
<div class=\"panel\">
<div class=\"top\">
<h1>Supabase Pinger - SmokePing Style</h1>
<div>
<span id=\"statusBadge\" class=\"badge\">N/A</span>
</div>
</div>
<div class=\"meta\" id=\"meta\">Caricamento...</div>
<canvas id=\"latencyChart\"></canvas>
</div>
</div>
<script src=\"https://cdn.jsdelivr.net/npm/chart.js\"></script>
<script>
const badge = document.getElementById('statusBadge');
const meta = document.getElementById('meta');
const ctx = document.getElementById('latencyChart').getContext('2d');
// Buffer locale per campioni - mantiene solo i dati nel grafico
let samples = [];
const MAX_VISIBLE_SAMPLES = 100; // Limite visualizzazione per performance
const chart = new Chart(ctx, {
type: 'line',
data: { labels: [], datasets: [
{ label: 'Latenza ms', data: [], borderColor: '#3ecf8e', tension: 0.2 },
{ label: 'Errori', data: [], borderColor: '#ff6b6b', stepped: true, yAxisID: 'y1' }
] },
options: {
responsive: true,
scales: {
y: { title: { display: true, text: 'ms' } },
y1: { position: 'right', min: 0, max: 1, ticks: { stepSize: 1 } }
}
}
});
async function loadHistory() {
// Carica history iniziale una sola volta
const historyRes = await fetch('/api/history?hours=48');
const history = await historyRes.json();
samples = history.samples || [];
updateChart();
}
function updateChart() {
// Limita i campioni visibili per performance
const visibleSamples = samples.slice(-MAX_VISIBLE_SAMPLES);
chart.data.labels = visibleSamples.map(s => {
const d = new Date(s.timestamp * 1000);
return d.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' });
});
chart.data.datasets[0].data = visibleSamples.map(s => s.latency_ms);
chart.data.datasets[1].data = visibleSamples.map(s => s.success ? 0 : 1);
chart.update('none'); // update senza animazione per performance
}
async function pollStatus() {
try {
const statusRes = await fetch('/api/status');
const status = await statusRes.json();
// Aggiorna badge
if (status.success === true) {
badge.textContent = 'UP';
badge.className = 'badge ok';
} else if (status.success === false) {
badge.textContent = 'DOWN';
badge.className = 'badge ko';
} else {
badge.textContent = 'N/A';
badge.className = 'badge';
}
// Aggiorna meta con timestamp corrente
meta.textContent = 'Ultimo aggiornamento: ' + new Date().toLocaleTimeString() + ' | Campioni: ' + samples.length;
// Aggiunge nuovo sample se non gia presente
if (status.timestamp && !samples.find(s => s.timestamp === status.timestamp)) {
samples.push({
timestamp: status.timestamp,
success: status.success,
latency_ms: status.latency_ms,
error_message: status.error_message
});
// Rimuove campioni piu vecchi di 48h
const cutoff = Math.floor(Date.now() / 1000) - (48 * 3600);
samples = samples.filter(s => s.timestamp >= cutoff);
updateChart();
}
} catch (err) {
meta.textContent = 'Errore: ' + err.message;
}
}
// Inizializzazione
loadHistory().catch((err) => {
meta.textContent = 'Errore caricamento history: ' + err;
});
// Poll ogni 10 secondi per aggiornamenti real-time
setInterval(pollStatus, 10000);
</script>
</body>
</html>
"""
def create_app(
    config_override: Settings | None = None,
    start_collector: bool = True,
) -> FastAPI:
    """Build the FastAPI application with its dashboard and monitoring routes.

    Args:
        config_override: Settings to use instead of calling ``load_config()``
            when the application starts.
        start_collector: When true, a background collector thread is started
            during application startup.

    Returns:
        The configured ``FastAPI`` instance. Settings, store and collector
        handles are attached to ``app.state`` once the lifespan has started.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Set up the store and the collector on startup; stop the collector on exit."""
        settings = config_override or load_config()
        store = RRDStore(settings.rrd_db_path, settings.max_samples)
        app.state.settings = settings
        app.state.store = store
        app.state.collector_stop_event = threading.Event()
        app.state.collector_thread = None
        if start_collector:
            thread = threading.Thread(
                target=collector_loop,
                args=(settings, store, app.state.collector_stop_event),
                daemon=True,
                name="collector-thread",
            )
            thread.start()
            app.state.collector_thread = thread
        try:
            yield
        finally:
            # Runs even when the lifespan is left through an exception, so
            # the collector is always asked to stop.
            app.state.collector_stop_event.set()
            if app.state.collector_thread is not None:
                # Bounded wait: the thread is a daemon, so a slow ping
                # cannot block process shutdown beyond this timeout.
                app.state.collector_thread.join(timeout=5)

    app = FastAPI(
        title="Supabase Pinger API",
        version="1.1.0",
        description=(
            "API read-only per monitoraggio storico connessione Supabase "
            "(retention RRD 48h) e dashboard SmokePing-style."
        ),
        lifespan=lifespan,
    )

    @app.get("/", response_class=HTMLResponse, tags=["dashboard"])
    def dashboard() -> str:
        """Serve the dashboard page."""
        return _build_dashboard_html()

    @app.get("/api/status", tags=["monitoring"])
    def api_status(request: Request) -> dict[str, Any]:
        """Return the latest sample, or a placeholder when none exists yet."""
        latest = request.app.state.store.latest()
        if latest is None:
            return {
                "success": None,
                "timestamp": None,
                "latency_ms": None,
                "error_message": "Nessun campione disponibile",
            }
        return latest

    @app.get("/api/history", tags=["monitoring"])
    def api_history(
        request: Request,
        hours: int = Query(48, ge=1, le=48),
    ) -> dict[str, Any]:
        """Return the samples of the last ``hours`` hours, oldest first.

        Responds with HTTP 400 when ``hours`` exceeds the configured retention.
        """
        # NOTE(review): the default of 48 is itself rejected when the
        # configured retention is shorter than 48 hours; confirm intended.
        if hours > request.app.state.settings.rrd_retention_hours:
            raise HTTPException(status_code=400, detail="Finestra richiesta oltre retention")
        samples = request.app.state.store.history(hours)
        return {
            "hours": hours,
            "count": len(samples),
            "samples": samples,
        }

    return app
# Module-level ASGI application, built at import time. No configuration is
# read here: the lifespan inside create_app() loads it when the app starts.
app = create_app()
if __name__ == "__main__":
    # The configuration is loaded here only to obtain the bind host and port.
    cfg = load_config()
    # "app:app" is an import string (module "app", attribute "app").
    # NOTE(review): presumably this file is saved as app.py; confirm, since
    # the string does not follow a rename of the module.
    uvicorn.run("app:app", host=cfg.web_host, port=cfg.web_port, log_level="info")