@@ -68,47 +68,35 @@ def log_add_messages(self, msg: ScheduleMessageItem):
         mem_item = mem_cube.text_mem.get(memory_id=memory_id, user_name=msg.mem_cube_id)
         if mem_item is None:
             raise ValueError(f"Memory {memory_id} not found after retries")
         key = getattr(mem_item.metadata, "key", None) or transform_name_to_key(
             name=mem_item.memory
         )
         exists = False
         original_content = None
         original_item_id = None

         if key and hasattr(mem_cube.text_mem, "graph_store"):
-            try:
-                candidates = mem_cube.text_mem.graph_store.get_by_metadata(
-                    [
-                        {"field": "key", "op": "=", "value": key},
-                        {
-                            "field": "memory_type",
-                            "op": "=",
-                            "value": mem_item.metadata.memory_type,
-                        },
-                    ],
-                    user_name=msg.mem_cube_id,
-                )
-            except Exception as e:
-                logger.warning(
-                    "get_by_metadata failed for memory_id=%s key=%s: %s",
-                    memory_id,
-                    key,
-                    e,
-                )
-                candidates = []
-            # Exclude the current item itself — it was already persisted
-            # upstream by text_mem.add(), so it will always appear in the
-            # candidates and would cause every ADD to be mis-labelled as UPDATE.
-            candidates = [c for c in candidates if c != mem_item.id]
-            if candidates:
-                exists = True
-                original_item_id = candidates[0]
-                original_mem_item = mem_cube.text_mem.get(
-                    memory_id=original_item_id, user_name=msg.mem_cube_id
-                )
-                original_content = original_mem_item.memory if original_mem_item else None
+            # Determine add vs update from the merged_from field set by the upstream
+            # mem_reader during fine extraction. When the LLM merges a new memory with
+            # existing ones it writes their IDs into metadata.info["merged_from"].
+            # This avoids an extra graph DB query and the self-match / cross-user
+            # matching bugs that came with the old get_by_metadata approach.
+            merged_from = (getattr(mem_item.metadata, "info", None) or {}).get("merged_from")
+            if merged_from:
+                merged_ids = (
+                    merged_from
+                    if isinstance(merged_from, list | tuple | set)
+                    else [merged_from]
+                )
+                original_item_id = merged_ids[0]
+                try:
+                    original_mem_item = mem_cube.text_mem.get(
+                        memory_id=original_item_id, user_name=msg.mem_cube_id
+                    )
+                    original_content = original_mem_item.memory if original_mem_item else None
+                except Exception as e:
+                    logger.warning(
+                        "Failed to fetch original memory %s for update log: %s",
+                        original_item_id,
+                        e,
+                    )

-        if exists:
+        if merged_from:
             prepared_update_items_with_original.append(
                 {
                     "new_item": mem_item,
@@ -259,13 +259,19 @@ def _process_memories_with_reader(
                     source_doc_id = (
                         file_ids[0] if isinstance(file_ids, list) and file_ids else None
                     )
+                    # Use merged_from to determine ADD vs UPDATE.
+                    # The upstream mem_reader sets this during fine extraction when
+                    # the new memory was merged with an existing one.
+                    item_merged_from = (getattr(item.metadata, "info", None) or {}).get(
+                        "merged_from"
+                    )
                     kb_log_content.append(
                         {
                             "log_source": "KNOWLEDGE_BASE_LOG",
                             "trigger_source": info.get("trigger_source", "Messages")
                             if info
                             else "Messages",
-                            "operation": "ADD",
+                            "operation": "UPDATE" if item_merged_from else "ADD",
                             "memory_id": item.id,
                             "content": item.memory,
                             "original_content": None,
@@ -302,29 +308,39 @@ def _process_memories_with_reader(
                 else:
                     add_content_legacy: list[dict] = []
                     add_meta_legacy: list[dict] = []
+                    update_content_legacy: list[dict] = []
+                    update_meta_legacy: list[dict] = []
                     for item_id, item in zip(
                         enhanced_mem_ids, flattened_memories, strict=False
                     ):
                         key = getattr(item.metadata, "key", None) or transform_name_to_key(
                             name=item.memory
                         )
-                        add_content_legacy.append(
-                            {"content": f"{key}: {item.memory}", "ref_id": item_id}
-                        )
-                        add_meta_legacy.append(
-                            {
-                                "ref_id": item_id,
-                                "id": item_id,
-                                "key": item.metadata.key,
-                                "memory": item.memory,
-                                "memory_type": item.metadata.memory_type,
-                                "status": item.metadata.status,
-                                "confidence": item.metadata.confidence,
-                                "tags": item.metadata.tags,
-                                "updated_at": getattr(item.metadata, "updated_at", None)
-                                or getattr(item.metadata, "update_at", None),
-                            }
-                        )
+                        item_merged_from = (getattr(item.metadata, "info", None) or {}).get(
+                            "merged_from"
+                        )
+                        meta_entry = {
+                            "ref_id": item_id,
+                            "id": item_id,
+                            "key": item.metadata.key,
+                            "memory": item.memory,
+                            "memory_type": item.metadata.memory_type,
+                            "status": item.metadata.status,
+                            "confidence": item.metadata.confidence,
+                            "tags": item.metadata.tags,
+                            "updated_at": getattr(item.metadata, "updated_at", None)
+                            or getattr(item.metadata, "update_at", None),
+                        }
+                        if item_merged_from:
+                            update_content_legacy.append(
+                                {"content": f"{key}: {item.memory}", "ref_id": item_id}
+                            )
+                            update_meta_legacy.append(meta_entry)
+                        else:
+                            add_content_legacy.append(
+                                {"content": f"{key}: {item.memory}", "ref_id": item_id}
+                            )
+                            add_meta_legacy.append(meta_entry)
                     if add_content_legacy:
                         event = self.scheduler_context.services.create_event_log(
                             label="addMemory",
@@ -342,6 +358,23 @@ def _process_memories_with_reader(
                         )
                         event.task_id = task_id
                         self.scheduler_context.services.submit_web_logs([event])
+                    if update_content_legacy:
+                        event = self.scheduler_context.services.create_event_log(
+                            label="updateMemory",
+                            from_memory_type=USER_INPUT_TYPE,
+                            to_memory_type=LONG_TERM_MEMORY_TYPE,
+                            user_id=user_id,
+                            mem_cube_id=mem_cube_id,
+                            mem_cube=self.scheduler_context.get_mem_cube(),
+                            memcube_log_content=update_content_legacy,
+                            metadata=update_meta_legacy,
+                            memory_len=len(update_content_legacy),
+                            memcube_name=self.scheduler_context.services.map_memcube_name(
+                                mem_cube_id
+                            ),
+                        )
+                        event.task_id = task_id
+                        self.scheduler_context.services.submit_web_logs([event])
                 else:
                     logger.info("No enhanced memories generated by mem_reader")
             else:
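Taken together, the legacy branch now partitions items into add and update batches and emits one web-log event per non-empty batch, so an empty `updateMemory` event is never submitted. A compact sketch of that pattern, with `emit_event` as a hypothetical stand-in for `create_event_log` plus `submit_web_logs`:

```python
# Sketch of the partition-then-emit pattern used by the legacy branch.
# emit_event stands in for create_event_log + submit_web_logs.
def emit_legacy_logs(items, emit_event):
    batches: dict[str, list[dict]] = {"addMemory": [], "updateMemory": []}
    for item_id, item in items:
        info = getattr(item.metadata, "info", None) or {}
        label = "updateMemory" if info.get("merged_from") else "addMemory"
        batches[label].append({"ref_id": item_id, "memory": item.memory})
    for label, batch in batches.items():
        if batch:  # mirrors the if add_content_legacy / if update_content_legacy guards
            emit_event(label=label, content=batch, memory_len=len(batch))
```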