#!/usr/bin/env python3
"""
Test suite for secure_logwhisperer.sh

RED Phase - Tests should FAIL until implementation is complete

Test IDs from spec:
- SEC-001 to SEC-010: Security tests
- INT-001 to INT-004: Integration tests
"""

import hashlib
import hmac
import json
import os
import re
import subprocess
import tempfile
import time
from pathlib import Path
from unittest.mock import patch, MagicMock

import pytest

# Path to the script under test
SCRIPT_DIR = Path(__file__).parent.parent / "scripts"
SCRIPT_PATH = SCRIPT_DIR / "secure_logwhisperer.sh"

# Known-good credentials so that negative config tests have exactly one
# invalid field.  The secret is deliberately longer than the 32-character
# minimum exercised by test_client_secret_min_length.
VALID_CLIENT_ID = "550e8400-e29b-41d4-a716-446655440000"
VALID_CLIENT_SECRET = "test-secret-key-at-least-32-chars-long"

# `eval` used as a command word: not part of a longer identifier or option
# (e.g. "retrieval", "--no-eval") and followed by whitespace.
_EVAL_COMMAND = re.compile(r"(?<![\w-])eval\s")


def _run_script(*args, cwd, env=None):
    """Run the script under test with *args* and return the CompletedProcess.

    Asserts that the script exists first: without this, every "should be
    rejected" test would pass in the RED phase simply because bash exits
    non-zero for a missing file.

    Args:
        *args: Command-line arguments passed to the script.
        cwd: Working directory for the child process.
        env: Optional environment mapping; the parent's is inherited if None.
    """
    assert SCRIPT_PATH.exists(), f"Script not found at {SCRIPT_PATH}"
    return subprocess.run(
        ["bash", str(SCRIPT_PATH), *args],
        capture_output=True,
        text=True,
        cwd=cwd,
        env=env,
    )


def _write_config(directory, **overrides):
    """Write ``config.env`` into *directory* and return its path.

    Starts from a valid WEBHOOK_URL / CLIENT_ID / CLIENT_SECRET triple and
    applies *overrides* on top.  String values are written double-quoted,
    everything else (e.g. integers) unquoted.
    """
    settings = {
        "WEBHOOK_URL": "https://example.com/webhook",
        "CLIENT_ID": VALID_CLIENT_ID,
        "CLIENT_SECRET": VALID_CLIENT_SECRET,
        **overrides,
    }
    lines = [
        f'{key}="{value}"' if isinstance(value, str) else f"{key}={value}"
        for key, value in settings.items()
    ]
    config_path = directory / "config.env"
    config_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return config_path


def _path_without(binary, shadow_dir):
    """Return a PATH value exposing every current PATH entry except *binary*.

    Populates *shadow_dir* with symlinks to everything found in the
    directories of the current PATH, skipping any file named *binary*.
    Simply dropping directories from PATH is not enough, because the tool to
    hide usually lives next to bash and the other required tools.
    """
    shadow_dir.mkdir(parents=True, exist_ok=True)
    for entry in os.environ.get("PATH", os.defpath).split(os.pathsep):
        if not entry:
            continue
        try:
            candidates = list(Path(entry).absolute().iterdir())
        except OSError:
            # Missing or unreadable PATH entry: nothing to expose from it.
            continue
        for candidate in candidates:
            if candidate.name == binary:
                continue
            try:
                (shadow_dir / candidate.name).symlink_to(candidate)
            except FileExistsError:
                # An earlier PATH entry already provides this name.
                pass
    return str(shadow_dir)


@pytest.fixture
def temp_dir():
    """Create a temporary directory for test files."""
    with tempfile.TemporaryDirectory() as tmpdir:
        yield Path(tmpdir)


