feat(ingestion): implement log monitoring script with webhook integration

- Add logwhisperer.sh script for tailing and monitoring system logs
- Implement pattern matching for critical errors (FATAL, ERROR, OOM, segfault)
- Add JSON payload generation with severity levels
- Implement rate limiting and offset tracking per log source
- Add install.sh with interactive configuration and systemd support
- Create comprehensive test suite with pytest
- Add technical specification documentation
- Update CHANGELOG.md following Common Changelog standard

All 12 tests passing. Follows Metodo Sacchi (Safety first, little often, double check).
This commit is contained in:
Luca Sacchi Ricciardi
2026-04-02 16:09:00 +02:00
parent 34dbba1201
commit 69f475ec78
6 changed files with 1148 additions and 0 deletions

194
tests/test_logwhisperer.py Normal file
View File

@@ -0,0 +1,194 @@
import pytest
import subprocess
import os
import json
import tempfile
import time
import signal
import threading
SCRIPT_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "scripts", "logwhisperer.sh")
@pytest.fixture
def temp_log_file():
"""Create a temporary log file for testing."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
f.write("Apr 2 10:00:00 server kernel: Normal log line\n")
f.flush()
yield f.name
os.unlink(f.name)
@pytest.fixture
def temp_config():
"""Create a temporary config file for testing."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".env", delete=False) as f:
f.write('WEBHOOK_URL="http://localhost:9999/webhook"\n')
f.write('CLIENT_ID="test-client-uuid"\n')
f.write('LOG_SOURCES="/tmp/test.log"\n')
f.write("POLL_INTERVAL=1\n")
f.write("MAX_LINE_LENGTH=2000\n")
f.flush()
yield f.name
os.unlink(f.name)
class TestScriptExistence:
"""Test that the script exists and is executable."""
def test_script_exists(self):
assert os.path.exists(SCRIPT_PATH), f"Script not found at {SCRIPT_PATH}"
def test_script_is_executable(self):
assert os.access(SCRIPT_PATH, os.X_OK), f"Script is not executable: {SCRIPT_PATH}"
def test_script_has_shebang(self):
with open(SCRIPT_PATH, "r") as f:
first_line = f.readline().strip()
assert first_line.startswith("#!/bin/bash"), f"Missing bash shebang, got: {first_line}"
class TestScriptValidation:
"""Test script validation and help output."""
def test_script_help_flag(self):
result = subprocess.run(
[SCRIPT_PATH, "--help"],
capture_output=True,
text=True,
timeout=10,
)
assert result.returncode == 0
assert "usage" in result.stdout.lower() or "help" in result.stdout.lower()
def test_script_validate_config(self):
"""Test --validate flag with a good config."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".env", delete=False) as f:
f.write('WEBHOOK_URL="http://localhost:9999/webhook"\n')
f.write('CLIENT_ID="test-uuid"\n')
f.write('LOG_SOURCES="/var/log/syslog"\n')
f.write("POLL_INTERVAL=5\n")
config_path = f.name
try:
result = subprocess.run(
[SCRIPT_PATH, "--validate", "--config", config_path],
capture_output=True,
text=True,
timeout=10,
)
assert result.returncode == 0
finally:
os.unlink(config_path)
def test_script_validate_missing_config(self):
"""Test --validate flag with missing config."""
result = subprocess.run(
[SCRIPT_PATH, "--validate", "--config", "/nonexistent/config.env"],
capture_output=True,
text=True,
timeout=10,
)
assert result.returncode != 0
class TestPatternMatching:
"""Test that the script correctly matches error patterns."""
def test_detects_fatal_pattern(self, temp_log_file, temp_config):
"""Test detection of FATAL pattern."""
with open(temp_config, "w") as f:
f.write(f'WEBHOOK_URL="http://localhost:9999/webhook"\n')
f.write('CLIENT_ID="test-client-uuid"\n')
f.write(f'LOG_SOURCES="{temp_log_file}"\n')
f.write("POLL_INTERVAL=1\n")
f.write("MAX_LINE_LENGTH=2000\n")
with open(temp_log_file, "a") as f:
f.write("Apr 2 10:30:00 server postgres[1234]: FATAL: too many connections\n")
f.flush()
result = subprocess.run(
[SCRIPT_PATH, "--dry-run", "--config", temp_config, "--test-line", "FATAL: too many connections"],
capture_output=True,
text=True,
timeout=10,
)
assert result.returncode == 0
assert "matched" in result.stdout.lower() or "FATAL" in result.stdout
def test_detects_oom_pattern(self):
"""Test detection of OOM pattern."""
result = subprocess.run(
[SCRIPT_PATH, "--dry-run", "--test-line", "kernel: Out of memory: Kill process 1234"],
capture_output=True,
text=True,
timeout=10,
)
assert result.returncode == 0
assert "OOM" in result.stdout or "matched" in result.stdout.lower()
def test_detects_error_pattern(self):
"""Test detection of ERROR pattern."""
result = subprocess.run(
[SCRIPT_PATH, "--dry-run", "--test-line", "nginx: ERROR: connection refused"],
capture_output=True,
text=True,
timeout=10,
)
assert result.returncode == 0
def test_ignores_normal_lines(self):
"""Test that normal log lines are not matched."""
result = subprocess.run(
[SCRIPT_PATH, "--dry-run", "--test-line", "Apr 2 10:00:00 server sshd[5678]: Accepted publickey"],
capture_output=True,
text=True,
timeout=10,
)
assert result.returncode == 0
assert "no match" in result.stdout.lower() or "skip" in result.stdout.lower()
class TestPayloadFormat:
"""Test that the script generates correct JSON payload."""
def test_json_payload_structure(self):
"""Test that dry-run outputs valid JSON with required fields."""
result = subprocess.run(
[
SCRIPT_PATH,
"--dry-run",
"--test-line",
"kernel: Out of memory: Kill process 1234",
"--test-source",
"/var/log/syslog",
],
capture_output=True,
text=True,
timeout=10,
)
assert result.returncode == 0
output = result.stdout.strip()
try:
payload = json.loads(output)
except json.JSONDecodeError:
pytest.fail(f"Output is not valid JSON: {output}")
required_fields = ["client_id", "hostname", "source", "severity", "timestamp", "raw_log", "matched_pattern"]
for field in required_fields:
assert field in payload, f"Missing required field: {field}"
def test_severity_mapping(self):
"""Test that severity is correctly mapped based on pattern."""
result = subprocess.run(
[SCRIPT_PATH, "--dry-run", "--test-line", "kernel: Out of memory: Kill process"],
capture_output=True,
text=True,
timeout=10,
)
payload = json.loads(result.stdout.strip())
assert payload["severity"] in ["low", "medium", "critical"]