From c505861b858a4a696dd0c44f928a75bb16740c27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 11 Mar 2026 21:07:09 +0800 Subject: [PATCH 1/5] feat:optimize pool && log --- src/memos/graph_dbs/polardb.py | 52 ++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index a87c83510..af283655f 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -4,11 +4,12 @@ import threading import time -from contextlib import contextmanager +from contextlib import contextmanager, suppress from datetime import datetime from typing import Any, Literal import numpy as np +import psycopg2 from memos.configs.graph_db import PolarDBGraphDBConfig from memos.dependency import require_python_package @@ -248,14 +249,9 @@ def _warm_up_connections_by_all(self): @contextmanager def _get_connection(self): timeout = self._connection_wait_timeout - if timeout <= 0: - self._semaphore.acquire() - else: - if not self._semaphore.acquire(timeout=timeout): - logger.warning(f"Timeout waiting for connection slot ({timeout}s)") - raise RuntimeError( - f"Connection pool busy: acquire a slot within {timeout}s (all connections in use)." - ) + if not self._semaphore.acquire(timeout=max(timeout, 0)): + logger.warning(f"Timeout waiting for connection slot ({timeout}s)") + raise RuntimeError("Connection pool busy") logger.info( "Connection pool usage: %s/%s", self.connection_pool.maxconn - self._semaphore._value, @@ -263,25 +259,31 @@ def _get_connection(self): ) conn = None broken = False - try: conn = self.connection_pool.getconn() - logger.debug(f"Acquired connection {id(conn)} from pool") conn.autocommit = True + for attempt in range(2): + try: + with conn.cursor() as cur: + cur.execute("SELECT 1") + break + except psycopg2.Error: + logger.warning("Dead connection detected, recreating (attempt %d)", attempt + 1) + self.connection_pool.putconn(conn, close=True) + conn = self.connection_pool.getconn() + conn.autocommit = True + else: + raise RuntimeError("Cannot obtain valid DB connection after 2 attempts") with conn.cursor() as cur: cur.execute(f'SET search_path = {self.db_name}_graph, ag_catalog, "$user", public;') yield conn - except Exception as e: + except Exception: broken = True - logger.exception(f"Connection failed or broken: {e}") raise finally: if conn: - try: + with suppress(Exception): self.connection_pool.putconn(conn, close=broken) - logger.debug(f"Returned connection {id(conn)} to pool (broken={broken})") - except Exception as e: - logger.warning(f"Failed to return connection to pool: {e}") self._semaphore.release() def _ensure_database_exists(self): @@ -1814,7 +1816,7 @@ def search_by_fulltext( properties = row[2] # properties column item.update(self._extract_fields_from_properties(properties, return_fields)) output.append(item) - elapsed = (time.perf_counter() - start_time) * 1000 + elapsed = (time.perf_counter() - start_time) * 1000.0 logger.info("search_by_fulltext internal took %.1f ms", elapsed) return output[:top_k] @@ -1945,7 +1947,7 @@ def search_by_embedding( properties = row[1] # properties column item.update(self._extract_fields_from_properties(properties, return_fields)) output.append(item) - elapsed_time = time.perf_counter() - start_time + elapsed_time = (time.perf_counter() - start_time) * 1000.0 logger.info( "search_by_embedding query embedding completed time took %.1f ms", elapsed_time ) @@ -2500,7 +2502,7 @@ def _extract_special_filter_values(filter_obj): except Exception as e: logger.error(f"[EXPORT GRAPH - NODES] Exception: {e}", exc_info=True) raise RuntimeError(f"[EXPORT GRAPH - NODES] Exception: {e}") from e - elapsed = (time.perf_counter() - start_time) * 1000 + elapsed = (time.perf_counter() - start_time) * 1000.0 logger.info("export internal took %.1f ms", elapsed) edges = [] @@ -3360,7 +3362,7 @@ def add_nodes_batch( logger.warning( f"[add_nodes_batch] Failed to deallocate {prepare_name}: {dealloc_error}" ) - elapsed_time = time.perf_counter() - batch_start_time + elapsed_time = (time.perf_counter() - batch_start_time) * 1000.0 logger.info( "add_nodes_batch batch insert completed successfully in took %.1f ms", elapsed_time, @@ -4815,7 +4817,7 @@ def delete_node_by_prams( """ batch_start_time = time.time() logger.info( - f"[delete_node_by_prams] memory_ids: {memory_ids}, file_ids: {file_ids}, filter: {filter}, writable_cube_ids: {writable_cube_ids}" + f" delete_node_by_prams memory_ids: {memory_ids}, file_ids: {file_ids}, filter: {filter}, writable_cube_ids: {writable_cube_ids}" ) # Build user_name condition from writable_cube_ids (OR relationship - match any cube_id) @@ -4889,7 +4891,7 @@ def delete_node_by_prams( DELETE FROM "{self.db_name}_graph"."Memory" WHERE {where_clause} """ - logger.info(f"[delete_node_by_prams] delete_query: {delete_query}") + logger.info(f" delete_node_by_prams delete_query: {delete_query}") cursor.execute(delete_query) deleted_count = cursor.rowcount @@ -4897,9 +4899,9 @@ def delete_node_by_prams( logger.info(f"[delete_node_by_prams] Deleted {deleted_count} nodes") - elapsed_time = time.time() - batch_start_time + elapsed_time = (time.time() - batch_start_time) * 1000.0 logger.info( - f"[delete_node_by_prams] Deletion completed successfully in {elapsed_time:.2f}s, total deleted {total_deleted_count} nodes" + f"delete_node_by_prams Deletion completed successfully in {elapsed_time:.2f}s, total deleted {total_deleted_count} nodes" ) except Exception as e: logger.error(f"[delete_node_by_prams] Failed to delete nodes: {e}", exc_info=True) From f34b6ddd9c43939942920c3f32f71993640e04a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 11 Mar 2026 21:12:55 +0800 Subject: [PATCH 2/5] feat:optimize pool && log --- src/memos/graph_dbs/polardb.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index af283655f..7ba06eacd 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -9,7 +9,6 @@ from typing import Any, Literal import numpy as np -import psycopg2 from memos.configs.graph_db import PolarDBGraphDBConfig from memos.dependency import require_python_package From c45eb0f48ac79b34390ee3abb6e6cc18221ce583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 11 Mar 2026 21:21:26 +0800 Subject: [PATCH 3/5] feat:optimize pool && log --- src/memos/graph_dbs/polardb.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 7ba06eacd..af283655f 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -9,6 +9,7 @@ from typing import Any, Literal import numpy as np +import psycopg2 from memos.configs.graph_db import PolarDBGraphDBConfig from memos.dependency import require_python_package From c98e55cc84db69c48f8e8acff5d4d5442d5a3e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 11 Mar 2026 21:24:41 +0800 Subject: [PATCH 4/5] feat:optimize pool && log --- src/memos/graph_dbs/polardb.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index af283655f..014c5490c 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -9,7 +9,6 @@ from typing import Any, Literal import numpy as np -import psycopg2 from memos.configs.graph_db import PolarDBGraphDBConfig from memos.dependency import require_python_package @@ -248,6 +247,8 @@ def _warm_up_connections_by_all(self): @contextmanager def _get_connection(self): + import psycopg2 + timeout = self._connection_wait_timeout if not self._semaphore.acquire(timeout=max(timeout, 0)): logger.warning(f"Timeout waiting for connection slot ({timeout}s)") From 1fb04291b9228a660b413116585092ed53694051 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=A4=A7=E6=B4=8B?= <714403855@qq.com> Date: Wed, 11 Mar 2026 21:37:02 +0800 Subject: [PATCH 5/5] feat:optimize pool && log --- src/memos/graph_dbs/polardb.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 014c5490c..d740ad1d2 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -4,7 +4,7 @@ import threading import time -from contextlib import contextmanager, suppress +from contextlib import contextmanager from datetime import datetime from typing import Any, Literal @@ -283,8 +283,11 @@ def _get_connection(self): raise finally: if conn: - with suppress(Exception): + try: self.connection_pool.putconn(conn, close=broken) + logger.debug(f"Returned connection {id(conn)} to pool (broken={broken})") + except Exception as e: + logger.warning(f"Failed to return connection to pool: {e}") self._semaphore.release() def _ensure_database_exists(self):