From 72d6f995e8b48292dc97a41c466cbf9f169d1e4e Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 25 Feb 2026 16:37:49 +0800 Subject: [PATCH 1/2] fix(handlers): replace get_by_metadata with merged_from for add/update log detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit add_handler: - Remove get_by_metadata graph DB query (eliminated self-match, cross-user matching, and Cypher escaping bugs) - Use metadata.info['merged_from'] to determine ADD vs UPDATE — set upstream by mem_reader during fine extraction when LLM merges memories - Remove unused key/transform_name_to_key computation in log_add_messages mem_read_handler: - Cloud env: mark operation as 'UPDATE' when merged_from is set, 'ADD' otherwise - Local env: split items into addMemory / updateMemory events based on merged_from, emitting separate scheduler log events for each --- .../handlers/add_handler.py | 52 ++++++-------- .../handlers/mem_read_handler.py | 67 ++++++++++++++----- 2 files changed, 71 insertions(+), 48 deletions(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py index a30119cf4..c2a1b56e5 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py @@ -68,47 +68,37 @@ def log_add_messages(self, msg: ScheduleMessageItem): mem_item = mem_cube.text_mem.get(memory_id=memory_id, user_name=msg.mem_cube_id) if mem_item is None: raise ValueError(f"Memory {memory_id} not found after retries") - key = getattr(mem_item.metadata, "key", None) or transform_name_to_key( - name=mem_item.memory - ) - exists = False original_content = None original_item_id = None - if key and hasattr(mem_cube.text_mem, "graph_store"): + # Determine add vs update from the merged_from field set by the upstream + # mem_reader during fine extraction. When the LLM merges a new memory with + # existing ones it writes their IDs into metadata.info["merged_from"]. + # This avoids an extra graph DB query and the self-match / cross-user + # matching bugs that came with the old get_by_metadata approach. + merged_from = (getattr(mem_item.metadata, "info", None) or {}).get( + "merged_from" + ) + if merged_from: + merged_ids = ( + merged_from + if isinstance(merged_from, (list, tuple, set)) + else [merged_from] + ) + original_item_id = merged_ids[0] try: - candidates = mem_cube.text_mem.graph_store.get_by_metadata( - [ - {"field": "key", "op": "=", "value": key}, - { - "field": "memory_type", - "op": "=", - "value": mem_item.metadata.memory_type, - }, - ], - user_name=msg.mem_cube_id, + original_mem_item = mem_cube.text_mem.get( + memory_id=original_item_id, user_name=msg.mem_cube_id ) + original_content = original_mem_item.memory if original_mem_item else None except Exception as e: logger.warning( - "get_by_metadata failed for memory_id=%s key=%s: %s", - memory_id, - key, + "Failed to fetch original memory %s for update log: %s", + original_item_id, e, ) - candidates = [] - # Exclude the current item itself — it was already persisted - # upstream by text_mem.add(), so it will always appear in the - # candidates and would cause every ADD to be mis-labelled as UPDATE. - candidates = [c for c in candidates if c != mem_item.id] - if candidates: - exists = True - original_item_id = candidates[0] - original_mem_item = mem_cube.text_mem.get( - memory_id=original_item_id, user_name=msg.mem_cube_id - ) - original_content = original_mem_item.memory if original_mem_item else None - if exists: + if merged_from: prepared_update_items_with_original.append( { "new_item": mem_item, diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py index 5d86c5589..20dbb63b2 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py @@ -259,13 +259,19 @@ def _process_memories_with_reader( source_doc_id = ( file_ids[0] if isinstance(file_ids, list) and file_ids else None ) + # Use merged_from to determine ADD vs UPDATE. + # The upstream mem_reader sets this during fine extraction when + # the new memory was merged with an existing one. + item_merged_from = (getattr(item.metadata, "info", None) or {}).get( + "merged_from" + ) kb_log_content.append( { "log_source": "KNOWLEDGE_BASE_LOG", "trigger_source": info.get("trigger_source", "Messages") if info else "Messages", - "operation": "ADD", + "operation": "UPDATE" if item_merged_from else "ADD", "memory_id": item.id, "content": item.memory, "original_content": None, @@ -302,29 +308,39 @@ def _process_memories_with_reader( else: add_content_legacy: list[dict] = [] add_meta_legacy: list[dict] = [] + update_content_legacy: list[dict] = [] + update_meta_legacy: list[dict] = [] for item_id, item in zip( enhanced_mem_ids, flattened_memories, strict=False ): key = getattr(item.metadata, "key", None) or transform_name_to_key( name=item.memory ) - add_content_legacy.append( - {"content": f"{key}: {item.memory}", "ref_id": item_id} - ) - add_meta_legacy.append( - { - "ref_id": item_id, - "id": item_id, - "key": item.metadata.key, - "memory": item.memory, - "memory_type": item.metadata.memory_type, - "status": item.metadata.status, - "confidence": item.metadata.confidence, - "tags": item.metadata.tags, - "updated_at": getattr(item.metadata, "updated_at", None) - or getattr(item.metadata, "update_at", None), - } + item_merged_from = (getattr(item.metadata, "info", None) or {}).get( + "merged_from" ) + meta_entry = { + "ref_id": item_id, + "id": item_id, + "key": item.metadata.key, + "memory": item.memory, + "memory_type": item.metadata.memory_type, + "status": item.metadata.status, + "confidence": item.metadata.confidence, + "tags": item.metadata.tags, + "updated_at": getattr(item.metadata, "updated_at", None) + or getattr(item.metadata, "update_at", None), + } + if item_merged_from: + update_content_legacy.append( + {"content": f"{key}: {item.memory}", "ref_id": item_id} + ) + update_meta_legacy.append(meta_entry) + else: + add_content_legacy.append( + {"content": f"{key}: {item.memory}", "ref_id": item_id} + ) + add_meta_legacy.append(meta_entry) if add_content_legacy: event = self.scheduler_context.services.create_event_log( label="addMemory", @@ -342,6 +358,23 @@ def _process_memories_with_reader( ) event.task_id = task_id self.scheduler_context.services.submit_web_logs([event]) + if update_content_legacy: + event = self.scheduler_context.services.create_event_log( + label="updateMemory", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=self.scheduler_context.get_mem_cube(), + memcube_log_content=update_content_legacy, + metadata=update_meta_legacy, + memory_len=len(update_content_legacy), + memcube_name=self.scheduler_context.services.map_memcube_name( + mem_cube_id + ), + ) + event.task_id = task_id + self.scheduler_context.services.submit_web_logs([event]) else: logger.info("No enhanced memories generated by mem_reader") else: From e280eca9e42bf28ff66e7d3028ecc7a1c4787cb8 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 25 Feb 2026 16:56:40 +0800 Subject: [PATCH 2/2] style(handlers): satisfy ruff formatting and isinstance union types --- .../task_schedule_modules/handlers/add_handler.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py index c2a1b56e5..e4a88a635 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py @@ -76,13 +76,11 @@ def log_add_messages(self, msg: ScheduleMessageItem): # existing ones it writes their IDs into metadata.info["merged_from"]. # This avoids an extra graph DB query and the self-match / cross-user # matching bugs that came with the old get_by_metadata approach. - merged_from = (getattr(mem_item.metadata, "info", None) or {}).get( - "merged_from" - ) + merged_from = (getattr(mem_item.metadata, "info", None) or {}).get("merged_from") if merged_from: merged_ids = ( merged_from - if isinstance(merged_from, (list, tuple, set)) + if isinstance(merged_from, list | tuple | set) else [merged_from] ) original_item_id = merged_ids[0]