@pytest.fixture
def mock_config(temp_dir):
    """Create a mock (valid) config.env for testing."""
    return _write_config(
        temp_dir,
        LOG_SOURCES="/var/log/syslog",
        POLL_INTERVAL=5,
        MAX_LINE_LENGTH=2000,
        OFFSET_DIR="/tmp/logwhisperer_test",
    )


class TestScriptExists:
    """Test that the script exists and is executable."""

    def test_script_file_exists(self):
        """SEC-PRE: Script file must exist."""
        assert SCRIPT_PATH.exists(), f"Script not found at {SCRIPT_PATH}"

    def test_script_is_executable(self):
        """SEC-PRE: Script must be executable."""
        # A missing script is a failure here, not a silent pass.
        assert SCRIPT_PATH.exists(), f"Script not found at {SCRIPT_PATH}"
        assert os.access(SCRIPT_PATH, os.X_OK), "Script is not executable"


class TestPathValidation:
    """
    Security tests for path validation (anti-path traversal).

    Test IDs: SEC-001, SEC-002, SEC-003
    """

    def test_reject_path_outside_var_log(self, temp_dir, mock_config):
        """SEC-001: Reject path /etc/passwd in LOG_SOURCES."""
        result = _run_script("--validate-source", "/etc/passwd", cwd=temp_dir)

        assert result.returncode != 0, "Path outside /var/log should be rejected"
        assert (
            "Invalid log source path" in result.stderr
            or "must be under /var/log" in result.stderr
        )

    def test_reject_path_traversal_attempt(self, temp_dir, mock_config):
        """SEC-002: Reject path ../../../etc/shadow."""
        result = _run_script(
            "--validate-source", "../../../etc/shadow", cwd=temp_dir
        )

        assert result.returncode != 0, "Path traversal attempt should be rejected"
        assert (
            "Invalid log source path" in result.stderr
            or "must be under /var/log" in result.stderr
        )

    def test_reject_symlink_outside_var_log(self, temp_dir, mock_config):
        """SEC-003: Reject symlink to /etc/shadow from /var/log."""
        # Simulate /var/log inside the sandbox.
        var_log_dir = temp_dir / "var" / "log"
        var_log_dir.mkdir(parents=True)

        # Symlink inside the simulated /var/log pointing outside of it.
        symlink_path = var_log_dir / "malicious_link"
        target_path = temp_dir / "etc" / "shadow"
        target_path.parent.mkdir(parents=True)
        target_path.write_text("secret")
        symlink_path.symlink_to(target_path)

        result = _run_script("--validate-source", str(symlink_path), cwd=temp_dir)

        assert result.returncode != 0, "Symlink outside /var/log should be rejected"
        # NOTE(review): the link itself is not under the real /var/log, so a
        # validator may reject it with the generic "must be under /var/log"
        # message before it ever inspects the target - confirm against spec.
        assert "Symlink target outside /var/log" in result.stderr

    def test_accept_valid_var_log_path(self, temp_dir, mock_config):
        """Accept valid path under /var/log."""
        var_log_dir = temp_dir / "var" / "log"
        var_log_dir.mkdir(parents=True)
        log_file = var_log_dir / "syslog"
        log_file.write_text("test log")

        result = _run_script("--validate-source", str(log_file), cwd=temp_dir)

        # NOTE(review): this path is <tmp>/var/log/syslog, not the real
        # /var/log; presumably the script needs a configurable root for this
        # to be accepted while SEC-001 still rejects /etc/passwd - TODO confirm.
        assert result.returncode == 0, "Valid /var/log path should be accepted"


