Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions src/log_analyzer_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,36 +193,45 @@ def _parse_file(
):
"""Parse log file with optional filtering."""
entries = []

from log_analyzer_cli.parsers import ParsedEntry
from log_analyzer_cli.utils import detect_log_level, parse_timestamp
from log_analyzer_cli.utils import parse_timestamp
import re

compiled_pattern = re.compile(search_pattern) if search_pattern else None

for line in read_log_file(file_path):
line = line.rstrip("\n\r")
if not line:
continue

if include_levels:
level = detect_log_level(line)
if level not in include_levels:
continue


if compiled_pattern and not compiled_pattern.search(line):
continue

timestamp = parse_timestamp(line)
if start_time and timestamp and timestamp < start_time:
continue
if end_time and timestamp and timestamp > end_time:
continue
Comment on lines 211 to 215

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use parsed.timestamp for time-window filtering to avoid parser/filter drift.

At Line 211-Line 215, timestamp gating uses parse_timestamp(line), which can disagree with parser-specific timestamp extraction (e.g., generic parser supports additional formats). That lets entries bypass --start-time/--end-time even though they parse successfully. Filter on parsed.timestamp after parser.parse(line) so filtering and reporting use the same parsed contract.

Proposed fix
-    from log_analyzer_cli.utils import parse_timestamp
+    from log_analyzer_cli.utils import parse_timestamp
     import re

     compiled_pattern = re.compile(search_pattern) if search_pattern else None

     for line in read_log_file(file_path):
@@
-        timestamp = parse_timestamp(line)
-        if start_time and timestamp and timestamp < start_time:
-            continue
-        if end_time and timestamp and timestamp > end_time:
-            continue
-
         parsed = parser.parse(line)
         if not parsed:
             continue
+
+        timestamp = parsed.timestamp
+        if start_time and timestamp and timestamp < start_time:
+            continue
+        if end_time and timestamp and timestamp > end_time:
+            continue

Also applies to: 217-231

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/log_analyzer_cli/cli.py` around lines 211 - 215, The timestamp filtering
at lines 211-215 uses parse_timestamp(line) directly for the start_time and
end_time checks, which can disagree with the parser-specific timestamp
extraction from parser.parse(line), allowing entries with supported timestamp
formats to bypass the time-window filters. After calling parser.parse(line) to
get the parsed object, use parsed.timestamp instead of parse_timestamp(line) for
the timestamp comparisons against start_time and end_time throughout the
filtering block (lines 211-215 and also lines 217-231). This ensures both the
filtering and subsequent reporting use the same timestamp extraction logic from
the parser.


parsed = parser.parse(line)
if parsed:
entries.append(parsed)

if not parsed:
continue

# Filter on the parser's level rather than re-scanning the raw
# line: the parser already strips host/process names (the syslog
# parser detects level from the message portion only), so a
# WARNING line whose message happens to contain the word "error"
# is reported as WARNING consistently in both the filter check
# and the output. The pre-parse filter form scanned the full
# line and could let "WARNING: error in user input" slip past
# --levels=ERROR, only for the output to then label it WARNING
# — the filter and the report disagreed about the same line.
if include_levels and parsed.level not in include_levels:
continue

entries.append(parsed)

return entries


Expand Down
10 changes: 7 additions & 3 deletions src/log_analyzer_cli/parsers/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ def parse(self, line: str) -> Optional[ParsedEntry]:
return None

timestamp = self._parse_timestamp(match.group("timestamp"))
level = detect_log_level(line)

# Detect the level from the message portion only; scanning the
# full line picks up words like "error" that appear inside the
# timestamp, hostname, or process name and misclassifies an
# otherwise clean WARNING line as ERROR. The extracted message
# starts right after the timestamp and is what the user
# actually thinks of as the log entry's content.
message_start = match.end()
message = line[message_start:].strip()

Expand All @@ -51,7 +55,7 @@ def parse(self, line: str) -> Optional[ParsedEntry]:
return ParsedEntry(
raw=line,
timestamp=timestamp,
level=level,
level=detect_log_level(message),
message=message,
metadata={"format": "generic"},
)
Expand Down
22 changes: 21 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,27 @@ def test_analyze_no_group(self, runner, json_file):
def test_analyze_level_filter(self, runner, json_file):
result = runner.invoke(main, ["analyze", str(json_file), "-l", "ERROR,WARNING"])
assert result.exit_code == 0


def test_analyze_level_filter_uses_parsed_level_not_raw_line(
self, runner, tmp_path
):
# JSON log line where the message body mentions "error" as part of a
# description but the structured level field is WARNING. The
# pre-
log = tmp_path / "host-error-actual-warn.log"
log.write_text(
'{"level": "WARNING", "message": "error in the system"}\n'
)
result = runner.invoke(main, ["analyze", str(log), "-l", "WARNING", "-v"])
assert result.exit_code == 0
assert "Total Lines:" in result.output
assert "WARNING" in result.output
# "ERROR" appears as a section header ("TOP ERROR GROUPS") so
# check the level-distribution line specifically.
import re
level_rows = re.findall(r"\bERROR\b\s+:\s+\d+", result.output)
assert not level_rows, f"unexpected ERROR level row: {level_rows}"

def test_analyze_pattern_filter(self, runner, json_file):
result = runner.invoke(main, ["analyze", str(json_file), "-p", "database"])
assert result.exit_code == 0
Expand Down
Loading