- Add logwhisperer.sh script for tailing and monitoring system logs
- Implement pattern matching for critical errors (FATAL, ERROR, OOM, segfault)
- Add JSON payload generation with severity levels
- Implement rate limiting and offset tracking per log source
- Add install.sh with interactive configuration and systemd support
- Create comprehensive test suite with pytest
- Add technical specification documentation
- Update CHANGELOG.md following Common Changelog standard

All 12 tests passing. Follows Metodo Sacchi (Safety first, little often, double check).
195 lines
6.6 KiB
Python
195 lines
6.6 KiB
Python
import pytest
|
|
import subprocess
|
|
import os
|
|
import json
|
|
import tempfile
|
|
import time
|
|
import signal
|
|
import threading
|
|
|
|
|
|
# Location of the script under test: two directory levels up from this file,
# then scripts/logwhisperer.sh.
SCRIPT_PATH = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "scripts",
    "logwhisperer.sh",
)
|
|
|
|
|
|
@pytest.fixture
def temp_log_file():
    """Provide the path of a temporary log file seeded with one normal line.

    The file is written and closed before its path is handed to the test, so
    the test can reopen it (e.g. in append mode) without the fixture still
    holding its own handle on it.

    Yields:
        str: Path of the temporary ``.log`` file.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
        f.write("Apr 2 10:00:00 server kernel: Normal log line\n")
    log_path = f.name

    yield log_path

    # Teardown: remove the file, tolerating a test that already deleted it so
    # cleanup never masks the real test outcome.
    try:
        os.unlink(log_path)
    except FileNotFoundError:
        pass
|
|
|
|
|
|
@pytest.fixture
def temp_config():
    """Provide the path of a temporary ``.env`` config file with default values.

    The file is written and closed before its path is handed to the test, so
    a test that rewrites the config (opening it with mode ``"w"``) does not
    compete with a handle still held by the fixture.

    Yields:
        str: Path of the temporary ``.env`` file.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".env", delete=False) as f:
        f.write('WEBHOOK_URL="http://localhost:9999/webhook"\n')
        f.write('CLIENT_ID="test-client-uuid"\n')
        f.write('LOG_SOURCES="/tmp/test.log"\n')
        f.write("POLL_INTERVAL=1\n")
        f.write("MAX_LINE_LENGTH=2000\n")
    config_path = f.name

    yield config_path

    # Teardown: remove the file, tolerating a test that already deleted it so
    # cleanup never masks the real test outcome.
    try:
        os.unlink(config_path)
    except FileNotFoundError:
        pass
|
|
|
|
|
|
class TestScriptExistence:
    """Sanity checks on the script file: present, executable, bash shebang."""

    def test_script_exists(self):
        """The script file is present at SCRIPT_PATH."""
        found = os.path.exists(SCRIPT_PATH)
        assert found, f"Script not found at {SCRIPT_PATH}"

    def test_script_is_executable(self):
        """The current user has execute permission on the script."""
        can_execute = os.access(SCRIPT_PATH, os.X_OK)
        assert can_execute, f"Script is not executable: {SCRIPT_PATH}"

    def test_script_has_shebang(self):
        """The first line of the script starts with the ``#!/bin/bash`` shebang."""
        with open(SCRIPT_PATH, "r") as script:
            shebang = script.readline().strip()
        assert shebang.startswith("#!/bin/bash"), f"Missing bash shebang, got: {shebang}"
|
|
|
|
|
|
class TestScriptValidation:
    """Exercise the ``--help`` and ``--validate`` command-line flags."""

    def test_script_help_flag(self):
        """``--help`` exits 0 and its stdout mentions "usage" or "help"."""
        argv = [SCRIPT_PATH, "--help"]
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=10)
        assert proc.returncode == 0
        stdout_lower = proc.stdout.lower()
        assert "usage" in stdout_lower or "help" in stdout_lower

    def test_script_validate_config(self):
        """``--validate`` exits 0 when given a well-formed config file."""
        config_lines = (
            'WEBHOOK_URL="http://localhost:9999/webhook"\n',
            'CLIENT_ID="test-uuid"\n',
            'LOG_SOURCES="/var/log/syslog"\n',
            "POLL_INTERVAL=5\n",
        )
        with tempfile.NamedTemporaryFile(mode="w", suffix=".env", delete=False) as cfg:
            cfg.writelines(config_lines)
            config_path = cfg.name

        argv = [SCRIPT_PATH, "--validate", "--config", config_path]
        try:
            proc = subprocess.run(argv, capture_output=True, text=True, timeout=10)
            assert proc.returncode == 0
        finally:
            # The file was created with delete=False, so remove it by hand.
            os.unlink(config_path)

    def test_script_validate_missing_config(self):
        """``--validate`` exits non-zero when the config path does not exist."""
        argv = [SCRIPT_PATH, "--validate", "--config", "/nonexistent/config.env"]
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=10)
        assert proc.returncode != 0
