diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py index a30119cf4..e4a88a635 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py @@ -68,47 +68,35 @@ def log_add_messages(self, msg: ScheduleMessageItem): mem_item = mem_cube.text_mem.get(memory_id=memory_id, user_name=msg.mem_cube_id) if mem_item is None: raise ValueError(f"Memory {memory_id} not found after retries") - key = getattr(mem_item.metadata, "key", None) or transform_name_to_key( - name=mem_item.memory - ) - exists = False original_content = None original_item_id = None - if key and hasattr(mem_cube.text_mem, "graph_store"): + # Determine add vs update from the merged_from field set by the upstream + # mem_reader during fine extraction. When the LLM merges a new memory with + # existing ones it writes their IDs into metadata.info["merged_from"]. + # This avoids an extra graph DB query and the self-match / cross-user + # matching bugs that came with the old get_by_metadata approach. + merged_from = (getattr(mem_item.metadata, "info", None) or {}).get("merged_from") + if merged_from: + merged_ids = ( + merged_from + if isinstance(merged_from, list | tuple | set) + else [merged_from] + ) + original_item_id = merged_ids[0] try: - candidates = mem_cube.text_mem.graph_store.get_by_metadata( - [ - {"field": "key", "op": "=", "value": key}, - { - "field": "memory_type", - "op": "=", - "value": mem_item.metadata.memory_type, - }, - ], - user_name=msg.mem_cube_id, + original_mem_item = mem_cube.text_mem.get( + memory_id=original_item_id, user_name=msg.mem_cube_id ) + original_content = original_mem_item.memory if original_mem_item else None except Exception as e: logger.warning( - "get_by_metadata failed for memory_id=%s key=%s: %s", - memory_id, - key, + "Failed to fetch original memory %s for update log: %s", + original_item_id, e, ) - candidates = [] - # Exclude the current item itself — it was already persisted - # upstream by text_mem.add(), so it will always appear in the - # candidates and would cause every ADD to be mis-labelled as UPDATE. - candidates = [c for c in candidates if c != mem_item.id] - if candidates: - exists = True - original_item_id = candidates[0] - original_mem_item = mem_cube.text_mem.get( - memory_id=original_item_id, user_name=msg.mem_cube_id - ) - original_content = original_mem_item.memory if original_mem_item else None - if exists: + if merged_from: prepared_update_items_with_original.append( { "new_item": mem_item, diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py index 5d86c5589..20dbb63b2 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py @@ -259,13 +259,19 @@ def _process_memories_with_reader( source_doc_id = ( file_ids[0] if isinstance(file_ids, list) and file_ids else None ) + # Use merged_from to determine ADD vs UPDATE. + # The upstream mem_reader sets this during fine extraction when + # the new memory was merged with an existing one. + item_merged_from = (getattr(item.metadata, "info", None) or {}).get( + "merged_from" + ) kb_log_content.append( { "log_source": "KNOWLEDGE_BASE_LOG", "trigger_source": info.get("trigger_source", "Messages") if info else "Messages", - "operation": "ADD", + "operation": "UPDATE" if item_merged_from else "ADD", "memory_id": item.id, "content": item.memory, "original_content": None, @@ -302,29 +308,39 @@ def _process_memories_with_reader( else: add_content_legacy: list[dict] = [] add_meta_legacy: list[dict] = [] + update_content_legacy: list[dict] = [] + update_meta_legacy: list[dict] = [] for item_id, item in zip( enhanced_mem_ids, flattened_memories, strict=False ): key = getattr(item.metadata, "key", None) or transform_name_to_key( name=item.memory ) - add_content_legacy.append( - {"content": f"{key}: {item.memory}", "ref_id": item_id} - ) - add_meta_legacy.append( - { - "ref_id": item_id, - "id": item_id, - "key": item.metadata.key, - "memory": item.memory, - "memory_type": item.metadata.memory_type, - "status": item.metadata.status, - "confidence": item.metadata.confidence, - "tags": item.metadata.tags, - "updated_at": getattr(item.metadata, "updated_at", None) - or getattr(item.metadata, "update_at", None), - } + item_merged_from = (getattr(item.metadata, "info", None) or {}).get( + "merged_from" ) + meta_entry = { + "ref_id": item_id, + "id": item_id, + "key": item.metadata.key, + "memory": item.memory, + "memory_type": item.metadata.memory_type, + "status": item.metadata.status, + "confidence": item.metadata.confidence, + "tags": item.metadata.tags, + "updated_at": getattr(item.metadata, "updated_at", None) + or getattr(item.metadata, "update_at", None), + } + if item_merged_from: + update_content_legacy.append( + {"content": f"{key}: {item.memory}", "ref_id": item_id} + ) + update_meta_legacy.append(meta_entry) + else: + add_content_legacy.append( + {"content": f"{key}: {item.memory}", "ref_id": item_id} + ) + add_meta_legacy.append(meta_entry) if add_content_legacy: event = self.scheduler_context.services.create_event_log( label="addMemory", @@ -342,6 +358,23 @@ def _process_memories_with_reader( ) event.task_id = task_id self.scheduler_context.services.submit_web_logs([event]) + if update_content_legacy: + event = self.scheduler_context.services.create_event_log( + label="updateMemory", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=self.scheduler_context.get_mem_cube(), + memcube_log_content=update_content_legacy, + metadata=update_meta_legacy, + memory_len=len(update_content_legacy), + memcube_name=self.scheduler_context.services.map_memcube_name( + mem_cube_id + ), + ) + event.task_id = task_id + self.scheduler_context.services.submit_web_logs([event]) else: logger.info("No enhanced memories generated by mem_reader") else: