diff --git a/src/log_analyzer_cli/analyzer.py b/src/log_analyzer_cli/analyzer.py
index cac046a..c4a4e4a 100644
--- a/src/log_analyzer_cli/analyzer.py
+++ b/src/log_analyzer_cli/analyzer.py
@@ -74,6 +74,13 @@ def analyze(
         Returns:
             Analysis result.
         """
+        # Clear any patterns accumulated by a previous call so re-using the
+        # same LogAnalyzer across multiple batches does not leak earlier
+        # groups into the new result. Callers that want to keep history
+        # across calls should manage their own aggregation instead of
+        # relying on internal instance state.
+        self._error_patterns = {}
+
         result = AnalysisResult()
         result.total_lines = len(entries)
         result.parsed_entries = len(entries)
diff --git a/src/log_analyzer_cli/parsers/syslog.py b/src/log_analyzer_cli/parsers/syslog.py
index 99891f3..50f2243 100644
--- a/src/log_analyzer_cli/parsers/syslog.py
+++ b/src/log_analyzer_cli/parsers/syslog.py
@@ -85,7 +85,6 @@ def parse(self, line: str) -> Optional[ParsedEntry]:
 
         groups = match.groupdict()
         timestamp = self._parse_timestamp(groups.get("timestamp", ""))
-        level = detect_log_level(line)  # Check full line for level
 
         metadata = {}
         if groups.get("host"):
@@ -94,11 +93,16 @@ def parse(self, line: str) -> Optional[ParsedEntry]:
             metadata["process"] = groups["process"]
             source = groups["process"]
         else:
             # Use host as source when no process name
             source = groups.get("host")
         if groups.get("pid"):
             metadata["pid"] = groups["pid"]
 
+        # Detect the level from the message text only. groupdict() maps a
+        # named group that did not participate in the match to None, so
+        # fall back to an empty string before handing it to the detector.
+        level = detect_log_level(groups.get("message") or "")
+
         return ParsedEntry(
             raw=line,
             timestamp=timestamp,
diff --git a/src/log_analyzer_cli/utils.py b/src/log_analyzer_cli/utils.py
index 4d518a3..4a0865a 100644
--- a/src/log_analyzer_cli/utils.py
+++ b/src/log_analyzer_cli/utils.py
@@ -41,6 +41,9 @@ def _try_parse_datetime(ts_str: str) -> Optional[datetime]:
         "%Y-%m-%dT%H:%M:%S.%f",
         "%Y-%m-%d %H:%M:%S",
         "%Y-%m-%dT%H:%M:%S",
+        "%Y-%m-%dT%H:%M:%S.%f%z",
+        "%Y-%m-%d %H:%M:%S.%f%z",
         "%Y-%m-%dT%H:%M:%S%z",
+        "%Y-%m-%d %H:%M:%S%z",
         "%d/%b/%Y:%H:%M:%S",
         "%b %d %H:%M:%S",
diff --git a/tests/test_analyzer.py b/tests/test_analyzer.py
index cd4ccf1..950134e 100644
--- a/tests/test_analyzer.py
+++ b/tests/test_analyzer.py
@@ -149,6 +149,33 @@ def test_reset(self):
         analyzer.reset()
         assert len(analyzer._error_patterns) == 0
 
+    def test_repeated_analyze_does_not_leak_error_groups(self):
+        # Regression guard: a reused analyzer must report only the groups
+        # from the batch passed to the current analyze() call.
+        analyzer = LogAnalyzer()
+        first_batch = [
+            ParsedEntry(
+                raw="Error: timeout after 30s",
+                level="ERROR",
+                message="timeout after 30s",
+            ),
+        ]
+        second_batch = [
+            ParsedEntry(
+                raw="Error: db connection lost",
+                level="ERROR",
+                message="db connection lost",
+            ),
+        ]
+
+        first_result = analyzer.analyze(first_batch)
+        second_result = analyzer.analyze(second_batch)
+
+        assert len(first_result.error_groups) == 1
+        assert len(second_result.error_groups) == 1
+        assert second_result.error_groups[0].pattern != first_result.error_groups[0].pattern
+        assert second_result.error_groups[0].count == 1
+
 
 class TestAnalyzeLogEntries:
     """Tests for the analyze_log_entries function."""
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..d261221
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,102 @@
+"""Tests for log-analyzer-cli utility functions."""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+
+from log_analyzer_cli.utils import (
+    _try_parse_datetime,
+    detect_log_level,
+    parse_timestamp,
+)
+
+
+class TestTryParseDatetime:
+    """Tests for _try_parse_datetime."""
+
+    def test_iso_with_microseconds_and_offset_colon(self):
+        # Python's datetime.isoformat() emits exactly this form
+        assert _try_parse_datetime("2026-06-21T19:25:00.123456+02:00") == datetime(
+            2026, 6, 21, 19, 25, 0, 123456,
+            tzinfo=timezone(timedelta(hours=2)),
+        )
+
+    def test_iso_with_microseconds_and_offset_no_colon(self):
+        # ISO 8601 also allows ±HHMM form
+        assert _try_parse_datetime("2026-06-21T19:25:00.123456+0200") == datetime(
+            2026, 6, 21, 19, 25, 0, 123456,
+            tzinfo=timezone(timedelta(hours=2)),
+        )
+
+    def test_space_separator_with_microseconds_and_offset(self):
+        # The space-separator variant also gets a combined format
+        assert _try_parse_datetime("2026-06-21 19:25:00.123456+02:00") == datetime(
+            2026, 6, 21, 19, 25, 0, 123456,
+            tzinfo=timezone(timedelta(hours=2)),
+        )
+
+    def test_space_separator_no_microseconds_with_offset(self):
+        # Space-separated counterpart of the pre-existing offset form
+        assert _try_parse_datetime("2026-06-21 19:25:00+02:00") == datetime(
+            2026, 6, 21, 19, 25, 0,
+            tzinfo=timezone(timedelta(hours=2)),
+        )
+
+    def test_iso_microseconds_with_z_suffix(self):
+        # Existing Z-substitution should still work for fractional seconds
+        assert _try_parse_datetime("2026-06-21T19:25:00.123Z") == datetime(
+            2026, 6, 21, 19, 25, 0, 123000,
+            tzinfo=timezone.utc,
+        )
+
+    def test_iso_microseconds_no_timezone(self):
+        # The pre-existing microsecond form must keep working
+        assert _try_parse_datetime("2026-06-21T19:25:00.123456") == datetime(
+            2026, 6, 21, 19, 25, 0, 123456,
+        )
+
+    def test_iso_no_microseconds_with_offset(self):
+        # The pre-existing offset form must keep working
+        assert _try_parse_datetime("2026-06-21T19:25:00+02:00") == datetime(
+            2026, 6, 21, 19, 25, 0,
+            tzinfo=timezone(timedelta(hours=2)),
+        )
+
+    def test_iso_no_microseconds_no_timezone(self):
+        # The pre-existing plain form must keep working
+        assert _try_parse_datetime("2026-06-21T19:25:00") == datetime(
+            2026, 6, 21, 19, 25, 0,
+        )
+
+
+class TestParseTimestampIntegration:
+    """End-to-end checks through parse_timestamp."""
+
+    def test_combined_form_extracts_full_datetime(self):
+        # parse_timestamp's first regex captures the whole ISO timestamp;
+        # the inner _try_parse_datetime must now handle microseconds+offset.
+        assert parse_timestamp(
+            "2026-06-21T19:25:00.123456+02:00 ERROR something failed"
+        ) == datetime(2026, 6, 21, 19, 25, 0, 123456,
+                      tzinfo=timezone(timedelta(hours=2)))
+
+    def test_z_fractional_extracts_full_datetime(self):
+        assert parse_timestamp(
+            "2026-06-21T19:25:00.123Z ERROR something failed"
+        ) == datetime(2026, 6, 21, 19, 25, 0, 123000, tzinfo=timezone.utc)
+
+    def test_plain_form_still_parses(self):
+        # regression guard
+        assert parse_timestamp("2026-06-21 19:25:00 INFO ok") == datetime(
+            2026, 6, 21, 19, 25, 0,
+        )
+
+
+class TestDetectLogLevel:
+    """Tests for detect_log_level."""
+
+    def test_returns_uppercase_level(self):
+        assert detect_log_level("2025-01-01 error: bad") == "ERROR"
+
+    def test_returns_unknown_for_plain_text(self):
+        assert detect_log_level("just a normal line") == "UNKNOWN"