"""Supabase pinger con API FastAPI, storage storico a buffer circolare e dashboard.""" from __future__ import annotations import logging import os import sqlite3 import sys import threading import time from contextlib import asynccontextmanager from dataclasses import dataclass from pathlib import Path from typing import Any import psycopg2 import uvicorn from dotenv import load_dotenv from fastapi import FastAPI, HTTPException, Query, Request from fastapi.responses import HTMLResponse logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", stream=sys.stdout, ) log = logging.getLogger("supabase-pinger") REQUIRED_VARS = [ "SUPABASE_DB_HOST", "SUPABASE_DB_PORT", "SUPABASE_DB_NAME", "SUPABASE_DB_USER", "SUPABASE_DB_PASSWORD", ] @dataclass class Settings: db_host: str db_port: int db_name: str db_user: str db_password: str ping_query: str ping_interval_minutes: int web_host: str web_port: int rrd_db_path: str rrd_retention_hours: int @property def max_samples(self) -> int: samples = int((self.rrd_retention_hours * 60) / self.ping_interval_minutes) return max(1, samples) def load_config() -> Settings: """Carica e valida la configurazione da ambiente/.env.""" load_dotenv() missing = [name for name in REQUIRED_VARS if not os.environ.get(name)] if missing: raise RuntimeError( f"Variabili obbligatorie mancanti: {', '.join(missing)}" ) try: db_port = int(os.environ["SUPABASE_DB_PORT"]) ping_interval = float(os.environ.get("PING_INTERVAL_MINUTES", "4320")) web_port = int(os.environ.get("WEB_PORT", "8080")) retention_hours = int(os.environ.get("RRD_RETENTION_HOURS", "48")) if ping_interval <= 0 or web_port <= 0 or retention_hours <= 0: raise ValueError except ValueError as exc: raise RuntimeError( "SUPABASE_DB_PORT, PING_INTERVAL_MINUTES, WEB_PORT e " "RRD_RETENTION_HOURS devono essere numeri positivi." ) from exc return Settings( db_host=os.environ["SUPABASE_DB_HOST"], db_port=db_port, db_name=os.environ["SUPABASE_DB_NAME"], db_user=os.environ["SUPABASE_DB_USER"], db_password=os.environ["SUPABASE_DB_PASSWORD"], ping_query=os.environ.get("PING_QUERY", "SELECT 1;"), ping_interval_minutes=ping_interval, web_host=os.environ.get("WEB_HOST", "0.0.0.0"), web_port=web_port, rrd_db_path=os.environ.get("RRD_DB_PATH", "data/connection_rrd.sqlite3"), rrd_retention_hours=retention_hours, ) class RRDStore: """Storage storico a buffer circolare, dimensionato a finestra 48h.""" def __init__(self, path: str, max_samples: int) -> None: self.path = path self.max_samples = max_samples Path(path).parent.mkdir(parents=True, exist_ok=True) self._init_db() def _connect(self) -> sqlite3.Connection: conn = sqlite3.connect(self.path) conn.row_factory = sqlite3.Row return conn def _init_db(self) -> None: with self._connect() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS samples ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts INTEGER NOT NULL, success INTEGER NOT NULL, latency_ms REAL, error_message TEXT ); """ ) conn.commit() def add_sample( self, *, ts: int, success: bool, latency_ms: float | None, error_message: str | None ) -> None: with self._connect() as conn: conn.execute( """ INSERT INTO samples (ts, success, latency_ms, error_message) VALUES (?, ?, ?, ?) """, (ts, 1 if success else 0, latency_ms, error_message), ) conn.execute( """ DELETE FROM samples WHERE id NOT IN ( SELECT id FROM samples ORDER BY id DESC LIMIT ? ) """, (self.max_samples,), ) conn.commit() def latest(self) -> dict[str, Any] | None: with self._connect() as conn: row = conn.execute( """ SELECT ts, success, latency_ms, error_message FROM samples ORDER BY id DESC LIMIT 1 """ ).fetchone() return self._row_to_dict(row) if row else None def history(self, hours: int) -> list[dict[str, Any]]: min_ts = int(time.time()) - (hours * 3600) with self._connect() as conn: rows = conn.execute( """ SELECT ts, success, latency_ms, error_message FROM samples WHERE ts >= ? ORDER BY ts ASC """, (min_ts,), ).fetchall() return [self._row_to_dict(row) for row in rows] @staticmethod def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]: return { "timestamp": int(row["ts"]), "success": bool(row["success"]), "latency_ms": row["latency_ms"], "error_message": row["error_message"], } def run_ping(settings: Settings) -> tuple[bool, float | None, str | None]: """Esegue il keep-alive e restituisce esito, latenza, eventuale errore.""" conn = None started = time.perf_counter() try: conn = psycopg2.connect( host=settings.db_host, port=settings.db_port, dbname=settings.db_name, user=settings.db_user, password=settings.db_password, connect_timeout=10, ) with conn.cursor() as cur: cur.execute(settings.ping_query) latency_ms = (time.perf_counter() - started) * 1000 return True, latency_ms, None except psycopg2.Error as exc: return False, None, str(exc) finally: if conn is not None: try: conn.close() except Exception: pass def collector_loop(settings: Settings, store: RRDStore, stop_event: threading.Event) -> None: interval_seconds = settings.ping_interval_minutes * 60 log.info( "Collector keep-alive avviato (interval=%s min, retention=%s ore, max_samples=%s)", settings.ping_interval_minutes, settings.rrd_retention_hours, settings.max_samples, ) while not stop_event.is_set(): success, latency_ms, error_message = run_ping(settings) sample_ts = int(time.time()) store.add_sample( ts=sample_ts, success=success, latency_ms=latency_ms, error_message=error_message, ) if success: log.info("Keep-alive OK (latency_ms=%.2f)", latency_ms or 0.0) else: log.warning("Keep-alive failed: %s", error_message) wait_seconds = 0 while wait_seconds < interval_seconds and not stop_event.is_set(): time.sleep(min(5, interval_seconds - wait_seconds)) wait_seconds += 5 log.info("Collector keep-alive arrestato") def _build_dashboard_html() -> str: return """ Supabase Pinger Dashboard

Supabase Pinger - SmokePing Style

N/A
Caricamento...
""" def create_app( config_override: Settings | None = None, start_collector: bool = True, ) -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): settings = config_override or load_config() store = RRDStore(settings.rrd_db_path, settings.max_samples) app.state.settings = settings app.state.store = store app.state.collector_stop_event = threading.Event() app.state.collector_thread = None if start_collector: thread = threading.Thread( target=collector_loop, args=(settings, store, app.state.collector_stop_event), daemon=True, name="collector-thread", ) thread.start() app.state.collector_thread = thread yield app.state.collector_stop_event.set() if app.state.collector_thread is not None: app.state.collector_thread.join(timeout=5) app = FastAPI( title="Supabase Pinger API", version="1.1.0", description=( "API read-only per monitoraggio storico connessione Supabase " "(retention RRD 48h) e dashboard SmokePing-style." ), lifespan=lifespan, ) @app.get("/", response_class=HTMLResponse, tags=["dashboard"]) def dashboard() -> str: return _build_dashboard_html() @app.get("/api/status", tags=["monitoring"]) def api_status(request: Request) -> dict[str, Any]: latest = request.app.state.store.latest() if latest is None: return { "success": None, "timestamp": None, "latency_ms": None, "error_message": "Nessun campione disponibile", } return latest @app.get("/api/history", tags=["monitoring"]) def api_history( request: Request, hours: int = Query(48, ge=1, le=48), ) -> dict[str, Any]: if hours > request.app.state.settings.rrd_retention_hours: raise HTTPException(status_code=400, detail="Finestra richiesta oltre retention") samples = request.app.state.store.history(hours) return { "hours": hours, "count": len(samples), "samples": samples, } return app app = create_app() if __name__ == "__main__": cfg = load_config() uvicorn.run("app:app", host=cfg.web_host, port=cfg.web_port, log_level="info")