From 773c1fe405d3a554bfc3902845b1295e1babd280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Thu, 26 Feb 2026 11:38:33 +0800 Subject: [PATCH 1/6] feat: add return_fields for search_by_embedding --- src/memos/graph_dbs/polardb.py | 57 ++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 2856a816a..baabd787c 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -2047,24 +2047,21 @@ def search_by_fulltext( @timed def search_by_embedding( - self, - vector: list[float], - user_name: str, - top_k: int = 5, - scope: str | None = None, - status: str | None = None, - threshold: float | None = None, - search_filter: dict | None = None, - filter: dict | None = None, - knowledgebase_ids: list[str] | None = None, - **kwargs, + self, + vector: list[float], + user_name: str, + top_k: int = 5, + scope: str | None = None, + status: str | None = None, + threshold: float | None = None, + search_filter: dict | None = None, + filter: dict | None = None, + knowledgebase_ids: list[str] | None = None, + return_fields: list[str] | None = None, + **kwargs, ) -> list[dict]: - """ - Retrieve node IDs based on vector similarity using PostgreSQL vector operations. - """ - logger.info( - f"search_by_embedding user_name:{user_name},filter: {filter}, knowledgebase_ids: {knowledgebase_ids},scope:{scope},status:{status},search_filter:{search_filter},filter:{filter},knowledgebase_ids:{knowledgebase_ids}" - ) + logger.info(f"search_by_embedding user_name:{user_name},filter: {filter}, knowledgebase_ids: {knowledgebase_ids},scope:{scope},status:{status},search_filter:{search_filter},filter:{filter},knowledgebase_ids:{knowledgebase_ids}" + ) start_time = time.time() where_clauses = [] if scope: @@ -2140,16 +2137,21 @@ def search_by_embedding( else: pass - logger.info(f" search_by_embedding query: {query}, params: {params}") + logger.info(f"[search_by_embedding] query: {query}, params: {params}") conn = None try: conn = self._get_connection() with conn.cursor() as cursor: - if params: - cursor.execute(query, params) - else: - cursor.execute(query) + try: + # If params is empty, execute query directly without parameters + if params: + cursor.execute(query, params) + else: + cursor.execute(query) + except Exception as e: + logger.error(f"[search_by_embedding] Error executing query: {e}") + raise results = cursor.fetchall() output = [] for row in results: @@ -2164,15 +2166,18 @@ def search_by_embedding( score_val = float(score) score_val = (score_val + 1) / 2 # align to neo4j, Normalized Cosine Score if threshold is None or score_val >= threshold: - output.append({"id": id_val, "score": score_val}) + item = {"id": id_val, "score": score_val} + if return_fields: + properties = row[1] # properties column + item.update( + self._extract_fields_from_properties(properties, return_fields) + ) + output.append(item) elapsed_time = time.time() - start_time logger.info( f" polardb search_by_embedding query embedding completed time in {elapsed_time:.2f}s" ) return output[:top_k] - except Exception as e: - logger.error(f"[search_by_embedding] Error executing query: {e}") - raise finally: self._return_connection(conn) From ba4a51a536303f546fae36b9fcf7102765a43e5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Thu, 26 Feb 2026 11:39:55 +0800 Subject: [PATCH 2/6] feat: remove unused edges --- src/memos/graph_dbs/polardb.py | 154 +-------------------------------- 1 file changed, 1 insertion(+), 153 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index baabd787c..720ed6fe5 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -2721,159 +2721,7 @@ def export_graph( finally: self._return_connection(conn) - conn = None - try: - conn = self._get_connection() - # Build Cypher WHERE conditions for edges - cypher_where_conditions = [] - if user_name: - cypher_where_conditions.append(f"a.user_name = '{user_name}'") - cypher_where_conditions.append(f"b.user_name = '{user_name}'") - if user_id: - cypher_where_conditions.append(f"a.user_id = '{user_id}'") - cypher_where_conditions.append(f"b.user_id = '{user_id}'") - - # Add memory_type filter condition for edges (apply to both source and target nodes) - if memory_type and isinstance(memory_type, list) and len(memory_type) > 0: - # Escape single quotes in memory_type values for Cypher - escaped_memory_types = [mt.replace("'", "\\'") for mt in memory_type] - memory_type_list_str = ", ".join([f"'{mt}'" for mt in escaped_memory_types]) - # Cypher IN syntax: a.memory_type IN ['LongTermMemory', 'WorkingMemory'] - cypher_where_conditions.append(f"a.memory_type IN [{memory_type_list_str}]") - cypher_where_conditions.append(f"b.memory_type IN [{memory_type_list_str}]") - - # Add status filter for edges: if not passed, exclude deleted; otherwise filter by IN list - if status is None: - # Default behavior: exclude deleted entries - cypher_where_conditions.append("a.status <> 'deleted' AND b.status <> 'deleted'") - elif isinstance(status, list) and len(status) > 0: - escaped_statuses = [st.replace("'", "\\'") for st in status] - status_list_str = ", ".join([f"'{st}'" for st in escaped_statuses]) - cypher_where_conditions.append(f"a.status IN [{status_list_str}]") - cypher_where_conditions.append(f"b.status IN [{status_list_str}]") - - # Build filter conditions for edges (apply to both source and target nodes) - filter_where_clause = self._build_filter_conditions_cypher(filter) - logger.info(f"[export_graph edges] filter_where_clause: {filter_where_clause}") - if filter_where_clause: - # _build_filter_conditions_cypher returns a string that starts with " AND " if filter exists - # Remove the leading " AND " and replace n. with a. for source node and b. for target node - filter_clause = filter_where_clause.strip() - if filter_clause.startswith("AND "): - filter_clause = filter_clause[4:].strip() - # Replace n. with a. for source node and create a copy for target node - source_filter = filter_clause.replace("n.", "a.") - target_filter = filter_clause.replace("n.", "b.") - # Combine source and target filters with AND - combined_filter = f"({source_filter}) AND ({target_filter})" - cypher_where_conditions.append(combined_filter) - - cypher_where_clause = "" - if cypher_where_conditions: - cypher_where_clause = f"WHERE {' AND '.join(cypher_where_conditions)}" - - # Get total count of edges before pagination - count_edge_query = f""" - SELECT COUNT(*) - FROM ( - SELECT * FROM cypher('{self.db_name}_graph', $$ - MATCH (a:Memory)-[r]->(b:Memory) - {cypher_where_clause} - RETURN a.id AS source, b.id AS target, type(r) as edge - $$) AS (source agtype, target agtype, edge agtype) - ) AS edges - """ - logger.info(f"[export_graph edges count] Query: {count_edge_query}") - with conn.cursor() as cursor: - cursor.execute(count_edge_query) - total_edges = cursor.fetchone()[0] - - # Export edges using cypher query - # Note: Apache AGE Cypher may not support SKIP, so we use SQL LIMIT/OFFSET on the subquery - # Build pagination clause if needed - edge_pagination_clause = "" - if use_pagination: - edge_pagination_clause = f"LIMIT {page_size} OFFSET {offset}" - - edge_query = f""" - SELECT source, target, edge FROM ( - SELECT * FROM cypher('{self.db_name}_graph', $$ - MATCH (a:Memory)-[r]->(b:Memory) - {cypher_where_clause} - RETURN a.id AS source, b.id AS target, type(r) as edge - ORDER BY COALESCE(a.created_at, '1970-01-01T00:00:00') DESC, - COALESCE(b.created_at, '1970-01-01T00:00:00') DESC, - a.id DESC, b.id DESC - $$) AS (source agtype, target agtype, edge agtype) - ) AS edges - {edge_pagination_clause} - """ - logger.info(f"[export_graph edges] Query: {edge_query}") - with conn.cursor() as cursor: - cursor.execute(edge_query) - edge_results = cursor.fetchall() - edges = [] - - for row in edge_results: - source_agtype, target_agtype, edge_agtype = row - - # Extract and clean source - source_raw = ( - source_agtype.value - if hasattr(source_agtype, "value") - else str(source_agtype) - ) - if ( - isinstance(source_raw, str) - and source_raw.startswith('"') - and source_raw.endswith('"') - ): - source = source_raw[1:-1] - else: - source = str(source_raw) - - # Extract and clean target - target_raw = ( - target_agtype.value - if hasattr(target_agtype, "value") - else str(target_agtype) - ) - if ( - isinstance(target_raw, str) - and target_raw.startswith('"') - and target_raw.endswith('"') - ): - target = target_raw[1:-1] - else: - target = str(target_raw) - - # Extract and clean edge type - type_raw = ( - edge_agtype.value if hasattr(edge_agtype, "value") else str(edge_agtype) - ) - if ( - isinstance(type_raw, str) - and type_raw.startswith('"') - and type_raw.endswith('"') - ): - edge_type = type_raw[1:-1] - else: - edge_type = str(type_raw) - - edges.append( - { - "source": source, - "target": target, - "type": edge_type, - } - ) - - except Exception as e: - logger.error(f"[EXPORT GRAPH - EDGES] Exception: {e}", exc_info=True) - raise RuntimeError(f"[EXPORT GRAPH - EDGES] Exception: {e}") from e - finally: - self._return_connection(conn) - + edges = [] return { "nodes": nodes, "edges": edges, From 301f3a176334432beb46486070bab8bbbcf0e8ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Thu, 26 Feb 2026 11:42:31 +0800 Subject: [PATCH 3/6] feat: optimize log --- src/memos/graph_dbs/polardb.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 720ed6fe5..7d596bd0e 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -282,8 +282,8 @@ def _get_connection(self): try: # Try to get pool stats if available pool_info = f"Pool config: minconn={self.connection_pool.minconn}, maxconn={self.connection_pool.maxconn}" - logger.warning( - f"[_get_connection] Connection pool exhausted (attempt {attempt + 1}/{max_retries}). {pool_info}" + logger.info( + f" polardb get_connection Connection pool exhausted (attempt {attempt + 1}/{max_retries}). {pool_info}" ) except Exception: logger.warning( @@ -2859,7 +2859,7 @@ def get_all_memory_items( node_ids.add(node_id) except Exception as e: - logger.error(f"Failed to get memories: {e}", exc_info=True) + logger.warning(f"Failed to get memories: {e}", exc_info=True) finally: self._return_connection(conn) From cc7bd7176dd23ae3ce4ad1ffda90e279d81836b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Thu, 26 Feb 2026 11:53:10 +0800 Subject: [PATCH 4/6] feat: optimize log --- src/memos/graph_dbs/polardb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 7d596bd0e..edd877a74 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -2060,7 +2060,7 @@ def search_by_embedding( return_fields: list[str] | None = None, **kwargs, ) -> list[dict]: - logger.info(f"search_by_embedding user_name:{user_name},filter: {filter}, knowledgebase_ids: {knowledgebase_ids},scope:{scope},status:{status},search_filter:{search_filter},filter:{filter},knowledgebase_ids:{knowledgebase_ids}" + logger.info(f"search_by_embedding user_name:{user_name},filter: {filter}, knowledgebase_ids: {knowledgebase_ids},scope:{scope},status:{status},search_filter:{search_filter},filter:{filter},knowledgebase_ids:{knowledgebase_ids},return_fields:{return_fields}" ) start_time = time.time() where_clauses = [] From 9a9af0404d9a414641ea6d48168e4fd9630efef5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Thu, 26 Feb 2026 11:54:04 +0800 Subject: [PATCH 5/6] feat: optimize log --- src/memos/graph_dbs/polardb.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index edd877a74..dafcebe38 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -2047,21 +2047,22 @@ def search_by_fulltext( @timed def search_by_embedding( - self, - vector: list[float], - user_name: str, - top_k: int = 5, - scope: str | None = None, - status: str | None = None, - threshold: float | None = None, - search_filter: dict | None = None, - filter: dict | None = None, - knowledgebase_ids: list[str] | None = None, - return_fields: list[str] | None = None, - **kwargs, + self, + vector: list[float], + user_name: str, + top_k: int = 5, + scope: str | None = None, + status: str | None = None, + threshold: float | None = None, + search_filter: dict | None = None, + filter: dict | None = None, + knowledgebase_ids: list[str] | None = None, + return_fields: list[str] | None = None, + **kwargs, ) -> list[dict]: - logger.info(f"search_by_embedding user_name:{user_name},filter: {filter}, knowledgebase_ids: {knowledgebase_ids},scope:{scope},status:{status},search_filter:{search_filter},filter:{filter},knowledgebase_ids:{knowledgebase_ids},return_fields:{return_fields}" - ) + logger.info( + f"search_by_embedding user_name:{user_name},filter: {filter}, knowledgebase_ids: {knowledgebase_ids},scope:{scope},status:{status},search_filter:{search_filter},filter:{filter},knowledgebase_ids:{knowledgebase_ids},return_fields:{return_fields}" + ) start_time = time.time() where_clauses = [] if scope: From 79eb46ecad6fdcb3c21e69b7f539d8c8f8ddb2cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Thu, 26 Feb 2026 11:59:16 +0800 Subject: [PATCH 6/6] feat: optimize log --- src/memos/graph_dbs/polardb.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index dafcebe38..7a7e6ef97 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -2144,15 +2144,10 @@ def search_by_embedding( try: conn = self._get_connection() with conn.cursor() as cursor: - try: - # If params is empty, execute query directly without parameters - if params: - cursor.execute(query, params) - else: - cursor.execute(query) - except Exception as e: - logger.error(f"[search_by_embedding] Error executing query: {e}") - raise + if params: + cursor.execute(query, params) + else: + cursor.execute(query) results = cursor.fetchall() output = [] for row in results: