diff --git a/.env.example b/.env.example index 401a9d3..c5a7680 100644 --- a/.env.example +++ b/.env.example @@ -15,3 +15,11 @@ PING_QUERY=SELECT 1; # Timezone del container (per i log) TZ=Europe/Rome + +# API/Webapp FastAPI (Uvicorn) +WEB_HOST=0.0.0.0 +WEB_PORT=8080 + +# Storage storico (retention RRD-like) +RRD_DB_PATH=data/connection_rrd.sqlite3 +RRD_RETENTION_HOURS=48 diff --git a/.gitignore b/.gitignore index e16e790..ae0db31 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,10 @@ env/ # Docker .dockerignore +# Storage storico locale +data/ +*.sqlite3 + # OS .DS_Store Thumbs.db diff --git a/Dockerfile b/Dockerfile index dbedc2b..2460d26 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,6 @@ WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -COPY app.py . +COPY . . -CMD ["python", "-u", "app.py"] +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/README.md b/README.md index 60b8057..c099bcc 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,12 @@ Tool Python dockerizzato pensato per mantenere "attivo" un database Supabase in L'idea e' semplice: il container resta in esecuzione fino a quando non viene fermato esplicitamente e, a intervalli regolari, esegue un'operazione verso il database Supabase per generare attivita' sufficiente a non far decadere l'istanza. +In piu', il servizio espone una webapp/API con FastAPI servita da Uvicorn: + +- dashboard in stile SmokePing; +- API read-only per stato e storico; +- documentazione Swagger in `/docs` e OpenAPI in `/openapi.json`. + ## Obiettivo I progetti Supabase in free tier possono diventare temporaneamente inaccessibili se rimangono inattivi troppo a lungo. Questo progetto serve a: @@ -20,8 +26,9 @@ Il servizio viene eseguito all'interno di un container Docker e resta attivo in 1. legge la configurazione dal file `.env`; 2. apre una connessione al database Supabase/PostgreSQL; 3. esegue una query o un'operazione leggera di keep-alive; -4. attende l'intervallo configurato; -5. ripete il processo fino allo stop del container. +4. salva un campione storico (successo/errore + latenza) in uno storage locale a buffer circolare dimensionato per 48 ore; +5. attende l'intervallo configurato; +6. ripete il processo fino allo stop del container. ## Configurazione @@ -40,6 +47,10 @@ SUPABASE_DB_PASSWORD= PING_INTERVAL_MINUTES=4320 PING_QUERY=SELECT 1; TZ=Europe/Rome +WEB_HOST=0.0.0.0 +WEB_PORT=8080 +RRD_DB_PATH=data/connection_rrd.sqlite3 +RRD_RETENTION_HOURS=48 ``` ### Significato delle variabili @@ -52,6 +63,20 @@ TZ=Europe/Rome - `PING_INTERVAL_MINUTES`: intervallo tra un keep-alive e il successivo. Default `4320` (72 ore, circa 3 volte a settimana). - `PING_QUERY`: query leggera da eseguire per generare attivita'. - `TZ`: timezone del container per logging e scheduling coerenti. +- `WEB_HOST`: host di bind del server Uvicorn. +- `WEB_PORT`: porta HTTP della webapp/API. +- `RRD_DB_PATH`: path del database storico locale (RRD-like a buffer circolare). +- `RRD_RETENTION_HOURS`: retention dello storico, default `48`. + +## API e Swagger + +Endpoint principali: + +- `GET /api/status`: ultimo campione disponibile; +- `GET /api/history?hours=48`: storico campioni (finestra max 48h); +- `GET /`: dashboard web in stile SmokePing; +- `GET /docs`: Swagger UI; +- `GET /openapi.json`: schema OpenAPI. ## Docker @@ -69,6 +94,7 @@ docker build -t supabase-pinger . docker run -d \ --name supabase-pinger \ --env-file .env \ + -p 8080:8080 \ --restart unless-stopped \ supabase-pinger ``` @@ -90,6 +116,8 @@ services: container_name: supabase-pinger env_file: - .env + ports: + - "8080:8080" restart: unless-stopped ``` @@ -112,15 +140,24 @@ docker compose down |-- .env # credenziali locali (non versionato) |-- .env.example # modello variabili senza segreti |-- .gitignore -|-- app.py # entrypoint del servizio +|-- app.py # app FastAPI + collector keep-alive |-- docker-compose.yml |-- Dockerfile |-- prd.md # product requirements document |-- progress.md # piano e stato di sviluppo |-- README.md +|-- tests/ `-- requirements.txt ``` +## Test + +Esecuzione test: + +```bash +pytest -q +``` + ## Note operative - Non committare mai il file `.env` con credenziali reali. diff --git a/app.py b/app.py index d2d436e..d03097b 100644 --- a/app.py +++ b/app.py @@ -1,25 +1,24 @@ -""" -supabase-pinger ---------------- -Servizio long-running che esegue periodicamente una query di keep-alive -verso un database Supabase/PostgreSQL per evitarne la sospensione automatica -nel free tier. +"""Supabase pinger con API FastAPI, storage storico a buffer circolare e dashboard.""" -Configurazione: file .env (vedi .env.example) -""" +from __future__ import annotations import logging import os -import signal +import sqlite3 import sys +import threading import time +from contextlib import asynccontextmanager +from dataclasses import dataclass +from pathlib import Path +from typing import Any import psycopg2 +import uvicorn from dotenv import load_dotenv +from fastapi import FastAPI, HTTPException, Query, Request +from fastapi.responses import HTMLResponse -# --------------------------------------------------------------------------- -# Logging -# --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, @@ -29,27 +28,7 @@ logging.basicConfig( ) log = logging.getLogger("supabase-pinger") -# --------------------------------------------------------------------------- -# Shutdown ordinato -# --------------------------------------------------------------------------- - -_shutdown = False - - -def _handle_signal(signum, _frame): - global _shutdown - log.info("Segnale di arresto ricevuto (%s). Chiusura in corso...", signum) - _shutdown = True - - -signal.signal(signal.SIGTERM, _handle_signal) -signal.signal(signal.SIGINT, _handle_signal) - -# --------------------------------------------------------------------------- -# Configurazione -# --------------------------------------------------------------------------- - -_REQUIRED_VARS = [ +REQUIRED_VARS = [ "SUPABASE_DB_HOST", "SUPABASE_DB_PORT", "SUPABASE_DB_NAME", @@ -58,79 +37,171 @@ _REQUIRED_VARS = [ ] -def load_config() -> dict: - """Carica e valida la configurazione dal file .env o dall'ambiente.""" +@dataclass +class Settings: + db_host: str + db_port: int + db_name: str + db_user: str + db_password: str + ping_query: str + ping_interval_minutes: int + web_host: str + web_port: int + rrd_db_path: str + rrd_retention_hours: int + + @property + def max_samples(self) -> int: + samples = int((self.rrd_retention_hours * 60) / self.ping_interval_minutes) + return max(1, samples) + + +def load_config() -> Settings: + """Carica e valida la configurazione da ambiente/.env.""" load_dotenv() - missing = [v for v in _REQUIRED_VARS if not os.environ.get(v)] + missing = [name for name in REQUIRED_VARS if not os.environ.get(name)] if missing: - log.error( - "Variabili di ambiente obbligatorie mancanti: %s", ", ".join(missing) + raise RuntimeError( + f"Variabili obbligatorie mancanti: {', '.join(missing)}" ) - sys.exit(1) try: - port = int(os.environ["SUPABASE_DB_PORT"]) - except ValueError: - log.error("SUPABASE_DB_PORT deve essere un numero intero.") - sys.exit(1) - - try: - interval = int(os.environ.get("PING_INTERVAL_MINUTES", "4320")) - if interval <= 0: + db_port = int(os.environ["SUPABASE_DB_PORT"]) + ping_interval = int(os.environ.get("PING_INTERVAL_MINUTES", "4320")) + web_port = int(os.environ.get("WEB_PORT", "8080")) + retention_hours = int(os.environ.get("RRD_RETENTION_HOURS", "48")) + if ping_interval <= 0 or web_port <= 0 or retention_hours <= 0: raise ValueError - except ValueError: - log.error("PING_INTERVAL_MINUTES deve essere un numero intero positivo.") - sys.exit(1) + except ValueError as exc: + raise RuntimeError( + "SUPABASE_DB_PORT, PING_INTERVAL_MINUTES, WEB_PORT e " + "RRD_RETENTION_HOURS devono essere interi positivi." + ) from exc - return { - "host": os.environ["SUPABASE_DB_HOST"], - "port": port, - "dbname": os.environ["SUPABASE_DB_NAME"], - "user": os.environ["SUPABASE_DB_USER"], - "password": os.environ["SUPABASE_DB_PASSWORD"], - "query": os.environ.get("PING_QUERY", "SELECT 1;"), - "interval_minutes": interval, - } - -# --------------------------------------------------------------------------- -# Keep-alive -# --------------------------------------------------------------------------- + return Settings( + db_host=os.environ["SUPABASE_DB_HOST"], + db_port=db_port, + db_name=os.environ["SUPABASE_DB_NAME"], + db_user=os.environ["SUPABASE_DB_USER"], + db_password=os.environ["SUPABASE_DB_PASSWORD"], + ping_query=os.environ.get("PING_QUERY", "SELECT 1;"), + ping_interval_minutes=ping_interval, + web_host=os.environ.get("WEB_HOST", "0.0.0.0"), + web_port=web_port, + rrd_db_path=os.environ.get("RRD_DB_PATH", "data/connection_rrd.sqlite3"), + rrd_retention_hours=retention_hours, + ) -def ping(cfg: dict) -> bool: - """ - Apre una connessione, esegue la query di keep-alive e chiude tutto. - Restituisce True in caso di successo, False in caso di errore. - Non espone mai credenziali nei log. - """ +class RRDStore: + """Storage storico a buffer circolare, dimensionato a finestra 48h.""" + + def __init__(self, path: str, max_samples: int) -> None: + self.path = path + self.max_samples = max_samples + Path(path).parent.mkdir(parents=True, exist_ok=True) + self._init_db() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.path) + conn.row_factory = sqlite3.Row + return conn + + def _init_db(self) -> None: + with self._connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS samples ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts INTEGER NOT NULL, + success INTEGER NOT NULL, + latency_ms REAL, + error_message TEXT + ); + """ + ) + conn.commit() + + def add_sample( + self, *, ts: int, success: bool, latency_ms: float | None, error_message: str | None + ) -> None: + with self._connect() as conn: + conn.execute( + """ + INSERT INTO samples (ts, success, latency_ms, error_message) + VALUES (?, ?, ?, ?) + """, + (ts, 1 if success else 0, latency_ms, error_message), + ) + conn.execute( + """ + DELETE FROM samples + WHERE id NOT IN ( + SELECT id FROM samples ORDER BY id DESC LIMIT ? + ) + """, + (self.max_samples,), + ) + conn.commit() + + def latest(self) -> dict[str, Any] | None: + with self._connect() as conn: + row = conn.execute( + """ + SELECT ts, success, latency_ms, error_message + FROM samples + ORDER BY id DESC + LIMIT 1 + """ + ).fetchone() + return self._row_to_dict(row) if row else None + + def history(self, hours: int) -> list[dict[str, Any]]: + min_ts = int(time.time()) - (hours * 3600) + with self._connect() as conn: + rows = conn.execute( + """ + SELECT ts, success, latency_ms, error_message + FROM samples + WHERE ts >= ? + ORDER BY ts ASC + """, + (min_ts,), + ).fetchall() + return [self._row_to_dict(row) for row in rows] + + @staticmethod + def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]: + return { + "timestamp": int(row["ts"]), + "success": bool(row["success"]), + "latency_ms": row["latency_ms"], + "error_message": row["error_message"], + } + + +def run_ping(settings: Settings) -> tuple[bool, float | None, str | None]: + """Esegue il keep-alive e restituisce esito, latenza, eventuale errore.""" conn = None + started = time.perf_counter() + try: - log.info( - "Connessione a %s:%d/%s...", - cfg["host"], - cfg["port"], - cfg["dbname"], - ) conn = psycopg2.connect( - host=cfg["host"], - port=cfg["port"], - dbname=cfg["dbname"], - user=cfg["user"], - password=cfg["password"], + host=settings.db_host, + port=settings.db_port, + dbname=settings.db_name, + user=settings.db_user, + password=settings.db_password, connect_timeout=10, ) with conn.cursor() as cur: - log.info("Esecuzione query di keep-alive: %s", cfg["query"]) - cur.execute(cfg["query"]) - log.info("Keep-alive eseguito con successo.") - return True - except psycopg2.OperationalError as exc: - log.error("Errore di connessione al database: %s", exc) - return False + cur.execute(settings.ping_query) + latency_ms = (time.perf_counter() - started) * 1000 + return True, latency_ms, None except psycopg2.Error as exc: - log.error("Errore durante l'esecuzione della query: %s", exc) - return False + return False, None, str(exc) finally: if conn is not None: try: @@ -138,52 +209,213 @@ def ping(cfg: dict) -> bool: except Exception: pass -# --------------------------------------------------------------------------- -# Loop principale -# --------------------------------------------------------------------------- +def collector_loop(settings: Settings, store: RRDStore, stop_event: threading.Event) -> None: + interval_seconds = settings.ping_interval_minutes * 60 + log.info( + "Collector keep-alive avviato (interval=%s min, retention=%s ore, max_samples=%s)", + settings.ping_interval_minutes, + settings.rrd_retention_hours, + settings.max_samples, + ) -def run(cfg: dict) -> None: - """Loop principale del servizio.""" - interval_seconds = cfg["interval_minutes"] * 60 - - log.info("=" * 60) - log.info("supabase-pinger avviato") - log.info("Host : %s:%d/%s", cfg["host"], cfg["port"], cfg["dbname"]) - log.info("Intervallo : %d minuti", cfg["interval_minutes"]) - log.info("Query : %s", cfg["query"]) - log.info("=" * 60) - - while not _shutdown: - success = ping(cfg) - - if not success: - log.warning( - "Keep-alive fallito. Nuovo tentativo al prossimo ciclo." - ) - - if _shutdown: - break - - next_run = time.strftime( - "%Y-%m-%d %H:%M:%S", - time.localtime(time.time() + interval_seconds), + while not stop_event.is_set(): + success, latency_ms, error_message = run_ping(settings) + sample_ts = int(time.time()) + store.add_sample( + ts=sample_ts, + success=success, + latency_ms=latency_ms, + error_message=error_message, ) - log.info("Prossima esecuzione: %s. In attesa...", next_run) - # Sleep a blocchi per rispondere rapidamente a SIGTERM/SIGINT - elapsed = 0 - while elapsed < interval_seconds and not _shutdown: - time.sleep(min(10, interval_seconds - elapsed)) - elapsed += 10 + if success: + log.info("Keep-alive OK (latency_ms=%.2f)", latency_ms or 0.0) + else: + log.warning("Keep-alive failed: %s", error_message) - log.info("supabase-pinger arrestato.") + wait_seconds = 0 + while wait_seconds < interval_seconds and not stop_event.is_set(): + time.sleep(min(5, interval_seconds - wait_seconds)) + wait_seconds += 5 + + log.info("Collector keep-alive arrestato") -# --------------------------------------------------------------------------- -# Entrypoint -# --------------------------------------------------------------------------- +def _build_dashboard_html() -> str: + return """ + + + + + + Supabase Pinger Dashboard + + + +
+
+
+