|
|
|
|
|
|
class TestPatternMatching:
    """Test that the script correctly matches error patterns.

    Every test runs the script with ``--dry-run --test-line <line>`` and
    inspects the exit code and stdout.
    """

    def test_detects_fatal_pattern(self, temp_log_file, temp_config):
        """Test detection of FATAL pattern."""
        # Rewrite the fixture's config so LOG_SOURCES points at the temp log.
        with open(temp_config, "w") as f:
            f.write('WEBHOOK_URL="http://localhost:9999/webhook"\n')
            f.write('CLIENT_ID="test-client-uuid"\n')
            f.write(f'LOG_SOURCES="{temp_log_file}"\n')
            f.write("POLL_INTERVAL=1\n")
            f.write("MAX_LINE_LENGTH=2000\n")

        # Append a FATAL entry to the log file as well. NOTE(review): the line
        # actually evaluated below is the one passed via --test-line; whether
        # the script also reads the log file in dry-run mode is not visible
        # from this test.
        with open(temp_log_file, "a") as f:
            f.write("Apr 2 10:30:00 server postgres[1234]: FATAL: too many connections\n")
            f.flush()

        result = subprocess.run(
            [SCRIPT_PATH, "--dry-run", "--config", temp_config, "--test-line", "FATAL: too many connections"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        assert result.returncode == 0
        assert "matched" in result.stdout.lower() or "FATAL" in result.stdout

    def test_detects_oom_pattern(self):
        """Test detection of OOM pattern."""
        result = subprocess.run(
            [SCRIPT_PATH, "--dry-run", "--test-line", "kernel: Out of memory: Kill process 1234"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        assert result.returncode == 0
        assert "OOM" in result.stdout or "matched" in result.stdout.lower()

    def test_detects_error_pattern(self):
        """Test detection of ERROR pattern."""
        result = subprocess.run(
            [SCRIPT_PATH, "--dry-run", "--test-line", "nginx: ERROR: connection refused"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        assert result.returncode == 0
        # Check the output too, mirroring the FATAL and OOM tests: an exit
        # code of 0 alone does not show that the line was detected.
        assert "ERROR" in result.stdout or "matched" in result.stdout.lower()

    def test_ignores_normal_lines(self):
        """Test that normal log lines are not matched."""
        result = subprocess.run(
            [SCRIPT_PATH, "--dry-run", "--test-line", "Apr 2 10:00:00 server sshd[5678]: Accepted publickey"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        assert result.returncode == 0
        assert "no match" in result.stdout.lower() or "skip" in result.stdout.lower()
|
|
|
|
|
|
class TestPayloadFormat:
    """Test that the script generates correct JSON payload.

    Both tests run the script in ``--dry-run`` mode with an out-of-memory
    test line and parse its stdout as JSON.
    """

    def test_json_payload_structure(self):
        """Test that dry-run outputs valid JSON with required fields."""
        result = subprocess.run(
            [
                SCRIPT_PATH,
                "--dry-run",
                "--test-line",
                "kernel: Out of memory: Kill process 1234",
                "--test-source",
                "/var/log/syslog",
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )
        assert result.returncode == 0, f"Script failed: {result.stderr}"

        output = result.stdout.strip()
        try:
            payload = json.loads(output)
        except json.JSONDecodeError:
            pytest.fail(f"Output is not valid JSON: {output}")

        required_fields = ["client_id", "hostname", "source", "severity", "timestamp", "raw_log", "matched_pattern"]
        # Collect every absent field so one failure reports all of them.
        missing = [field for field in required_fields if field not in payload]
        assert not missing, f"Missing required field(s): {missing}"

    def test_severity_mapping(self):
        """Test that the payload's severity is one of the known levels."""
        result = subprocess.run(
            [SCRIPT_PATH, "--dry-run", "--test-line", "kernel: Out of memory: Kill process"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        # Check the exit status before parsing so a script failure is reported
        # as such rather than as an opaque JSON decoding error.
        assert result.returncode == 0, f"Script failed: {result.stderr}"

        output = result.stdout.strip()
        try:
            payload = json.loads(output)
        except json.JSONDecodeError:
            pytest.fail(f"Output is not valid JSON: {output}")

        assert payload["severity"] in ["low", "medium", "critical"]
|