"""Tests for the ``scripts/logwhisperer.sh`` command-line script.

Every test invokes the script as a subprocess and checks its exit code
and/or what it printed on standard output.
"""

import json
import os
import signal
import subprocess
import tempfile
import threading
import time

import pytest

SCRIPT_PATH = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), "scripts", "logwhisperer.sh"
)

# Upper bound, in seconds, on any single invocation of the script.
_SCRIPT_TIMEOUT = 10


def _run_script(*args):
    """Run the script with *args* and return the ``CompletedProcess``.

    stdout/stderr are captured as text. ``subprocess.TimeoutExpired`` is
    raised if the script runs longer than ``_SCRIPT_TIMEOUT`` seconds.
    """
    return subprocess.run(
        [SCRIPT_PATH, *args],
        capture_output=True,
        text=True,
        timeout=_SCRIPT_TIMEOUT,
    )


def _write_temp_file(suffix, lines):
    """Write *lines* to a new temporary file and return its path.

    The handle is closed before returning so that callers (and the script
    under test) can reopen the path freely. The caller owns the file and is
    responsible for deleting it.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) as f:
        f.writelines(lines)
    return f.name


@pytest.fixture
def temp_log_file():
    """Create a temporary log file for testing.

    Yields the path of a ``.log`` file holding one ordinary log line; the
    file is removed during fixture teardown.
    """
    path = _write_temp_file(
        ".log",
        ["Apr 2 10:00:00 server kernel: Normal log line\n"],
    )
    yield path
    os.unlink(path)


@pytest.fixture
def temp_config():
    """Create a temporary config file for testing.

    Yields the path of a ``.env`` file with a default set of settings; the
    file is removed during fixture teardown.
    """
    path = _write_temp_file(
        ".env",
        [
            'WEBHOOK_URL="http://localhost:9999/webhook"\n',
            'CLIENT_ID="test-client-uuid"\n',
            'LOG_SOURCES="/tmp/test.log"\n',
            "POLL_INTERVAL=1\n",
            "MAX_LINE_LENGTH=2000\n",
        ],
    )
    yield path
    os.unlink(path)


class TestScriptExistence:
    """Test that the script exists and is executable."""

    def test_script_exists(self):
        assert os.path.exists(SCRIPT_PATH), f"Script not found at {SCRIPT_PATH}"

    def test_script_is_executable(self):
        assert os.access(SCRIPT_PATH, os.X_OK), f"Script is not executable: {SCRIPT_PATH}"

    def test_script_has_shebang(self):
        with open(SCRIPT_PATH, "r") as f:
            first_line = f.readline().strip()
        assert first_line.startswith("#!/bin/bash"), f"Missing bash shebang, got: {first_line}"


class TestScriptValidation:
    """Test script validation and help output."""

    def test_script_help_flag(self):
        result = _run_script("--help")
        assert result.returncode == 0
        assert "usage" in result.stdout.lower() or "help" in result.stdout.lower()

    def test_script_validate_config(self):
        """Test --validate flag with a good config."""
        config_path = _write_temp_file(
            ".env",
            [
                'WEBHOOK_URL="http://localhost:9999/webhook"\n',
                'CLIENT_ID="test-uuid"\n',
                'LOG_SOURCES="/var/log/syslog"\n',
                "POLL_INTERVAL=5\n",
            ],
        )
        try:
            result = _run_script("--validate", "--config", config_path)
            assert result.returncode == 0
        finally:
            # Remove the config even when the assertion above fails.
            os.unlink(config_path)

    def test_script_validate_missing_config(self):
        """Test --validate flag with missing config."""
        result = _run_script("--validate", "--config", "/nonexistent/config.env")
        assert result.returncode != 0


class TestPatternMatching:
    """Test that the script correctly matches error patterns."""

    def test_detects_fatal_pattern(self, temp_log_file, temp_config):
        """Test detection of FATAL pattern."""
        # Overwrite the fixture's config so LOG_SOURCES names the temp log.
        with open(temp_config, "w") as f:
            f.write('WEBHOOK_URL="http://localhost:9999/webhook"\n')
            f.write('CLIENT_ID="test-client-uuid"\n')
            f.write(f'LOG_SOURCES="{temp_log_file}"\n')
            f.write("POLL_INTERVAL=1\n")
            f.write("MAX_LINE_LENGTH=2000\n")

        with open(temp_log_file, "a") as f:
            f.write("Apr 2 10:30:00 server postgres[1234]: FATAL: too many connections\n")

        result = _run_script(
            "--dry-run",
            "--config",
            temp_config,
            "--test-line",
            "FATAL: too many connections",
        )
        assert result.returncode == 0
        assert "matched" in result.stdout.lower() or "FATAL" in result.stdout

    def test_detects_oom_pattern(self):
        """Test detection of OOM pattern."""
        result = _run_script(
            "--dry-run", "--test-line", "kernel: Out of memory: Kill process 1234"
        )
        assert result.returncode == 0
        assert "OOM" in result.stdout or "matched" in result.stdout.lower()

    def test_detects_error_pattern(self):
        """Test detection of ERROR pattern."""
        result = _run_script(
            "--dry-run", "--test-line", "nginx: ERROR: connection refused"
        )
        assert result.returncode == 0

    def test_ignores_normal_lines(self):
        """Test that normal log lines are not matched."""
        result = _run_script(
            "--dry-run",
            "--test-line",
            "Apr 2 10:00:00 server sshd[5678]: Accepted publickey",
        )
        assert result.returncode == 0
        assert "no match" in result.stdout.lower() or "skip" in result.stdout.lower()


class TestPayloadFormat:
    """Test that the script generates correct JSON payload."""

    def test_json_payload_structure(self):
        """Test that dry-run outputs valid JSON with required fields."""
        result = _run_script(
            "--dry-run",
            "--test-line",
            "kernel: Out of memory: Kill process 1234",
            "--test-source",
            "/var/log/syslog",
        )
        assert result.returncode == 0

        output = result.stdout.strip()
        try:
            payload = json.loads(output)
        except json.JSONDecodeError:
            pytest.fail(f"Output is not valid JSON: {output}")

        required_fields = [
            "client_id",
            "hostname",
            "source",
            "severity",
            "timestamp",
            "raw_log",
            "matched_pattern",
        ]
        for field in required_fields:
            assert field in payload, f"Missing required field: {field}"

    def test_severity_mapping(self):
        """Test that severity is correctly mapped based on pattern."""
        result = _run_script(
            "--dry-run", "--test-line", "kernel: Out of memory: Kill process"
        )
        # Check the exit code first so a script failure is reported as such
        # rather than as a JSON decoding error on empty/partial output.
        assert result.returncode == 0, f"Script failed: {result.stderr}"

        payload = json.loads(result.stdout.strip())
        assert payload["severity"] in ["low", "medium", "critical"]