class TestLogLineSanitization:
    """
    Security tests for log line sanitization (DLP + anti-injection).

    Test IDs: SEC-004, SEC-005, SEC-006
    """

    def test_sanitize_command_injection(self, temp_dir):
        """SEC-004: Log line with '; rm -rf /;' must be sanitized."""
        malicious_line = 'user action"; rm -rf /; "done'

        result = _run_script("--sanitize-line", malicious_line, cwd=temp_dir)

        # The previous check (`";" not in output or returncode == 0`) was also
        # satisfied by a crash with empty output; require a clean exit.
        # NOTE(review): whether ';' itself must be stripped is not pinned by
        # the original assertion - confirm against the spec.
        assert result.returncode == 0, "Command injection attempt should be sanitized"

    def test_mask_password_in_log(self, temp_dir):
        """SEC-005: Mask password=secret123 as password=***."""
        log_line = "User login password=secret123 and username=john"

        result = _run_script("--sanitize-line", log_line, cwd=temp_dir)

        output = result.stdout.strip()
        assert "secret123" not in output, "Password should be masked"
        assert "password=***" in output, "Password should be replaced with ***"

    def test_mask_email_in_log(self, temp_dir):
        """SEC-006: Mask user@example.com as [EMAIL]."""
        log_line = "Contact user@example.com for support"

        result = _run_script("--sanitize-line", log_line, cwd=temp_dir)

        output = result.stdout.strip()
        assert "user@example.com" not in output, "Email should be masked"
        assert "[EMAIL]" in output, "Email should be replaced with [EMAIL]"

    def test_mask_api_key_in_log(self, temp_dir):
        """Mask the value of api_key=<value> as api_key=***."""
        log_line = "api_key=abcd1234efgh5678ijkl9012mnop"

        result = _run_script("--sanitize-line", log_line, cwd=temp_dir)

        output = result.stdout.strip()
        assert "abcd1234efgh5678ijkl9012mnop" not in output, "API key should be masked"
        assert "api_key=***" in output, "API key should be replaced with ***"

    def test_mask_ip_address_in_log(self, temp_dir):
        """Mask IP addresses as [IP]."""
        log_line = "Connection from 192.168.1.100 accepted"

        result = _run_script("--sanitize-line", log_line, cwd=temp_dir)

        output = result.stdout.strip()
        assert "192.168.1.100" not in output, "IP should be masked"
        assert "[IP]" in output, "IP should be replaced with [IP]"

    def test_truncate_long_lines(self, temp_dir):
        """Lines longer than MAX_LINE_LENGTH should be truncated."""
        long_line = "A" * 3000

        result = _run_script("--sanitize-line", long_line, cwd=temp_dir)

        output = result.stdout.strip()
        # 2100 leaves headroom above the 2000-character limit for the marker.
        assert len(output) <= 2100, "Line should be truncated to MAX_LINE_LENGTH"
        assert "...[truncated]" in output, "Truncated line should have indicator"


class TestDependencies:
    """
    Security tests for required dependencies.

    Test ID: SEC-007
    """

    def test_jq_is_required(self, temp_dir, mock_config):
        """SEC-007: Missing jq binary should cause exit with error."""
        # Expose everything on the current PATH except jq, so the failure can
        # only be attributed to the missing jq binary.
        env = os.environ.copy()
        env["PATH"] = _path_without("jq", temp_dir / "shadow_bin")

        result = _run_script("--check-deps", cwd=temp_dir, env=env)

        assert result.returncode != 0, "Missing jq should make --check-deps fail"
        stderr = result.stderr.lower()
        assert "jq" in stderr or "required" in stderr

    def test_curl_is_required(self, temp_dir, mock_config):
        """curl binary should be checked."""
        result = _run_script("--check-deps", cwd=temp_dir)

        # Should either pass (curl available) or fail with curl message
        assert result.returncode == 0 or "curl" in result.stderr.lower()

    def test_openssl_is_required(self, temp_dir, mock_config):
        """openssl binary should be checked."""
        result = _run_script("--check-deps", cwd=temp_dir)

        # Should either pass (openssl available) or fail with openssl message
        assert result.returncode == 0 or "openssl" in result.stderr.lower()


class TestHTTPSValidation:
    """
    Security tests for HTTPS enforcement.

    Test ID: SEC-008
    """

    def test_reject_http_webhook_url(self, temp_dir):
        """SEC-008: HTTP webhook URL should be rejected."""
        # Only the URL scheme is invalid; the credentials are well-formed.
        _write_config(temp_dir, WEBHOOK_URL="http://example.com/webhook")

        result = _run_script("--validate-config", cwd=temp_dir)

        assert result.returncode != 0, "HTTP webhook URL should be rejected"
        assert "HTTPS" in result.stderr or "https" in result.stderr

    def test_accept_https_webhook_url(self, temp_dir):
        """HTTPS webhook URL should be accepted."""
        _write_config(temp_dir)

        result = _run_script("--validate-config", cwd=temp_dir)

        # Passes on success, or on any failure that is not an HTTPS complaint.
        assert "HTTPS" not in result.stderr or result.returncode == 0


class TestHMACSignature:
    """
    Security tests for HMAC-SHA256 signature generation.

    Test ID: SEC-009
    """

    def test_hmac_signature_generation(self, temp_dir):
        """SEC-009: Generate valid HMAC-SHA256 signature."""
        payload = '{"test": "data"}'
        secret = "test-secret-key"

        result = _run_script("--generate-hmac", payload, secret, cwd=temp_dir)

        assert result.returncode == 0, "HMAC generation should succeed"
        output = result.stdout.strip()

        # Output should contain timestamp and signature separated by colon
        assert ":" in output, "HMAC output should be timestamp:signature format"

        parts = output.split(":")
        assert len(parts) == 2, "HMAC output should have exactly two parts"

        timestamp, signature = parts
        assert timestamp.isdigit(), "Timestamp should be numeric"
        assert len(signature) == 64, "SHA256 signature should be 64 hex chars"

    def test_hmac_signature_is_deterministic(self, temp_dir):
        """Same payload and secret should produce verifiable signature."""
        payload = '{"test": "data"}'
        secret = "test-secret-key"
        timestamp = str(int(time.time()))

        result = _run_script(
            "--generate-hmac", payload, secret, timestamp, cwd=temp_dir
        )

        # A failing script must fail the test rather than skip the comparison.
        assert result.returncode == 0, "HMAC generation should succeed"
        output = result.stdout.strip()
        _, signature = output.split(":")

        # Verify with Python hmac over "<timestamp>:<payload>".
        expected = hmac.new(
            secret.encode(),
            f"{timestamp}:{payload}".encode(),
            hashlib.sha256,
        ).hexdigest()

        assert signature == expected, "Generated signature should match expected"


class TestAtomicOffsetWrite:
    """
    Security tests for atomic file operations.

    Test ID: SEC-010
    """

    def test_atomic_write_creates_file(self, temp_dir):
        """SEC-010: Atomic write should create offset file."""
        offset_file = temp_dir / "offset.txt"
        offset_value = "12345"

        result = _run_script(
            "--atomic-write", str(offset_file), offset_value, cwd=temp_dir
        )

        assert result.returncode == 0, "Atomic write should succeed"
        assert offset_file.exists(), "Offset file should be created"
        assert offset_file.read_text() == offset_value, "Offset value should be written"

    def test_atomic_write_no_partial_files(self, temp_dir):
        """Atomic write should not leave temporary files."""
        offset_file = temp_dir / "offset.txt"
        offset_value = "12345"

        result = _run_script(
            "--atomic-write", str(offset_file), offset_value, cwd=temp_dir
        )

        # Without this, a write that never ran trivially leaves no temp files.
        assert result.returncode == 0, "Atomic write should succeed"

        # Check no .tmp files left behind
        tmp_files = list(temp_dir.glob("*.tmp*"))
        assert len(tmp_files) == 0, "No temporary files should remain"

    def test_atomic_write_handles_corruption(self, temp_dir):
        """Offset file corruption should be detected and reset."""
        offset_file = temp_dir / "offset.txt"
        offset_file.write_text("corrupted data not a number")

        result = _run_script("--read-offset", str(offset_file), cwd=temp_dir)

        # Should handle corruption gracefully
        assert result.returncode == 0 or "reset" in result.stderr.lower()


