Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 34 additions & 50 deletions dataclaw/parsers/claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from ..export_tasks import ExportSessionTask
from ..secrets import should_skip_large_binary_string
from .common import (
anonymize_value,
collect_project_sessions,
count_existing_paths_and_sizes,
iter_jsonl,
Expand Down Expand Up @@ -151,7 +150,7 @@ def parse_export_session_task(
return None


def build_tool_result_map(entries: Iterable[dict[str, Any]], anonymizer: Anonymizer) -> dict[str, dict]:
def build_tool_result_map(entries: Iterable[dict[str, Any]]) -> dict[str, dict]:
"""Pre-pass: build a map of tool_use_id -> {output, status} from tool_result blocks."""
result: dict[str, dict] = {}
for entry in entries:
Expand All @@ -166,7 +165,7 @@ def build_tool_result_map(entries: Iterable[dict[str, Any]], anonymizer: Anonymi
tid = block.get("tool_use_id")
if not tid:
continue
output = build_tool_result_output(block, entry, anonymizer)
output = build_tool_result_output(block, entry)
result[tid] = {
"output": output,
"status": "error" if block.get("is_error") else "success",
Expand All @@ -177,13 +176,12 @@ def build_tool_result_map(entries: Iterable[dict[str, Any]], anonymizer: Anonymi
def build_tool_result_output(
block: dict[str, Any],
entry: dict[str, Any],
anonymizer: Anonymizer,
) -> dict[str, Any]:
text, raw_content = parse_tool_result_content(block.get("content"), anonymizer)
text, raw_content = parse_tool_result_content(block.get("content"))
if text is None:
text = extract_tool_result_text(entry.get("toolUseResult"), anonymizer)
text = extract_tool_result_text(entry.get("toolUseResult"))

raw_result = sanitize_tool_use_result(entry.get("toolUseResult"), text, anonymizer)
raw_result = sanitize_tool_use_result(entry.get("toolUseResult"), text)
source_tool_uuid = entry.get("sourceToolAssistantUUID")
if isinstance(source_tool_uuid, str) and source_tool_uuid:
if raw_result is None:
Expand All @@ -201,9 +199,9 @@ def build_tool_result_output(
return output


def parse_tool_result_content(content: Any, anonymizer: Anonymizer) -> tuple[str | None, Any]:
def parse_tool_result_content(content: Any) -> tuple[str | None, Any]:
if isinstance(content, str):
text = normalize_tool_result_text(content, anonymizer)
text = normalize_tool_result_text(content)
if text is not None:
return text, None
if should_skip_large_binary_string(content):
Expand All @@ -215,36 +213,34 @@ def parse_tool_result_content(content: Any, anonymizer: Anonymizer) -> tuple[str
raw_parts: list[Any] = []
for part in content:
if isinstance(part, dict):
anonymized_part = anonymize_value("content", part, anonymizer)
if part.get("type") == "text":
part_text = extract_tool_result_text(anonymized_part, anonymizer=None)
part_text = extract_tool_result_text(part)
if part_text:
text_parts.append(part_text)
raw_part = prune_empty_values(drop_duplicate_text_fields(anonymized_part, part_text))
raw_part = prune_empty_values(drop_duplicate_text_fields(part, part_text))
if raw_part is not None and raw_part != {"type": "text"}:
raw_parts.append(raw_part)
continue
raw_parts.append(anonymized_part)
raw_parts.append(part)
continue
raw_parts.append(anonymize_value("content", part, anonymizer))
raw_parts.append(part)

text = "\n\n".join(text_parts).strip() if text_parts else None
return text or None, prune_empty_values(raw_parts)

if isinstance(content, dict):
anonymized_content = anonymize_value("content", content, anonymizer)
text = extract_tool_result_text(anonymized_content, anonymizer=None)
raw = prune_empty_values(drop_duplicate_text_fields(anonymized_content, text))
text = extract_tool_result_text(content)
raw = prune_empty_values(drop_duplicate_text_fields(content, text))
if raw == {"type": "text"}:
raw = None
return text, raw

return None, prune_empty_values(anonymize_value("content", content, anonymizer))
return None, prune_empty_values(content)


def extract_tool_result_text(value: Any, anonymizer: Anonymizer | None) -> str | None:
def extract_tool_result_text(value: Any) -> str | None:
if isinstance(value, str):
return normalize_tool_result_text(value, anonymizer)
return normalize_tool_result_text(value)

if isinstance(value, list):
text_parts = []
Expand All @@ -253,7 +249,7 @@ def extract_tool_result_text(value: Any, anonymizer: Anonymizer | None) -> str |
continue
if part.get("type") != "text":
continue
text = normalize_tool_result_text(part.get("text"), anonymizer)
text = normalize_tool_result_text(part.get("text"))
if text:
text_parts.append(text)
if text_parts:
Expand All @@ -264,45 +260,42 @@ def extract_tool_result_text(value: Any, anonymizer: Anonymizer | None) -> str |
return None

for candidate in (value.get("stdout"), value.get("content"), value.get("text")):
text = normalize_tool_result_text(candidate, anonymizer)
text = normalize_tool_result_text(candidate)
if text:
return text

file_info = value.get("file")
if isinstance(file_info, dict):
return normalize_tool_result_text(file_info.get("content"), anonymizer)
return normalize_tool_result_text(file_info.get("content"))

return None


def normalize_tool_result_text(value: Any, anonymizer: Anonymizer | None) -> str | None:
def normalize_tool_result_text(value: Any) -> str | None:
if not isinstance(value, str):
return None
text = value.strip()
if not text or should_skip_large_binary_string(text):
return None
if anonymizer is None:
return text
return anonymizer.text(text)
return text


def sanitize_tool_use_result(
tool_use_result: Any,
text: str | None,
anonymizer: Anonymizer,
) -> dict[str, Any] | None:
if tool_use_result is None:
return None

if isinstance(tool_use_result, str):
if should_skip_large_binary_string(tool_use_result):
return {"text": tool_use_result}
sanitized_text = normalize_tool_result_text(tool_use_result, anonymizer)
sanitized_text = normalize_tool_result_text(tool_use_result)
if not sanitized_text or text_matches_tool_result(sanitized_text, text):
return None
return {"text": sanitized_text}

sanitized = anonymize_value("toolUseResult", tool_use_result, anonymizer)
sanitized = tool_use_result
sanitized = drop_redundant_result_fields(sanitized)
sanitized = drop_duplicate_text_fields(sanitized, text)
pruned = prune_empty_values(sanitized)
Expand Down Expand Up @@ -405,7 +398,6 @@ def parse_session_file(
messages: list[dict[str, Any]] = []
metadata = {
"session_id": filepath.stem,
"cwd": None,
"git_branch": None,
"claude_version": None,
"model": None,
Expand All @@ -423,15 +415,14 @@ def parse_session_file(
messages,
metadata,
stats,
anonymizer,
include_thinking,
pending_tool_results=pending_tool_results,
pending_tool_uses=pending_tool_uses,
)
except OSError:
return None

return make_session_result(metadata, messages, stats)
return make_session_result(metadata, messages, stats, anonymizer=anonymizer)


def find_subagent_sessions(project_dir: Path) -> list[Path]:
Expand Down Expand Up @@ -469,7 +460,6 @@ def parse_subagent_session(
messages: list[dict[str, Any]] = []
metadata = {
"session_id": session_dir.name,
"cwd": None,
"git_branch": None,
"claude_version": None,
"model": None,
Expand All @@ -489,7 +479,6 @@ def parse_subagent_session(
messages,
metadata,
stats,
anonymizer,
include_thinking,
pending_tool_results=pending_tool_results,
pending_tool_uses=pending_tool_uses,
Expand All @@ -501,7 +490,7 @@ def parse_subagent_session(
return None

metadata["session_id"] = resolve_subagent_session_id(session_dir, metadata["session_id"])
return make_session_result(metadata, messages, stats)
return make_session_result(metadata, messages, stats, anonymizer=anonymizer)


def resolve_subagent_session_id(session_dir: Path, session_id: str) -> str:
Expand Down Expand Up @@ -539,25 +528,23 @@ def process_entry(
messages: list[dict[str, Any]],
metadata: dict[str, Any],
stats: dict[str, int],
anonymizer: Anonymizer,
include_thinking: bool,
tool_result_map: dict[str, dict] | None = None,
pending_tool_results: dict[str, dict[str, Any]] | None = None,
pending_tool_uses: dict[str, list[dict[str, Any]]] | None = None,
) -> None:
entry_type = entry.get("type")

if metadata["cwd"] is None and entry.get("cwd"):
metadata["cwd"] = anonymizer.path(entry["cwd"])
if entry.get("cwd"):
metadata["git_branch"] = entry.get("gitBranch")
metadata["claude_version"] = entry.get("version")
metadata["session_id"] = entry.get("sessionId", metadata["session_id"])

timestamp = normalize_timestamp(entry.get("timestamp"))

if entry_type == "user":
_attach_claude_tool_results(entry, anonymizer, pending_tool_results, pending_tool_uses)
content = extract_user_content(entry, anonymizer)
_attach_claude_tool_results(entry, pending_tool_results, pending_tool_uses)
content = extract_user_content(entry)
if content is not None:
messages.append({"role": "user", "content": content, "timestamp": timestamp})
stats["user_messages"] += 1
Expand All @@ -566,7 +553,6 @@ def process_entry(
elif entry_type == "assistant":
msg = extract_assistant_content(
entry,
anonymizer,
include_thinking,
tool_result_map,
pending_tool_results,
Expand All @@ -591,20 +577,19 @@ def process_entry(
update_time_bounds(metadata, timestamp)


def extract_user_content(entry: dict[str, Any], anonymizer: Anonymizer) -> str | None:
def extract_user_content(entry: dict[str, Any]) -> str | None:
msg_data = entry.get("message", {})
content = msg_data.get("content", "")
if isinstance(content, list):
text_parts = [b.get("text", "") for b in content if b.get("type") == "text"]
content = "\n".join(text_parts)
if not content or not content.strip():
return None
return anonymizer.text(content)
return content


def extract_assistant_content(
entry: dict[str, Any],
anonymizer: Anonymizer,
include_thinking: bool,
tool_result_map: dict[str, dict] | None = None,
pending_tool_results: dict[str, dict[str, Any]] | None = None,
Expand All @@ -626,15 +611,15 @@ def extract_assistant_content(
if block_type == "text":
text = block.get("text", "").strip()
if text:
text_parts.append(anonymizer.text(text))
text_parts.append(text)
elif block_type == "thinking" and include_thinking:
thinking = block.get("thinking", "").strip()
if thinking:
thinking_parts.append(anonymizer.text(thinking))
thinking_parts.append(thinking)
elif block_type == "tool_use":
tu: dict[str, Any] = {
"tool": block.get("name"),
"input": parse_tool_input(block.get("name"), block.get("input", {}), anonymizer),
"input": parse_tool_input(block.get("input", {})),
}
tool_use_id = block.get("id")
if tool_result_map is not None:
Expand Down Expand Up @@ -671,7 +656,6 @@ def _apply_claude_tool_result(tool_use: dict[str, Any], result: dict[str, Any])

def _attach_claude_tool_results(
entry: dict[str, Any],
anonymizer: Anonymizer,
pending_tool_results: dict[str, dict[str, Any]] | None,
pending_tool_uses: dict[str, list[dict[str, Any]]] | None,
) -> None:
Expand All @@ -690,7 +674,7 @@ def _attach_claude_tool_results(
continue

result = {
"output": build_tool_result_output(block, entry, anonymizer),
"output": build_tool_result_output(block, entry),
"status": "error" if block.get("is_error") else "success",
}
matched_tool_uses = [] if pending_tool_uses is None else pending_tool_uses.pop(tool_use_id, [])
Expand Down
Loading
Loading