From 3dbc1cb42791fe80327db56520dc17824aabb4dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 26 Feb 2026 19:30:55 +0800 Subject: [PATCH 1/4] fix: user_name --- src/memos/memories/textual/tree.py | 14 ++-- .../tree_text_memory/organize/handler.py | 64 ++++++++++----- .../tree_text_memory/organize/manager.py | 41 +++++++--- .../tree_text_memory/organize/reorganizer.py | 77 ++++++++++++++----- 4 files changed, 140 insertions(+), 56 deletions(-) diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index 5faf8aa09..5b210ba61 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -404,10 +404,10 @@ def delete_by_memory_ids(self, memory_ids: list[str]) -> None: except Exception as e: logger.error(f"An error occurred while deleting memories by memory_ids: {e}") - def delete_all(self) -> None: + def delete_all(self, user_name: str | None = None) -> None: """Delete all memories and their relationships from the graph store.""" try: - self.graph_store.clear() + self.graph_store.clear(user_name=user_name) logger.info("All memories and edges have been deleted from the graph.") except Exception as e: logger.error(f"An error occurred while deleting all memories: {e}") @@ -424,7 +424,7 @@ def delete_by_filter( writable_cube_ids=writable_cube_ids, file_ids=file_ids, filter=filter ) - def load(self, dir: str) -> None: + def load(self, dir: str, user_name: str | None = None) -> None: try: memory_file = os.path.join(dir, self.config.memory_filename) @@ -435,7 +435,7 @@ def load(self, dir: str) -> None: with open(memory_file, encoding="utf-8") as f: memories = json.load(f) - self.graph_store.import_graph(memories) + self.graph_store.import_graph(memories, user_name=user_name) logger.info(f"Loaded {len(memories)} memories from {memory_file}") except FileNotFoundError: @@ -445,10 +445,12 @@ def load(self, dir: str) -> None: except Exception as e: logger.error(f"An error occurred while loading memories: {e}") - def dump(self, dir: str, include_embedding: bool = False) -> None: + def dump(self, dir: str, include_embedding: bool = False, user_name: str | None = None) -> None: """Dump memories to os.path.join(dir, self.config.memory_filename)""" try: - json_memories = self.graph_store.export_graph(include_embedding=include_embedding) + json_memories = self.graph_store.export_graph( + include_embedding=include_embedding, user_name=user_name + ) os.makedirs(dir, exist_ok=True) memory_file = os.path.join(dir, self.config.memory_filename) diff --git a/src/memos/memories/textual/tree_text_memory/organize/handler.py b/src/memos/memories/textual/tree_text_memory/organize/handler.py index 595cf099c..2d776912b 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/handler.py +++ b/src/memos/memories/textual/tree_text_memory/organize/handler.py @@ -27,18 +27,24 @@ def __init__(self, graph_store: Neo4jGraphDB, llm: BaseLLM, embedder: BaseEmbedd self.llm = llm self.embedder = embedder - def detect(self, memory, top_k: int = 5, scope=None): + def detect(self, memory, top_k: int = 5, scope=None, user_name: str | None = None): # 1. Search for similar memories based on embedding embedding = memory.metadata.embedding embedding_candidates_info = self.graph_store.search_by_embedding( - embedding, top_k=top_k, scope=scope, threshold=self.EMBEDDING_THRESHOLD + embedding, + top_k=top_k, + scope=scope, + threshold=self.EMBEDDING_THRESHOLD, + user_name=user_name, ) # 2. Filter based on similarity threshold embedding_candidates_ids = [ info["id"] for info in embedding_candidates_info if info["id"] != memory.id ] # 3. Judge conflicts using LLM - embedding_candidates = self.graph_store.get_nodes(embedding_candidates_ids) + embedding_candidates = self.graph_store.get_nodes( + embedding_candidates_ids, user_name=user_name + ) detected_relationships = [] for embedding_candidate in embedding_candidates: embedding_candidate = TextualMemoryItem.from_dict(embedding_candidate) @@ -67,13 +73,20 @@ def detect(self, memory, top_k: int = 5, scope=None): pass return detected_relationships - def resolve(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem, relation) -> None: + def resolve( + self, + memory_a: TextualMemoryItem, + memory_b: TextualMemoryItem, + relation, + user_name: str | None = None, + ) -> None: """ Resolve detected conflicts between two memory items using LLM fusion. Args: memory_a: The first conflicting memory item. memory_b: The second conflicting memory item. relation: relation + user_name: Optional user name for multi-tenant isolation. Returns: A fused TextualMemoryItem representing the resolved memory. """ @@ -105,17 +118,22 @@ def resolve(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem, rela logger.warning( f"{relation} between {memory_a.id} and {memory_b.id} could not be resolved. " ) - self._hard_update(memory_a, memory_b) + self._hard_update(memory_a, memory_b, user_name=user_name) # —————— 2.2 Conflict resolved, update metadata and memory ———— else: fixed_metadata = self._merge_metadata(answer, memory_a.metadata, memory_b.metadata) merged_memory = TextualMemoryItem(memory=answer, metadata=fixed_metadata) logger.info(f"Resolved result: {merged_memory}") - self._resolve_in_graph(memory_a, memory_b, merged_memory) + self._resolve_in_graph(memory_a, memory_b, merged_memory, user_name=user_name) except json.decoder.JSONDecodeError: logger.error(f"Failed to parse LLM response: {response}") - def _hard_update(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem): + def _hard_update( + self, + memory_a: TextualMemoryItem, + memory_b: TextualMemoryItem, + user_name: str | None = None, + ): """ Hard update: compare updated_at, keep the newer one, overwrite the older one's metadata. """ @@ -125,7 +143,7 @@ def _hard_update(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem) newer_mem = memory_a if time_a >= time_b else memory_b older_mem = memory_b if time_a >= time_b else memory_a - self.graph_store.delete_node(older_mem.id) + self.graph_store.delete_node(older_mem.id, user_name=user_name) logger.warning( f"Delete older memory {older_mem.id}: <{older_mem.memory}> due to conflict with {newer_mem.id}: <{newer_mem.memory}>" ) @@ -135,13 +153,21 @@ def _resolve_in_graph( conflict_a: TextualMemoryItem, conflict_b: TextualMemoryItem, merged: TextualMemoryItem, + user_name: str | None = None, ): - edges_a = self.graph_store.get_edges(conflict_a.id, type="ANY", direction="ANY") - edges_b = self.graph_store.get_edges(conflict_b.id, type="ANY", direction="ANY") + edges_a = self.graph_store.get_edges( + conflict_a.id, type="ANY", direction="ANY", user_name=user_name + ) + edges_b = self.graph_store.get_edges( + conflict_b.id, type="ANY", direction="ANY", user_name=user_name + ) all_edges = edges_a + edges_b self.graph_store.add_node( - merged.id, merged.memory, merged.metadata.model_dump(exclude_none=True) + merged.id, + merged.memory, + merged.metadata.model_dump(exclude_none=True), + user_name=user_name, ) for edge in all_edges: @@ -150,13 +176,15 @@ def _resolve_in_graph( if new_from == new_to: continue # Check if the edge already exists before adding - if not self.graph_store.edge_exists(new_from, new_to, edge["type"], direction="ANY"): - self.graph_store.add_edge(new_from, new_to, edge["type"]) - - self.graph_store.update_node(conflict_a.id, {"status": "archived"}) - self.graph_store.update_node(conflict_b.id, {"status": "archived"}) - self.graph_store.add_edge(conflict_a.id, merged.id, type="MERGED_TO") - self.graph_store.add_edge(conflict_b.id, merged.id, type="MERGED_TO") + if not self.graph_store.edge_exists( + new_from, new_to, edge["type"], direction="ANY", user_name=user_name + ): + self.graph_store.add_edge(new_from, new_to, edge["type"], user_name=user_name) + + self.graph_store.update_node(conflict_a.id, {"status": "archived"}, user_name=user_name) + self.graph_store.update_node(conflict_b.id, {"status": "archived"}, user_name=user_name) + self.graph_store.add_edge(conflict_a.id, merged.id, type="MERGED_TO", user_name=user_name) + self.graph_store.add_edge(conflict_b.id, merged.id, type="MERGED_TO", user_name=user_name) logger.debug( f"Archive {conflict_a.id} and {conflict_b.id}, and inherit their edges to {merged.id}." ) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index cbc349d67..4ca30c7b8 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -238,7 +238,9 @@ def _submit_batches(nodes: list[dict], node_kind: str) -> None: _submit_batches(graph_nodes, "graph memory") if graph_node_ids and self.is_reorganize: - self.reorganizer.add_message(QueueMessage(op="add", after_node=graph_node_ids)) + self.reorganizer.add_message( + QueueMessage(op="add", after_node=graph_node_ids, user_name=user_name) + ) return added_ids @@ -411,16 +413,19 @@ def _add_to_graph_memory( QueueMessage( op="add", after_node=[node_id], + user_name=user_name, ) ) return node_id - def _inherit_edges(self, from_id: str, to_id: str) -> None: + def _inherit_edges(self, from_id: str, to_id: str, user_name: str | None = None) -> None: """ Migrate all non-lineage edges from `from_id` to `to_id`, and remove them from `from_id` after copying. """ - edges = self.graph_store.get_edges(from_id, type="ANY", direction="ANY") + edges = self.graph_store.get_edges( + from_id, type="ANY", direction="ANY", user_name=user_name + ) for edge in edges: if edge["type"] == "MERGED_TO": @@ -433,20 +438,29 @@ def _inherit_edges(self, from_id: str, to_id: str) -> None: continue # Add edge to merged node if it doesn't already exist - if not self.graph_store.edge_exists(new_from, new_to, edge["type"], direction="ANY"): - self.graph_store.add_edge(new_from, new_to, edge["type"]) + if not self.graph_store.edge_exists( + new_from, new_to, edge["type"], direction="ANY", user_name=user_name + ): + self.graph_store.add_edge(new_from, new_to, edge["type"], user_name=user_name) # Remove original edge if it involved the archived node - self.graph_store.delete_edge(edge["from"], edge["to"], edge["type"]) + self.graph_store.delete_edge( + edge["from"], edge["to"], edge["type"], user_name=user_name + ) def _ensure_structure_path( - self, memory_type: str, metadata: TreeNodeTextualMemoryMetadata + self, + memory_type: str, + metadata: TreeNodeTextualMemoryMetadata, + user_name: str | None = None, ) -> str: """ Ensure structural path exists (ROOT → ... → final node), return last node ID. Args: - path: like ["hobby", "photography"] + memory_type: Memory type for the structure node. + metadata: Metadata containing key and other fields. + user_name: Optional user name for multi-tenant isolation. Returns: Final node ID of the structure path. @@ -456,7 +470,8 @@ def _ensure_structure_path( [ {"field": "memory", "op": "=", "value": metadata.key}, {"field": "memory_type", "op": "=", "value": memory_type}, - ] + ], + user_name=user_name, ) if existing: node_id = existing[0] # Use the first match @@ -479,14 +494,16 @@ def _ensure_structure_path( ), ) self.graph_store.add_node( - id=new_node.id, - memory=new_node.memory, - metadata=new_node.metadata.model_dump(exclude_none=True), + new_node.id, + new_node.memory, + new_node.metadata.model_dump(exclude_none=True), + user_name=user_name, ) self.reorganizer.add_message( QueueMessage( op="add", after_node=[new_node.id], + user_name=user_name, ) ) diff --git a/src/memos/memories/textual/tree_text_memory/organize/reorganizer.py b/src/memos/memories/textual/tree_text_memory/organize/reorganizer.py index ea06a7c60..b7fb6b1a0 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/reorganizer.py +++ b/src/memos/memories/textual/tree_text_memory/organize/reorganizer.py @@ -52,12 +52,14 @@ def __init__( before_edge: list[str] | list[GraphDBEdge] | None = None, after_node: list[str] | list[GraphDBNode] | None = None, after_edge: list[str] | list[GraphDBEdge] | None = None, + user_name: str | None = None, ): self.op = op self.before_node = before_node self.before_edge = before_edge self.after_node = after_node self.after_edge = after_edge + self.user_name = user_name def __str__(self) -> str: return f"QueueMessage(op={self.op}, before_node={self.before_node if self.before_node is None else len(self.before_node)}, after_node={self.after_node if self.after_node is None else len(self.after_node)})" @@ -191,11 +193,15 @@ def handle_add(self, message: QueueMessage): logger.debug(f"Handling add operation: {str(message)[:500]}") added_node = message.after_node[0] detected_relationships = self.resolver.detect( - added_node, scope=added_node.metadata.memory_type + added_node, + scope=added_node.metadata.memory_type, + user_name=message.user_name, ) if detected_relationships: for added_node, existing_node, relation in detected_relationships: - self.resolver.resolve(added_node, existing_node, relation) + self.resolver.resolve( + added_node, existing_node, relation, user_name=message.user_name + ) self._reorganize_needed = True @@ -209,6 +215,7 @@ def optimize_structure( min_cluster_size: int = 4, min_group_size: int = 20, max_duration_sec: int = 600, + user_name: str | None = None, ): """ Periodically reorganize the graph: @@ -232,7 +239,7 @@ def _check_deadline(where: str): logger.info(f"[GraphStructureReorganize] Already optimizing for {scope}. Skipping.") return - if self.graph_store.node_not_exist(scope): + if self.graph_store.node_not_exist(scope, user_name=user_name): logger.debug(f"[GraphStructureReorganize] No nodes for scope={scope}. Skip.") return @@ -244,12 +251,14 @@ def _check_deadline(where: str): logger.debug( f"[GraphStructureReorganize] Num of scope in self.graph_store is" - f" {self.graph_store.get_memory_count(scope)}" + f" {self.graph_store.get_memory_count(scope, user_name=user_name)}" ) # Load candidate nodes if _check_deadline("[GraphStructureReorganize] Before loading candidates"): return - raw_nodes = self.graph_store.get_structure_optimization_candidates(scope) + raw_nodes = self.graph_store.get_structure_optimization_candidates( + scope, user_name=user_name + ) nodes = [GraphDBNode(**n) for n in raw_nodes] if not nodes: @@ -281,6 +290,7 @@ def _check_deadline(where: str): scope, local_tree_threshold, min_cluster_size, + user_name, ) ) @@ -307,6 +317,7 @@ def _process_cluster_and_write( scope: str, local_tree_threshold: int, min_cluster_size: int, + user_name: str | None = None, ): if len(cluster_nodes) <= min_cluster_size: return @@ -319,15 +330,17 @@ def _process_cluster_and_write( if len(sub_nodes) < min_cluster_size: continue # Skip tiny noise sub_parent_node = self._summarize_cluster(sub_nodes, scope) - self._create_parent_node(sub_parent_node) - self._link_cluster_nodes(sub_parent_node, sub_nodes) + self._create_parent_node(sub_parent_node, user_name=user_name) + self._link_cluster_nodes(sub_parent_node, sub_nodes, user_name=user_name) sub_parents.append(sub_parent_node) if sub_parents and len(sub_parents) >= min_cluster_size: cluster_parent_node = self._summarize_cluster(cluster_nodes, scope) - self._create_parent_node(cluster_parent_node) + self._create_parent_node(cluster_parent_node, user_name=user_name) for sub_parent in sub_parents: - self.graph_store.add_edge(cluster_parent_node.id, sub_parent.id, "PARENT") + self.graph_store.add_edge( + cluster_parent_node.id, sub_parent.id, "PARENT", user_name=user_name + ) logger.info("Adding relations/reasons") nodes_to_check = cluster_nodes @@ -351,10 +364,16 @@ def _process_cluster_and_write( # 1) Add pairwise relations for rel in results["relations"]: if not self.graph_store.edge_exists( - rel["source_id"], rel["target_id"], rel["relation_type"] + rel["source_id"], + rel["target_id"], + rel["relation_type"], + user_name=user_name, ): self.graph_store.add_edge( - rel["source_id"], rel["target_id"], rel["relation_type"] + rel["source_id"], + rel["target_id"], + rel["relation_type"], + user_name=user_name, ) # 2) Add inferred nodes and link to sources @@ -363,14 +382,21 @@ def _process_cluster_and_write( inf_node.id, inf_node.memory, inf_node.metadata.model_dump(exclude_none=True), + user_name=user_name, ) for src_id in inf_node.metadata.sources: - self.graph_store.add_edge(src_id, inf_node.id, "INFERS") + self.graph_store.add_edge( + src_id, inf_node.id, "INFERS", user_name=user_name + ) # 3) Add sequence links for seq in results["sequence_links"]: - if not self.graph_store.edge_exists(seq["from_id"], seq["to_id"], "FOLLOWS"): - self.graph_store.add_edge(seq["from_id"], seq["to_id"], "FOLLOWS") + if not self.graph_store.edge_exists( + seq["from_id"], seq["to_id"], "FOLLOWS", user_name=user_name + ): + self.graph_store.add_edge( + seq["from_id"], seq["to_id"], "FOLLOWS", user_name=user_name + ) # 4) Add aggregate concept nodes for agg_node in results["aggregate_nodes"]: @@ -378,9 +404,12 @@ def _process_cluster_and_write( agg_node.id, agg_node.memory, agg_node.metadata.model_dump(exclude_none=True), + user_name=user_name, ) for child_id in agg_node.metadata.sources: - self.graph_store.add_edge(agg_node.id, child_id, "AGGREGATE_TO") + self.graph_store.add_edge( + agg_node.id, child_id, "AGGREGATE_TO", user_name=user_name + ) logger.info("[Reorganizer] Cluster relation/reasoning done.") @@ -577,7 +606,7 @@ def _parse_json_result(self, response_text): ) return {} - def _create_parent_node(self, parent_node: GraphDBNode) -> None: + def _create_parent_node(self, parent_node: GraphDBNode, user_name: str | None = None) -> None: """ Create a new parent node for the cluster. """ @@ -585,17 +614,23 @@ def _create_parent_node(self, parent_node: GraphDBNode) -> None: parent_node.id, parent_node.memory, parent_node.metadata.model_dump(exclude_none=True), + user_name=user_name, ) - def _link_cluster_nodes(self, parent_node: GraphDBNode, child_nodes: list[GraphDBNode]): + def _link_cluster_nodes( + self, + parent_node: GraphDBNode, + child_nodes: list[GraphDBNode], + user_name: str | None = None, + ): """ Add PARENT edges from the parent node to all nodes in the cluster. """ for child in child_nodes: if not self.graph_store.edge_exists( - parent_node.id, child.id, "PARENT", direction="OUTGOING" + parent_node.id, child.id, "PARENT", direction="OUTGOING", user_name=user_name ): - self.graph_store.add_edge(parent_node.id, child.id, "PARENT") + self.graph_store.add_edge(parent_node.id, child.id, "PARENT", user_name=user_name) def _preprocess_message(self, message: QueueMessage) -> bool: message = self._convert_id_to_node(message) @@ -613,7 +648,9 @@ def _convert_id_to_node(self, message: QueueMessage) -> QueueMessage: for i, node in enumerate(message.after_node or []): if not isinstance(node, str): continue - raw_node = self.graph_store.get_node(node, include_embedding=True) + raw_node = self.graph_store.get_node( + node, include_embedding=True, user_name=message.user_name + ) if raw_node is None: logger.debug(f"Node with ID {node} not found in the graph store.") message.after_node[i] = None From b207363e095f2c0f38fbe6d760577b0ddb947a96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 26 Feb 2026 19:35:06 +0800 Subject: [PATCH 2/4] fix: user_name --- .../textual/tree_text_memory/organize/history_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/memos/memories/textual/tree_text_memory/organize/history_manager.py b/src/memos/memories/textual/tree_text_memory/organize/history_manager.py index 1afdc9281..132582a0d 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/history_manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/history_manager.py @@ -141,6 +141,7 @@ def mark_memory_status( self, memory_items: list[TextualMemoryItem], status: Literal["activated", "resolving", "archived", "deleted"], + user_name: str | None = None, ) -> None: """ Support status marking operations during history management. Common usages are: @@ -157,6 +158,7 @@ def mark_memory_status( self.graph_db.update_node, id=mem.id, fields={"status": status}, + user_name=user_name, ) ) From 666e8c0003214053ee172f1b54545a2c79f4f641 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 26 Feb 2026 21:13:48 +0800 Subject: [PATCH 3/4] feat: delete some logs --- src/memos/embedders/universal_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index 538d913ea..2b3bd0967 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -73,7 +73,6 @@ async def _create_embeddings(): ) ) logger.info(f"Embeddings request succeeded with {time.time() - init_time} seconds") - logger.info(f"Embeddings request response: {response}") return [r.embedding for r in response.data] except Exception as e: if self.use_backup_client: From c9b5d4eeaf04bd1a13c359cca1a532dd0c796349 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 26 Feb 2026 21:22:41 +0800 Subject: [PATCH 4/4] fix: test_history_manager --- tests/memories/textual/test_history_manager.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/memories/textual/test_history_manager.py b/tests/memories/textual/test_history_manager.py index 46cf3a1f6..a6ac186b7 100644 --- a/tests/memories/textual/test_history_manager.py +++ b/tests/memories/textual/test_history_manager.py @@ -131,7 +131,7 @@ def test_mark_memory_status(history_manager, mock_graph_db): # Assert assert mock_graph_db.update_node.call_count == 3 - # Verify we called it correctly - mock_graph_db.update_node.assert_any_call(id=id1, fields={"status": status}) - mock_graph_db.update_node.assert_any_call(id=id2, fields={"status": status}) - mock_graph_db.update_node.assert_any_call(id=id3, fields={"status": status}) + # Verify we called it correctly (user_name=None is passed by mark_memory_status) + mock_graph_db.update_node.assert_any_call(id=id1, fields={"status": status}, user_name=None) + mock_graph_db.update_node.assert_any_call(id=id2, fields={"status": status}, user_name=None) + mock_graph_db.update_node.assert_any_call(id=id3, fields={"status": status}, user_name=None)