class TestConfigurationValidation:
    """Tests for configuration parameter validation."""

    def test_client_id_must_be_uuid(self, temp_dir):
        """CLIENT_ID should be valid UUID format."""
        # Everything except CLIENT_ID is valid, so a rejection cannot be
        # caused by an unrelated field.
        _write_config(temp_dir, CLIENT_ID="not-a-uuid")

        result = _run_script("--validate-config", cwd=temp_dir)

        assert result.returncode != 0 or "CLIENT_ID" in result.stderr

    def test_client_secret_min_length(self, temp_dir):
        """CLIENT_SECRET should be at least 32 characters."""
        _write_config(temp_dir, CLIENT_SECRET="short")

        result = _run_script("--validate-config", cwd=temp_dir)

        assert result.returncode != 0, "Short CLIENT_SECRET should be rejected"

    def test_max_line_length_range(self, temp_dir):
        """MAX_LINE_LENGTH should be between 500-10000."""
        _write_config(temp_dir, MAX_LINE_LENGTH=100)

        result = _run_script("--validate-config", cwd=temp_dir)

        # Should either reject or use default
        assert result.returncode != 0 or "2000" in result.stdout


class TestIntegration:
    """
    Integration tests for complete workflow.

    Test IDs: INT-001, INT-002, INT-003, INT-004
    """

    def test_end_to_end_payload_delivery(self, temp_dir):
        """INT-001: End-to-end with valid log delivers payload with HMAC."""
        # This test requires the full script implementation
        # For RED phase, we just verify the structure
        pytest.skip("Integration test - requires full implementation")

    def test_network_timeout_retry(self, temp_dir):
        """INT-002: Network timeout should retry 3 times."""
        pytest.skip("Integration test - requires full implementation")

    def test_webhook_4xx_error_handling(self, temp_dir):
        """INT-003: Webhook 4xx should stop retry and log error."""
        pytest.skip("Integration test - requires full implementation")

    def test_multiple_concurrent_sources(self, temp_dir):
        """INT-004: Multiple log sources should be monitored correctly."""
        pytest.skip("Integration test - requires full implementation")


class TestNoEval:
    """Security tests to ensure no eval is used."""

    def test_no_eval_in_script(self):
        """Script should not contain 'eval' command."""
        if not SCRIPT_PATH.exists():
            pytest.skip("Script not yet implemented")

        script_lines = SCRIPT_PATH.read_text(encoding="utf-8").split("\n")

        # Full-line comments are ignored; every remaining line is checked for
        # `eval` used as a command word, and all hits are reported at once.
        offenders = [
            f"{number}: {line}"
            for number, line in enumerate(script_lines, start=1)
            if not line.strip().startswith("#") and _EVAL_COMMAND.search(line)
        ]
        assert not offenders, "Line contains eval:\n" + "\n".join(offenders)


class TestJSONEncoding:
    """Tests for JSON encoding security."""

    def test_json_encoding_uses_jq(self, temp_dir):
        """JSON encoding should use jq, not manual escaping."""
        test_data = {
            "client_id": "test",
            "raw_log": "Special chars: \"quoted\" and \n newline and \\ backslash",
        }

        result = _run_script("--encode-json", json.dumps(test_data), cwd=temp_dir)

        # A failing encoder must fail the test instead of skipping the checks.
        assert result.returncode == 0, "JSON encoding should succeed"
        output = result.stdout.strip()
        # Should be valid JSON
        parsed = json.loads(output)
        assert "raw_log" in parsed


if __name__ == "__main__":
    # Propagate pytest's exit status to the shell.
    raise SystemExit(pytest.main([__file__, "-v"]))