feat: implementa webapp FastAPI con API swagger e test

This commit is contained in:
Luca Sacchi Ricciardi
2026-04-24 13:56:32 +02:00
parent 24e7d5eede
commit e8ae3603e7
10 changed files with 509 additions and 137 deletions
+8
View File
@@ -15,3 +15,11 @@ PING_QUERY=SELECT 1;
# Timezone del container (per i log) # Timezone del container (per i log)
TZ=Europe/Rome TZ=Europe/Rome
# API/Webapp FastAPI (Uvicorn)
WEB_HOST=0.0.0.0
WEB_PORT=8080
# Storage storico (retention RRD-like)
RRD_DB_PATH=data/connection_rrd.sqlite3
RRD_RETENTION_HOURS=48
+4
View File
@@ -22,6 +22,10 @@ env/
# Docker # Docker
.dockerignore .dockerignore
# Storage storico locale
data/
*.sqlite3
# OS # OS
.DS_Store .DS_Store
Thumbs.db Thumbs.db
+2 -2
View File
@@ -5,6 +5,6 @@ WORKDIR /app
COPY requirements.txt . COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
COPY app.py . COPY . .
CMD ["python", "-u", "app.py"] CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"]
+40 -3
View File
@@ -4,6 +4,12 @@ Tool Python dockerizzato pensato per mantenere "attivo" un database Supabase in
L'idea e' semplice: il container resta in esecuzione fino a quando non viene fermato esplicitamente e, a intervalli regolari, esegue un'operazione verso il database Supabase per generare attivita' sufficiente a non far decadere l'istanza. L'idea e' semplice: il container resta in esecuzione fino a quando non viene fermato esplicitamente e, a intervalli regolari, esegue un'operazione verso il database Supabase per generare attivita' sufficiente a non far decadere l'istanza.
In piu', il servizio espone una webapp/API con FastAPI servita da Uvicorn:
- dashboard in stile SmokePing;
- API read-only per stato e storico;
- documentazione Swagger in `/docs` e OpenAPI in `/openapi.json`.
## Obiettivo ## Obiettivo
I progetti Supabase in free tier possono diventare temporaneamente inaccessibili se rimangono inattivi troppo a lungo. Questo progetto serve a: I progetti Supabase in free tier possono diventare temporaneamente inaccessibili se rimangono inattivi troppo a lungo. Questo progetto serve a:
@@ -20,8 +26,9 @@ Il servizio viene eseguito all'interno di un container Docker e resta attivo in
1. legge la configurazione dal file `.env`; 1. legge la configurazione dal file `.env`;
2. apre una connessione al database Supabase/PostgreSQL; 2. apre una connessione al database Supabase/PostgreSQL;
3. esegue una query o un'operazione leggera di keep-alive; 3. esegue una query o un'operazione leggera di keep-alive;
4. attende l'intervallo configurato; 4. salva un campione storico (successo/errore + latenza) in uno storage locale a buffer circolare dimensionato per 48 ore;
5. ripete il processo fino allo stop del container. 5. attende l'intervallo configurato;
6. ripete il processo fino allo stop del container.
## Configurazione ## Configurazione
@@ -40,6 +47,10 @@ SUPABASE_DB_PASSWORD=
PING_INTERVAL_MINUTES=4320 PING_INTERVAL_MINUTES=4320
PING_QUERY=SELECT 1; PING_QUERY=SELECT 1;
TZ=Europe/Rome TZ=Europe/Rome
WEB_HOST=0.0.0.0
WEB_PORT=8080
RRD_DB_PATH=data/connection_rrd.sqlite3
RRD_RETENTION_HOURS=48
``` ```
### Significato delle variabili ### Significato delle variabili
@@ -52,6 +63,20 @@ TZ=Europe/Rome
- `PING_INTERVAL_MINUTES`: intervallo tra un keep-alive e il successivo. Default `4320` (72 ore, circa 3 volte a settimana). - `PING_INTERVAL_MINUTES`: intervallo tra un keep-alive e il successivo. Default `4320` (72 ore, circa 3 volte a settimana).
- `PING_QUERY`: query leggera da eseguire per generare attivita'. - `PING_QUERY`: query leggera da eseguire per generare attivita'.
- `TZ`: timezone del container per logging e scheduling coerenti. - `TZ`: timezone del container per logging e scheduling coerenti.
- `WEB_HOST`: host di bind del server Uvicorn.
- `WEB_PORT`: porta HTTP della webapp/API.
- `RRD_DB_PATH`: path del database storico locale (RRD-like a buffer circolare).
- `RRD_RETENTION_HOURS`: retention dello storico, default `48`.
## API e Swagger
Endpoint principali:
- `GET /api/status`: ultimo campione disponibile;
- `GET /api/history?hours=48`: storico campioni (finestra max 48h);
- `GET /`: dashboard web in stile SmokePing;
- `GET /docs`: Swagger UI;
- `GET /openapi.json`: schema OpenAPI.
## Docker ## Docker
@@ -69,6 +94,7 @@ docker build -t supabase-pinger .
docker run -d \ docker run -d \
--name supabase-pinger \ --name supabase-pinger \
--env-file .env \ --env-file .env \
-p 8080:8080 \
--restart unless-stopped \ --restart unless-stopped \
supabase-pinger supabase-pinger
``` ```
@@ -90,6 +116,8 @@ services:
container_name: supabase-pinger container_name: supabase-pinger
env_file: env_file:
- .env - .env
ports:
- "8080:8080"
restart: unless-stopped restart: unless-stopped
``` ```
@@ -112,15 +140,24 @@ docker compose down
|-- .env # credenziali locali (non versionato) |-- .env # credenziali locali (non versionato)
|-- .env.example # modello variabili senza segreti |-- .env.example # modello variabili senza segreti
|-- .gitignore |-- .gitignore
|-- app.py # entrypoint del servizio |-- app.py # app FastAPI + collector keep-alive
|-- docker-compose.yml |-- docker-compose.yml
|-- Dockerfile |-- Dockerfile
|-- prd.md # product requirements document |-- prd.md # product requirements document
|-- progress.md # piano e stato di sviluppo |-- progress.md # piano e stato di sviluppo
|-- README.md |-- README.md
|-- tests/
`-- requirements.txt `-- requirements.txt
``` ```
## Test
Esecuzione test:
```bash
pytest -q
```
## Note operative ## Note operative
- Non committare mai il file `.env` con credenziali reali. - Non committare mai il file `.env` con credenziali reali.
+361 -129
View File
@@ -1,25 +1,24 @@
""" """Supabase pinger con API FastAPI, storage storico a buffer circolare e dashboard."""
supabase-pinger
---------------
Servizio long-running che esegue periodicamente una query di keep-alive
verso un database Supabase/PostgreSQL per evitarne la sospensione automatica
nel free tier.
Configurazione: file .env (vedi .env.example) from __future__ import annotations
"""
import logging import logging
import os import os
import signal import sqlite3
import sys import sys
import threading
import time import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import psycopg2 import psycopg2
import uvicorn
from dotenv import load_dotenv from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import HTMLResponse
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -29,27 +28,7 @@ logging.basicConfig(
) )
log = logging.getLogger("supabase-pinger") log = logging.getLogger("supabase-pinger")
# --------------------------------------------------------------------------- REQUIRED_VARS = [
# Shutdown ordinato
# ---------------------------------------------------------------------------
_shutdown = False
def _handle_signal(signum, _frame):
global _shutdown
log.info("Segnale di arresto ricevuto (%s). Chiusura in corso...", signum)
_shutdown = True
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
# ---------------------------------------------------------------------------
# Configurazione
# ---------------------------------------------------------------------------
_REQUIRED_VARS = [
"SUPABASE_DB_HOST", "SUPABASE_DB_HOST",
"SUPABASE_DB_PORT", "SUPABASE_DB_PORT",
"SUPABASE_DB_NAME", "SUPABASE_DB_NAME",
@@ -58,79 +37,171 @@ _REQUIRED_VARS = [
] ]
def load_config() -> dict: @dataclass
"""Carica e valida la configurazione dal file .env o dall'ambiente.""" class Settings:
db_host: str
db_port: int
db_name: str
db_user: str
db_password: str
ping_query: str
ping_interval_minutes: int
web_host: str
web_port: int
rrd_db_path: str
rrd_retention_hours: int
@property
def max_samples(self) -> int:
samples = int((self.rrd_retention_hours * 60) / self.ping_interval_minutes)
return max(1, samples)
def load_config() -> Settings:
"""Carica e valida la configurazione da ambiente/.env."""
load_dotenv() load_dotenv()
missing = [v for v in _REQUIRED_VARS if not os.environ.get(v)] missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing: if missing:
log.error( raise RuntimeError(
"Variabili di ambiente obbligatorie mancanti: %s", ", ".join(missing) f"Variabili obbligatorie mancanti: {', '.join(missing)}"
) )
sys.exit(1)
try: try:
port = int(os.environ["SUPABASE_DB_PORT"]) db_port = int(os.environ["SUPABASE_DB_PORT"])
except ValueError: ping_interval = int(os.environ.get("PING_INTERVAL_MINUTES", "4320"))
log.error("SUPABASE_DB_PORT deve essere un numero intero.") web_port = int(os.environ.get("WEB_PORT", "8080"))
sys.exit(1) retention_hours = int(os.environ.get("RRD_RETENTION_HOURS", "48"))
if ping_interval <= 0 or web_port <= 0 or retention_hours <= 0:
try:
interval = int(os.environ.get("PING_INTERVAL_MINUTES", "4320"))
if interval <= 0:
raise ValueError raise ValueError
except ValueError: except ValueError as exc:
log.error("PING_INTERVAL_MINUTES deve essere un numero intero positivo.") raise RuntimeError(
sys.exit(1) "SUPABASE_DB_PORT, PING_INTERVAL_MINUTES, WEB_PORT e "
"RRD_RETENTION_HOURS devono essere interi positivi."
) from exc
return { return Settings(
"host": os.environ["SUPABASE_DB_HOST"], db_host=os.environ["SUPABASE_DB_HOST"],
"port": port, db_port=db_port,
"dbname": os.environ["SUPABASE_DB_NAME"], db_name=os.environ["SUPABASE_DB_NAME"],
"user": os.environ["SUPABASE_DB_USER"], db_user=os.environ["SUPABASE_DB_USER"],
"password": os.environ["SUPABASE_DB_PASSWORD"], db_password=os.environ["SUPABASE_DB_PASSWORD"],
"query": os.environ.get("PING_QUERY", "SELECT 1;"), ping_query=os.environ.get("PING_QUERY", "SELECT 1;"),
"interval_minutes": interval, ping_interval_minutes=ping_interval,
} web_host=os.environ.get("WEB_HOST", "0.0.0.0"),
web_port=web_port,
# --------------------------------------------------------------------------- rrd_db_path=os.environ.get("RRD_DB_PATH", "data/connection_rrd.sqlite3"),
# Keep-alive rrd_retention_hours=retention_hours,
# --------------------------------------------------------------------------- )
def ping(cfg: dict) -> bool: class RRDStore:
""" """Storage storico a buffer circolare, dimensionato a finestra 48h."""
Apre una connessione, esegue la query di keep-alive e chiude tutto.
Restituisce True in caso di successo, False in caso di errore. def __init__(self, path: str, max_samples: int) -> None:
Non espone mai credenziali nei log. self.path = path
""" self.max_samples = max_samples
Path(path).parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.path)
conn.row_factory = sqlite3.Row
return conn
def _init_db(self) -> None:
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS samples (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
success INTEGER NOT NULL,
latency_ms REAL,
error_message TEXT
);
"""
)
conn.commit()
def add_sample(
self, *, ts: int, success: bool, latency_ms: float | None, error_message: str | None
) -> None:
with self._connect() as conn:
conn.execute(
"""
INSERT INTO samples (ts, success, latency_ms, error_message)
VALUES (?, ?, ?, ?)
""",
(ts, 1 if success else 0, latency_ms, error_message),
)
conn.execute(
"""
DELETE FROM samples
WHERE id NOT IN (
SELECT id FROM samples ORDER BY id DESC LIMIT ?
)
""",
(self.max_samples,),
)
conn.commit()
def latest(self) -> dict[str, Any] | None:
with self._connect() as conn:
row = conn.execute(
"""
SELECT ts, success, latency_ms, error_message
FROM samples
ORDER BY id DESC
LIMIT 1
"""
).fetchone()
return self._row_to_dict(row) if row else None
def history(self, hours: int) -> list[dict[str, Any]]:
min_ts = int(time.time()) - (hours * 3600)
with self._connect() as conn:
rows = conn.execute(
"""
SELECT ts, success, latency_ms, error_message
FROM samples
WHERE ts >= ?
ORDER BY ts ASC
""",
(min_ts,),
).fetchall()
return [self._row_to_dict(row) for row in rows]
@staticmethod
def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
return {
"timestamp": int(row["ts"]),
"success": bool(row["success"]),
"latency_ms": row["latency_ms"],
"error_message": row["error_message"],
}
def run_ping(settings: Settings) -> tuple[bool, float | None, str | None]:
"""Esegue il keep-alive e restituisce esito, latenza, eventuale errore."""
conn = None conn = None
started = time.perf_counter()
try: try:
log.info(
"Connessione a %s:%d/%s...",
cfg["host"],
cfg["port"],
cfg["dbname"],
)
conn = psycopg2.connect( conn = psycopg2.connect(
host=cfg["host"], host=settings.db_host,
port=cfg["port"], port=settings.db_port,
dbname=cfg["dbname"], dbname=settings.db_name,
user=cfg["user"], user=settings.db_user,
password=cfg["password"], password=settings.db_password,
connect_timeout=10, connect_timeout=10,
) )
with conn.cursor() as cur: with conn.cursor() as cur:
log.info("Esecuzione query di keep-alive: %s", cfg["query"]) cur.execute(settings.ping_query)
cur.execute(cfg["query"]) latency_ms = (time.perf_counter() - started) * 1000
log.info("Keep-alive eseguito con successo.") return True, latency_ms, None
return True
except psycopg2.OperationalError as exc:
log.error("Errore di connessione al database: %s", exc)
return False
except psycopg2.Error as exc: except psycopg2.Error as exc:
log.error("Errore durante l'esecuzione della query: %s", exc) return False, None, str(exc)
return False
finally: finally:
if conn is not None: if conn is not None:
try: try:
@@ -138,52 +209,213 @@ def ping(cfg: dict) -> bool:
except Exception: except Exception:
pass pass
# ---------------------------------------------------------------------------
# Loop principale
# ---------------------------------------------------------------------------
def collector_loop(settings: Settings, store: RRDStore, stop_event: threading.Event) -> None:
interval_seconds = settings.ping_interval_minutes * 60
log.info(
"Collector keep-alive avviato (interval=%s min, retention=%s ore, max_samples=%s)",
settings.ping_interval_minutes,
settings.rrd_retention_hours,
settings.max_samples,
)
def run(cfg: dict) -> None: while not stop_event.is_set():
"""Loop principale del servizio.""" success, latency_ms, error_message = run_ping(settings)
interval_seconds = cfg["interval_minutes"] * 60 sample_ts = int(time.time())
store.add_sample(
log.info("=" * 60) ts=sample_ts,
log.info("supabase-pinger avviato") success=success,
log.info("Host : %s:%d/%s", cfg["host"], cfg["port"], cfg["dbname"]) latency_ms=latency_ms,
log.info("Intervallo : %d minuti", cfg["interval_minutes"]) error_message=error_message,
log.info("Query : %s", cfg["query"])
log.info("=" * 60)
while not _shutdown:
success = ping(cfg)
if not success:
log.warning(
"Keep-alive fallito. Nuovo tentativo al prossimo ciclo."
)
if _shutdown:
break
next_run = time.strftime(
"%Y-%m-%d %H:%M:%S",
time.localtime(time.time() + interval_seconds),
) )
log.info("Prossima esecuzione: %s. In attesa...", next_run)
# Sleep a blocchi per rispondere rapidamente a SIGTERM/SIGINT if success:
elapsed = 0 log.info("Keep-alive OK (latency_ms=%.2f)", latency_ms or 0.0)
while elapsed < interval_seconds and not _shutdown: else:
time.sleep(min(10, interval_seconds - elapsed)) log.warning("Keep-alive failed: %s", error_message)
elapsed += 10
log.info("supabase-pinger arrestato.") wait_seconds = 0
while wait_seconds < interval_seconds and not stop_event.is_set():
time.sleep(min(5, interval_seconds - wait_seconds))
wait_seconds += 5
log.info("Collector keep-alive arrestato")
# --------------------------------------------------------------------------- def _build_dashboard_html() -> str:
# Entrypoint return """
# --------------------------------------------------------------------------- <!doctype html>
<html lang=\"it\">
<head>
<meta charset=\"utf-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
<title>Supabase Pinger Dashboard</title>
<style>
:root {
--bg: #101418;
--panel: #17202a;
--line: #3ecf8e;
--danger: #ff6b6b;
--text: #edf2f7;
--muted: #9fb3c8;
}
body { margin: 0; font-family: 'Segoe UI', sans-serif; background: radial-gradient(circle at 20% 20%, #1f2937, var(--bg)); color: var(--text); }
.wrap { max-width: 1100px; margin: 0 auto; padding: 24px; }
.panel { background: color-mix(in srgb, var(--panel), #000 15%); border: 1px solid #283544; border-radius: 14px; padding: 18px; }
.top { display: flex; justify-content: space-between; gap: 16px; flex-wrap: wrap; }
.badge { padding: 6px 10px; border-radius: 999px; font-weight: 700; font-size: 13px; }
.ok { background: rgba(62, 207, 142, .15); color: #7ff5bd; }
.ko { background: rgba(255, 107, 107, .15); color: #ff9f9f; }
.meta { color: var(--muted); font-size: 14px; }
canvas { margin-top: 18px; }
</style>
</head>
<body>
<div class=\"wrap\">
<div class=\"panel\">
<div class=\"top\">
<h1>Supabase Pinger - SmokePing Style</h1>
<div>
<span id=\"statusBadge\" class=\"badge\">N/A</span>
</div>
</div>
<div class=\"meta\" id=\"meta\">Caricamento...</div>
<canvas id=\"latencyChart\"></canvas>
</div>
</div>
<script src=\"https://cdn.jsdelivr.net/npm/chart.js\"></script>
<script>
const badge = document.getElementById('statusBadge');
const meta = document.getElementById('meta');
const ctx = document.getElementById('latencyChart').getContext('2d');
const chart = new Chart(ctx, {
type: 'line',
data: { labels: [], datasets: [
{ label: 'Latenza ms', data: [], borderColor: '#3ecf8e', tension: 0.2 },
{ label: 'Errori', data: [], borderColor: '#ff6b6b', stepped: true, yAxisID: 'y1' }
] },
options: {
responsive: true,
scales: {
y: { title: { display: true, text: 'ms' } },
y1: { position: 'right', min: 0, max: 1, ticks: { stepSize: 1 } }
}
}
});
async function load() {
const [statusRes, historyRes] = await Promise.all([
fetch('/api/status'),
fetch('/api/history?hours=48')
]);
const status = await statusRes.json();
const history = await historyRes.json();
if (status.success === true) {
badge.textContent = 'UP';
badge.className = 'badge ok';
} else if (status.success === false) {
badge.textContent = 'DOWN';
badge.className = 'badge ko';
} else {
badge.textContent = 'N/A';
badge.className = 'badge';
}
meta.textContent = `Campioni 48h: ${history.count} | Ultimo campione: ${status.timestamp ? new Date(status.timestamp * 1000).toLocaleString() : 'n/a'}`;
chart.data.labels = history.samples.map(s => new Date(s.timestamp * 1000).toLocaleTimeString());
chart.data.datasets[0].data = history.samples.map(s => s.latency_ms);
chart.data.datasets[1].data = history.samples.map(s => s.success ? 0 : 1);
chart.update();
}
load().catch((err) => {
meta.textContent = `Errore caricamento dashboard: ${err}`;
});
setInterval(() => load().catch(() => {}), 30000);
</script>
</body>
</html>
"""
def create_app(
config_override: Settings | None = None,
start_collector: bool = True,
) -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI):
settings = config_override or load_config()
store = RRDStore(settings.rrd_db_path, settings.max_samples)
app.state.settings = settings
app.state.store = store
app.state.collector_stop_event = threading.Event()
app.state.collector_thread = None
if start_collector:
thread = threading.Thread(
target=collector_loop,
args=(settings, store, app.state.collector_stop_event),
daemon=True,
name="collector-thread",
)
thread.start()
app.state.collector_thread = thread
yield
app.state.collector_stop_event.set()
if app.state.collector_thread is not None:
app.state.collector_thread.join(timeout=5)
app = FastAPI(
title="Supabase Pinger API",
version="1.1.0",
description=(
"API read-only per monitoraggio storico connessione Supabase "
"(retention RRD 48h) e dashboard SmokePing-style."
),
lifespan=lifespan,
)
@app.get("/", response_class=HTMLResponse, tags=["dashboard"])
def dashboard() -> str:
return _build_dashboard_html()
@app.get("/api/status", tags=["monitoring"])
def api_status(request: Request) -> dict[str, Any]:
latest = request.app.state.store.latest()
if latest is None:
return {
"success": None,
"timestamp": None,
"latency_ms": None,
"error_message": "Nessun campione disponibile",
}
return latest
@app.get("/api/history", tags=["monitoring"])
def api_history(
request: Request,
hours: int = Query(48, ge=1, le=48),
) -> dict[str, Any]:
if hours > request.app.state.settings.rrd_retention_hours:
raise HTTPException(status_code=400, detail="Finestra richiesta oltre retention")
samples = request.app.state.store.history(hours)
return {
"hours": hours,
"count": len(samples),
"samples": samples,
}
return app
app = create_app()
if __name__ == "__main__": if __name__ == "__main__":
config = load_config() cfg = load_config()
run(config) uvicorn.run("app:app", host=cfg.web_host, port=cfg.web_port, log_level="info")
+2
View File
@@ -4,4 +4,6 @@ services:
container_name: supabase-pinger container_name: supabase-pinger
env_file: env_file:
- .env - .env
ports:
- "8080:8080"
restart: unless-stopped restart: unless-stopped
+2 -2
View File
@@ -268,7 +268,7 @@ Verifica:
Stato: Stato:
- **pianificata** - **completata** — implementata app FastAPI con runtime Uvicorn, endpoint `/api/status` e `/api/history`, dashboard web in `/`, documentazione Swagger/OpenAPI in `/docs` e `/openapi.json`, storage storico RRD-like con retention 48h e test automatici `pytest` verdi (3 passed).
## Backlog Post-V1 ## Backlog Post-V1
@@ -310,6 +310,6 @@ La V1 e' completata quando:
## Prossima Attivita' Operativa ## Prossima Attivita' Operativa
Fasi 0-6 completate. Prossimo step: avvio Fase 7 per implementazione FastAPI + Uvicorn + API + Swagger della dashboard storico. Fasi 0-7 completate. V1.1 consegnata con FastAPI/Uvicorn, API read-only, Swagger/OpenAPI, dashboard storico e test automatici.
Nota operativa: usare host pooler (`aws-1-eu-central-1.pooler.supabase.com`) su porta `6543` con utente `postgres.<project-ref>` per ambienti senza connettivita' IPv6. Nota operativa: usare host pooler (`aws-1-eu-central-1.pooler.supabase.com`) su porta `6543` con utente `postgres.<project-ref>` per ambienti senza connettivita' IPv6.
+5 -1
View File
@@ -1,2 +1,6 @@
psycopg2-binary==2.9.9 psycopg2-binary==2.9.10
python-dotenv==1.0.1 python-dotenv==1.0.1
fastapi==0.115.12
uvicorn==0.30.6
pytest==8.3.3
httpx==0.27.2
+6
View File
@@ -0,0 +1,6 @@
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
+79
View File
@@ -0,0 +1,79 @@
import tempfile
import time
from pathlib import Path
from fastapi.testclient import TestClient
from app import Settings, create_app
def build_settings(db_path: str) -> Settings:
return Settings(
db_host="localhost",
db_port=5432,
db_name="postgres",
db_user="user",
db_password="password",
ping_query="SELECT 1;",
ping_interval_minutes=30,
web_host="0.0.0.0",
web_port=8080,
rrd_db_path=db_path,
rrd_retention_hours=48,
)
def test_swagger_and_openapi_available():
with tempfile.TemporaryDirectory() as tmp:
db_path = str(Path(tmp) / "rrd.sqlite3")
app = create_app(config_override=build_settings(db_path), start_collector=False)
with TestClient(app) as client:
docs = client.get("/docs")
assert docs.status_code == 200
assert "Swagger UI" in docs.text
openapi = client.get("/openapi.json")
assert openapi.status_code == 200
payload = openapi.json()
assert "/api/history" in payload["paths"]
assert "/api/status" in payload["paths"]
def test_history_api_window_and_ring_buffer():
with tempfile.TemporaryDirectory() as tmp:
db_path = str(Path(tmp) / "rrd.sqlite3")
settings = build_settings(db_path)
app = create_app(config_override=settings, start_collector=False)
with TestClient(app) as client:
store = app.state.store
now_ts = int(time.time())
for idx in range(settings.max_samples + 5):
store.add_sample(
ts=now_ts - (settings.max_samples + 5) + idx,
success=(idx % 2 == 0),
latency_ms=float(idx),
error_message=None if idx % 2 == 0 else "error",
)
history = client.get("/api/history", params={"hours": 48})
assert history.status_code == 200
payload = history.json()
assert payload["count"] == settings.max_samples
invalid = client.get("/api/history", params={"hours": 49})
assert invalid.status_code == 422
def test_status_api_when_no_samples():
with tempfile.TemporaryDirectory() as tmp:
db_path = str(Path(tmp) / "rrd.sqlite3")
app = create_app(config_override=build_settings(db_path), start_collector=False)
with TestClient(app) as client:
status = client.get("/api/status")
assert status.status_code == 200
payload = status.json()
assert payload["success"] is None
assert payload["timestamp"] is None