Supabase Pinger - SmokePing Style

+
+ N/A +
+
+
Caricamento...
+ +
+
+ + + + +""" + + +def create_app( + config_override: Settings | None = None, + start_collector: bool = True, +) -> FastAPI: + @asynccontextmanager + async def lifespan(app: FastAPI): + settings = config_override or load_config() + store = RRDStore(settings.rrd_db_path, settings.max_samples) + + app.state.settings = settings + app.state.store = store + app.state.collector_stop_event = threading.Event() + app.state.collector_thread = None + + if start_collector: + thread = threading.Thread( + target=collector_loop, + args=(settings, store, app.state.collector_stop_event), + daemon=True, + name="collector-thread", + ) + thread.start() + app.state.collector_thread = thread + + yield + + app.state.collector_stop_event.set() + if app.state.collector_thread is not None: + app.state.collector_thread.join(timeout=5) + + app = FastAPI( + title="Supabase Pinger API", + version="1.1.0", + description=( + "API read-only per monitoraggio storico connessione Supabase " + "(retention RRD 48h) e dashboard SmokePing-style." + ), + lifespan=lifespan, + ) + + @app.get("/", response_class=HTMLResponse, tags=["dashboard"]) + def dashboard() -> str: + return _build_dashboard_html() + + @app.get("/api/status", tags=["monitoring"]) + def api_status(request: Request) -> dict[str, Any]: + latest = request.app.state.store.latest() + if latest is None: + return { + "success": None, + "timestamp": None, + "latency_ms": None, + "error_message": "Nessun campione disponibile", + } + return latest + + @app.get("/api/history", tags=["monitoring"]) + def api_history( + request: Request, + hours: int = Query(48, ge=1, le=48), + ) -> dict[str, Any]: + if hours > request.app.state.settings.rrd_retention_hours: + raise HTTPException(status_code=400, detail="Finestra richiesta oltre retention") + samples = request.app.state.store.history(hours) + return { + "hours": hours, + "count": len(samples), + "samples": samples, + } + + return app + + +app = create_app() + if __name__ == "__main__": - config = load_config() - run(config) + cfg = load_config() + uvicorn.run("app:app", host=cfg.web_host, port=cfg.web_port, log_level="info") diff --git a/docker-compose.yml b/docker-compose.yml index 01433a9..3565f2c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,4 +4,6 @@ services: container_name: supabase-pinger env_file: - .env + ports: + - "8080:8080" restart: unless-stopped diff --git a/progress.md b/progress.md index 3e8473a..c5666e6 100644 --- a/progress.md +++ b/progress.md @@ -268,7 +268,7 @@ Verifica: Stato: -- **pianificata** +- **completata** — implementata app FastAPI con runtime Uvicorn, endpoint `/api/status` e `/api/history`, dashboard web in `/`, documentazione Swagger/OpenAPI in `/docs` e `/openapi.json`, storage storico RRD-like con retention 48h e test automatici `pytest` verdi (3 passed). ## Backlog Post-V1 @@ -310,6 +310,6 @@ La V1 e' completata quando: ## Prossima Attivita' Operativa -Fasi 0-6 completate. Prossimo step: avvio Fase 7 per implementazione FastAPI + Uvicorn + API + Swagger della dashboard storico. +Fasi 0-7 completate. V1.1 consegnata con FastAPI/Uvicorn, API read-only, Swagger/OpenAPI, dashboard storico e test automatici. Nota operativa: usare host pooler (`aws-1-eu-central-1.pooler.supabase.com`) su porta `6543` con utente `postgres.` per ambienti senza connettivita' IPv6. diff --git a/requirements.txt b/requirements.txt index 32f676f..93d4391 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,6 @@ -psycopg2-binary==2.9.9 +psycopg2-binary==2.9.10 python-dotenv==1.0.1 +fastapi==0.115.12 +uvicorn==0.30.6 +pytest==8.3.3 +httpx==0.27.2 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..2a855d9 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,6 @@ +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..d964cd9 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,79 @@ +import tempfile +import time +from pathlib import Path + +from fastapi.testclient import TestClient + +from app import Settings, create_app + + +def build_settings(db_path: str) -> Settings: + return Settings( + db_host="localhost", + db_port=5432, + db_name="postgres", + db_user="user", + db_password="password", + ping_query="SELECT 1;", + ping_interval_minutes=30, + web_host="0.0.0.0", + web_port=8080, + rrd_db_path=db_path, + rrd_retention_hours=48, + ) + + +def test_swagger_and_openapi_available(): + with tempfile.TemporaryDirectory() as tmp: + db_path = str(Path(tmp) / "rrd.sqlite3") + app = create_app(config_override=build_settings(db_path), start_collector=False) + + with TestClient(app) as client: + docs = client.get("/docs") + assert docs.status_code == 200 + assert "Swagger UI" in docs.text + + openapi = client.get("/openapi.json") + assert openapi.status_code == 200 + payload = openapi.json() + assert "/api/history" in payload["paths"] + assert "/api/status" in payload["paths"] + + +def test_history_api_window_and_ring_buffer(): + with tempfile.TemporaryDirectory() as tmp: + db_path = str(Path(tmp) / "rrd.sqlite3") + settings = build_settings(db_path) + app = create_app(config_override=settings, start_collector=False) + + with TestClient(app) as client: + store = app.state.store + now_ts = int(time.time()) + for idx in range(settings.max_samples + 5): + store.add_sample( + ts=now_ts - (settings.max_samples + 5) + idx, + success=(idx % 2 == 0), + latency_ms=float(idx), + error_message=None if idx % 2 == 0 else "error", + ) + + history = client.get("/api/history", params={"hours": 48}) + assert history.status_code == 200 + payload = history.json() + assert payload["count"] == settings.max_samples + + invalid = client.get("/api/history", params={"hours": 49}) + assert invalid.status_code == 422 + + +def test_status_api_when_no_samples(): + with tempfile.TemporaryDirectory() as tmp: + db_path = str(Path(tmp) / "rrd.sqlite3") + app = create_app(config_override=build_settings(db_path), start_collector=False) + + with TestClient(app) as client: + status = client.get("/api/status") + assert status.status_code == 200 + payload = status.json() + assert payload["success"] is None + assert payload["timestamp"] is None