From db432ad0085a6c0c3c7aecc30957ec2aff7e4dbd Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Wed, 22 Oct 2025 19:29:38 +0800 Subject: [PATCH 01/36] feat: update log context --- src/memos/api/middleware/request_context.py | 25 +++++++- src/memos/context/context.py | 67 +++++++++++++++++++-- src/memos/log.py | 48 +++++++++++---- 3 files changed, 122 insertions(+), 18 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index cb41428d4..aabb61b64 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -2,6 +2,8 @@ Request context middleware for automatic trace_id injection. """ +import time + from collections.abc import Callable from starlette.middleware.base import BaseHTTPMiddleware @@ -38,8 +40,19 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: # Extract or generate trace_id trace_id = extract_trace_id_from_headers(request) or generate_trace_id() + env = request.headers.get("x-env") + user_type = request.headers.get("x-user-type") + user_name = request.headers.get("x-user-name") + start_time = time.time() + # Create and set request context - context = RequestContext(trace_id=trace_id, api_path=request.url.path) + context = RequestContext( + trace_id=trace_id, + api_path=request.url.path, + env=env, + user_type=user_type, + user_name=user_name, + ) set_request_context(context) # Log request start with parameters @@ -49,15 +62,21 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: if request.query_params: params_log["query_params"] = dict(request.query_params) - logger.info(f"Request started: {request.method} {request.url.path}, {params_log}") + logger.info(f"Request started, params: {params_log}, headers: {request.headers}") # Process the request response = await call_next(request) + end_time = time.time() # Log request completion with output - logger.info(f"Request completed: 
{request.url.path}, status: {response.status_code}") + logger.info( + f"Request completed: {request.url.path}, status: {response.status_code}, cost: {end_time - start_time}s" + ) # Add trace_id to response headers for debugging response.headers["x-trace-id"] = trace_id + response.headers["x-env"] = env + response.headers["x-user-type"] = user_type + response.headers["x-user-name"] = user_name return response diff --git a/src/memos/context/context.py b/src/memos/context/context.py index 4f54348fb..9d77e19e4 100644 --- a/src/memos/context/context.py +++ b/src/memos/context/context.py @@ -93,6 +93,36 @@ def get_current_api_path() -> str | None: return None +def get_current_env() -> str | None: + """ + Get the current request's env. + """ + context = _request_context.get() + if context: + return context.get("env") + return "prod" + + +def get_current_user_type() -> str | None: + """ + Get the current request's user type. + """ + context = _request_context.get() + if context: + return context.get("user_type") + return "normal" + + +def get_current_user_name() -> str | None: + """ + Get the current request's user name. + """ + context = _request_context.get() + if context: + return context.get("user_name") + return "unknown" + + def get_current_context() -> RequestContext | None: """ Get the current request context. 
@@ -103,7 +133,11 @@ def get_current_context() -> RequestContext | None: context_dict = _request_context.get() if context_dict: ctx = RequestContext( - trace_id=context_dict.get("trace_id"), api_path=context_dict.get("api_path") + trace_id=context_dict.get("trace_id"), + api_path=context_dict.get("api_path"), + env=context_dict.get("env"), + user_type=context_dict.get("user_type"), + user_name=context_dict.get("user_name"), ) ctx._data = context_dict.get("data", {}).copy() return ctx @@ -141,6 +175,9 @@ def __init__(self, target, args=(), kwargs=None, **thread_kwargs): self.main_trace_id = get_current_trace_id() self.main_api_path = get_current_api_path() + self.main_env = get_current_env() + self.main_user_type = get_current_user_type() + self.main_user_name = get_current_user_name() self.main_context = get_current_context() def run(self): @@ -148,7 +185,11 @@ def run(self): if self.main_context: # Copy the context data child_context = RequestContext( - trace_id=self.main_trace_id, api_path=self.main_context.api_path + trace_id=self.main_trace_id, + api_path=self.main_api_path, + env=self.main_env, + user_type=self.main_user_type, + user_name=self.main_user_name, ) child_context._data = self.main_context._data.copy() @@ -171,13 +212,22 @@ def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Any: """ main_trace_id = get_current_trace_id() main_api_path = get_current_api_path() + main_env = get_current_env() + main_user_type = get_current_user_type() + main_user_name = get_current_user_name() main_context = get_current_context() @functools.wraps(fn) def wrapper(*args: Any, **kwargs: Any) -> Any: if main_context: # Create and set new context in worker thread - child_context = RequestContext(trace_id=main_trace_id, api_path=main_api_path) + child_context = RequestContext( + trace_id=main_trace_id, + api_path=main_api_path, + env=main_env, + user_type=main_user_type, + user_name=main_user_name, + ) child_context._data = main_context._data.copy() 
set_request_context(child_context) @@ -198,13 +248,22 @@ def map( """ main_trace_id = get_current_trace_id() main_api_path = get_current_api_path() + main_env = get_current_env() + main_user_type = get_current_user_type() + main_user_name = get_current_user_name() main_context = get_current_context() @functools.wraps(fn) def wrapper(*args: Any, **kwargs: Any) -> Any: if main_context: # Create and set new context in worker thread - child_context = RequestContext(trace_id=main_trace_id, api_path=main_api_path) + child_context = RequestContext( + trace_id=main_trace_id, + api_path=main_api_path, + env=main_env, + user_type=main_user_type, + user_name=main_user_name, + ) child_context._data = main_context._data.copy() set_request_context(child_context) diff --git a/src/memos/log.py b/src/memos/log.py index 339d13f26..a610d4011 100644 --- a/src/memos/log.py +++ b/src/memos/log.py @@ -14,7 +14,13 @@ from dotenv import load_dotenv from memos import settings -from memos.context.context import get_current_api_path, get_current_trace_id +from memos.context.context import ( + get_current_api_path, + get_current_env, + get_current_trace_id, + get_current_user_name, + get_current_user_type, +) # Load environment variables @@ -34,15 +40,21 @@ def _setup_logfile() -> Path: return logfile -class TraceIDFilter(logging.Filter): - """add trace_id to the log record""" +class ContextFilter(logging.Filter): + """add context to the log record""" def filter(self, record): try: trace_id = get_current_trace_id() record.trace_id = trace_id if trace_id else "trace-id" + record.env = get_current_env() + record.user_type = get_current_user_type() + record.user_name = get_current_user_name() except Exception: record.trace_id = "trace-id" + record.env = "prod" + record.user_type = "normal" + record.user_name = "unknown" return True @@ -86,13 +98,24 @@ def emit(self, record): try: trace_id = get_current_trace_id() or "trace-id" api_path = get_current_api_path() + env = get_current_env() + 
user_type = get_current_user_type() + user_name = get_current_user_name() if api_path is not None: - self._executor.submit(self._send_log_sync, record.getMessage(), trace_id, api_path) + self._executor.submit( + self._send_log_sync, + record.getMessage(), + trace_id, + api_path, + env, + user_type, + user_name, + ) except Exception as e: if not self._is_shutting_down.is_set(): print(f"Error sending log: {e}") - def _send_log_sync(self, message, trace_id, api_path): + def _send_log_sync(self, message, trace_id, api_path, env, user_type, user_name): """Send log message synchronously in a separate thread""" try: logger_url = os.getenv("CUSTOM_LOGGER_URL") @@ -104,6 +127,9 @@ def _send_log_sync(self, message, trace_id, api_path): "trace_id": trace_id, "action": api_path, "current_time": round(time.time(), 3), + "env": env, + "user_type": user_type, + "user_name": user_name, } # Add auth token if exists @@ -145,18 +171,18 @@ def close(self): "disable_existing_loggers": False, "formatters": { "standard": { - "format": "%(asctime)s [%(trace_id)s] - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(asctime)s [%(trace_id)s] - %(env)s | %(user_type)s | %(user_name)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "no_datetime": { - "format": "[%(trace_id)s] - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "[%(trace_id)s] - %(env)s | %(user_type)s | %(user_name)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "simplified": { - "format": "%(asctime)s | %(trace_id)s | %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" + "format": "%(asctime)s | %(trace_id)s | %(env)s | %(user_type)s | %(user_name)s | %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" }, }, "filters": { "package_tree_filter": {"()": "logging.Filter", "name": settings.LOG_FILTER_TREE_PREFIX}, - 
"trace_id_filter": {"()": "memos.log.TraceIDFilter"}, + "context_filter": {"()": "memos.log.ContextFilter"}, }, "handlers": { "console": { @@ -164,7 +190,7 @@ def close(self): "class": "logging.StreamHandler", "stream": stdout, "formatter": "no_datetime", - "filters": ["package_tree_filter", "trace_id_filter"], + "filters": ["package_tree_filter", "context_filter"], }, "file": { "level": "DEBUG", @@ -173,7 +199,7 @@ def close(self): "maxBytes": 1024**2 * 10, "backupCount": 10, "formatter": "standard", - "filters": ["trace_id_filter"], + "filters": ["context_filter"], }, "custom_logger": { "level": "INFO", From 9502acc18aec4340e13edb58781b1cc73c4d59ac Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Wed, 22 Oct 2025 20:43:36 +0800 Subject: [PATCH 02/36] feat: update log context --- src/memos/context/context.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/src/memos/context/context.py b/src/memos/context/context.py index 9d77e19e4..2eecca92c 100644 --- a/src/memos/context/context.py +++ b/src/memos/context/context.py @@ -29,9 +29,19 @@ class RequestContext: This provides a Flask g-like object for FastAPI applications. 
""" - def __init__(self, trace_id: str | None = None, api_path: str | None = None): + def __init__( + self, + trace_id: str | None = None, + api_path: str | None = None, + env: str | None = None, + user_type: str | None = None, + user_name: str | None = None, + ): self.trace_id = trace_id or "trace-id" self.api_path = api_path + self.env = env + self.user_type = user_type + self.user_name = user_name self._data: dict[str, Any] = {} def set(self, key: str, value: Any) -> None: @@ -43,7 +53,13 @@ def get(self, key: str, default: Any | None = None) -> Any: return self._data.get(key, default) def __setattr__(self, name: str, value: Any) -> None: - if name.startswith("_") or name in ("trace_id", "api_path"): + if name.startswith("_") or name in ( + "trace_id", + "api_path", + "env", + "user_type", + "user_name", + ): super().__setattr__(name, value) else: if not hasattr(self, "_data"): @@ -58,7 +74,14 @@ def __getattr__(self, name: str) -> Any: def to_dict(self) -> dict[str, Any]: """Convert context to dictionary.""" - return {"trace_id": self.trace_id, "api_path": self.api_path, "data": self._data.copy()} + return { + "trace_id": self.trace_id, + "api_path": self.api_path, + "env": self.env, + "user_type": self.user_type, + "user_name": self.user_name, + "data": self._data.copy(), + } def set_request_context(context: RequestContext) -> None: From d74e62898f55e54906824a58be2d8219e4e086d0 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Thu, 23 Oct 2025 17:33:51 +0800 Subject: [PATCH 03/36] feat: update mcp --- src/memos/api/middleware/request_context.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index aabb61b64..42e1fbb65 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -73,10 +73,4 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: f"Request completed: {request.url.path}, status: 
{response.status_code}, cost: {end_time - start_time}s" ) - # Add trace_id to response headers for debugging - response.headers["x-trace-id"] = trace_id - response.headers["x-env"] = env - response.headers["x-user-type"] = user_type - response.headers["x-user-name"] = user_name - return response From 32b2ac103f98a75f09a8f0221fcd4dcf6d2f70ed Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Thu, 23 Oct 2025 17:43:28 +0800 Subject: [PATCH 04/36] feat: update mcp --- src/memos/utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/memos/utils.py b/src/memos/utils.py index 6a1d42558..5801bc2d2 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -1,5 +1,6 @@ import time +from memos import settings from memos.log import get_logger @@ -13,7 +14,8 @@ def wrapper(*args, **kwargs): start = time.perf_counter() result = func(*args, **kwargs) elapsed = time.perf_counter() - start - logger.info(f"[TIMER] {func.__name__} took {elapsed:.2f} s") + if settings.DEBUG: + logger.info(f"[TIMER] {func.__name__} took {elapsed:.2f} s") return result return wrapper From e4c6b924a813560fa3a916234f8fd3ddd368838f Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Thu, 23 Oct 2025 17:58:17 +0800 Subject: [PATCH 05/36] feat: add error log --- src/memos/api/middleware/request_context.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index 42e1fbb65..c81ab5a5a 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -68,9 +68,13 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: response = await call_next(request) end_time = time.time() - # Log request completion with output - logger.info( - f"Request completed: {request.url.path}, status: {response.status_code}, cost: {end_time - start_time}s" - ) + if response.status_code == 200: + logger.info( + f"Request 
completed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + ) + else: + logger.error( + f"Request Failed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + ) return response From c27bd61af8cace0b4ae550ec473c1e7b99c83038 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Thu, 23 Oct 2025 17:59:27 +0800 Subject: [PATCH 06/36] feat: add error log --- src/memos/log.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/memos/log.py b/src/memos/log.py index a610d4011..59e9ed0e3 100644 --- a/src/memos/log.py +++ b/src/memos/log.py @@ -171,10 +171,10 @@ def close(self): "disable_existing_loggers": False, "formatters": { "standard": { - "format": "%(asctime)s [%(trace_id)s] - %(env)s | %(user_type)s | %(user_name)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(asctime)s | %(trace_id)s | %(env)s | %(user_type)s | %(user_name)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "no_datetime": { - "format": "[%(trace_id)s] - %(env)s | %(user_type)s | %(user_name)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(trace_id)s| %(env)s | %(user_type)s | %(user_name)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "simplified": { "format": "%(asctime)s | %(trace_id)s | %(env)s | %(user_type)s | %(user_name)s | %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" From 6769b4c47abbfe1600c8dfb6f97a13b32cc6adf4 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Thu, 23 Oct 2025 20:49:58 +0800 Subject: [PATCH 07/36] feat: add error log --- src/memos/api/middleware/request_context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index c81ab5a5a..b9063a64d 100644 --- 
a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -74,7 +74,7 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: ) else: logger.error( - f"Request Failed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + f"Request Failed: {request.url.path}, response: {response.json()}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" ) return response From 01547e1adcc1d1576ef69a5c51c321aa672a1755 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Fri, 24 Oct 2025 10:09:57 +0800 Subject: [PATCH 08/36] feat: update log --- src/memos/log.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/memos/log.py b/src/memos/log.py index 59e9ed0e3..4abdb19c7 100644 --- a/src/memos/log.py +++ b/src/memos/log.py @@ -171,13 +171,13 @@ def close(self): "disable_existing_loggers": False, "formatters": { "standard": { - "format": "%(asctime)s | %(trace_id)s | %(env)s | %(user_type)s | %(user_name)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(asctime)s | trace_id=%(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s |%(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "no_datetime": { - "format": "%(trace_id)s| %(env)s | %(user_type)s | %(user_name)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "trace_id=%(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "simplified": { - "format": "%(asctime)s | %(trace_id)s | %(env)s | %(user_type)s | %(user_name)s | %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" + "format": "%(asctime)s | trace_id=%(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | % %(levelname)s | 
%(filename)s:%(lineno)d: %(funcName)s | %(message)s" }, }, "filters": { From a19584f71174647d6545e260fb87576de6d6cbcd Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Fri, 24 Oct 2025 11:44:11 +0800 Subject: [PATCH 09/36] feat: add chat_time --- src/memos/api/middleware/request_context.py | 11 +++++++++-- src/memos/log.py | 6 +++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index b9063a64d..bdcbcc004 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -68,13 +68,20 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: response = await call_next(request) end_time = time.time() + try: + print(f"response.json(): {response}") + response_json = response.json() + except Exception as e: + response_json = None + logger.error(f"Error getting response body: {e}") + if response.status_code == 200: logger.info( - f"Request completed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + f"Request completed: {request.url.path}, response: {response_json}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" ) else: logger.error( - f"Request Failed: {request.url.path}, response: {response.json()}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + f"Request Failed: {request.url.path}, response: {response_json}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" ) return response diff --git a/src/memos/log.py b/src/memos/log.py index 4abdb19c7..8113f098e 100644 --- a/src/memos/log.py +++ b/src/memos/log.py @@ -171,13 +171,13 @@ def close(self): "disable_existing_loggers": False, "formatters": { "standard": { - "format": "%(asctime)s | trace_id=%(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s |%(name)s - %(levelname)s - 
%(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(asctime)s | %(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s |%(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "no_datetime": { - "format": "trace_id=%(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "simplified": { - "format": "%(asctime)s | trace_id=%(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | % %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" + "format": "%(asctime)s | %(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | % %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" }, }, "filters": { From 8dfa33844de3a45a499f5a49494566eb0dcfbeab Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Fri, 24 Oct 2025 12:07:24 +0800 Subject: [PATCH 10/36] feat: add chat_time --- src/memos/api/middleware/request_context.py | 27 ++++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index bdcbcc004..5fdd94765 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -6,6 +6,7 @@ from collections.abc import Callable +from fastapi.responses import StreamingResponse from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response @@ -15,6 +16,20 @@ from memos.context.context import RequestContext, generate_trace_id, set_request_context +async def _tee_stream( + original: StreamingResponse, +) -> str: + chunks = [] + + async for 
chunk in original.body_iterator: + chunks.append(chunk) + yield chunk + + body_str = "".join(chunks) + + return body_str + + logger = memos.log.get_logger(__name__) @@ -67,21 +82,15 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: # Process the request response = await call_next(request) end_time = time.time() - - try: - print(f"response.json(): {response}") - response_json = response.json() - except Exception as e: - response_json = None - logger.error(f"Error getting response body: {e}") + content = await _tee_stream(response) if response.status_code == 200: logger.info( - f"Request completed: {request.url.path}, response: {response_json}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + f"Request completed: {request.url.path}, content: {content}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" ) else: logger.error( - f"Request Failed: {request.url.path}, response: {response_json}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + f"Request Failed: {request.url.path}, content: {content}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" ) return response From a91e3e260f2ccaf92c0f61e8c71142498a04559e Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Fri, 24 Oct 2025 12:50:07 +0800 Subject: [PATCH 11/36] feat: add chat_time --- src/memos/api/middleware/request_context.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index 5fdd94765..a26d2c2aa 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -18,16 +18,16 @@ async def _tee_stream( original: StreamingResponse, -) -> str: +) -> StreamingResponse: chunks = [] async for chunk in original.body_iterator: chunks.append(chunk) yield chunk - body_str = "".join(chunks) + body_str = 
"".join(chunks).decode("utf-8", errors="replace") - return body_str + logger.info(f"Response content: {body_str}") logger = memos.log.get_logger(__name__) From 5b962e26cac70a4bde33e0198d2398e40cc4c55d Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Fri, 24 Oct 2025 13:22:45 +0800 Subject: [PATCH 12/36] feat: update log --- src/memos/api/middleware/request_context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index a26d2c2aa..32ce1ccec 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -82,7 +82,7 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: # Process the request response = await call_next(request) end_time = time.time() - content = await _tee_stream(response) + content = _tee_stream(response) if response.status_code == 200: logger.info( From 69a6e9a7eac5bb2181362e4e7986425a1dd87da7 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Fri, 24 Oct 2025 13:34:48 +0800 Subject: [PATCH 13/36] feat: update log --- src/memos/api/middleware/request_context.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index 32ce1ccec..76bd8f77a 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -82,7 +82,16 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: # Process the request response = await call_next(request) end_time = time.time() - content = _tee_stream(response) + if isinstance(response, StreamingResponse): + response.body_iterator = _tee_stream(response) + content = "Streaming response" + else: + try: + content = ( + response.body.decode("utf-8") if hasattr(response, "body") else str(response) + ) + except Exception as e: + content = f"Unable to decode response content: 
{e!s}" if response.status_code == 200: logger.info( From d325a3119369b573cf1bfa90d04fbf3561bfddb6 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Fri, 24 Oct 2025 15:03:46 +0800 Subject: [PATCH 14/36] feat: update log --- src/memos/api/routers/server_router.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index a332de583..8a2f0a968 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -237,6 +237,7 @@ def search_memories(search_req: APISearchRequest): } ) + logger.info(f"search_memories response data: {memories_result}") return SearchResponse( message="Search completed successfully", data=memories_result, @@ -285,6 +286,7 @@ def add_memories(add_req: APIADDRequest): } for memory_id, memory in zip(mem_id_list, flattened_memories, strict=False) ] + logger.info(f"add_memories response data: {response_data}") return MemoryResponse( message="Memory added successfully", data=response_data, From f0e5f5cc5e07b82c1bf1277e88c5b93713bfd310 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Fri, 24 Oct 2025 15:34:46 +0800 Subject: [PATCH 15/36] feat: update log --- src/memos/api/middleware/request_context.py | 33 ++++++++++----------- src/memos/context/context.py | 4 +-- src/memos/log.py | 2 +- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index 76bd8f77a..a7d0dc967 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -80,26 +80,23 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: logger.info(f"Request started, params: {params_log}, headers: {request.headers}") # Process the request - response = await call_next(request) - end_time = time.time() - if isinstance(response, StreamingResponse): - response.body_iterator = _tee_stream(response) - content = 
"Streaming response" - else: - try: - content = ( - response.body.decode("utf-8") if hasattr(response, "body") else str(response) - ) - except Exception as e: - content = f"Unable to decode response content: {e!s}" + try: + response = await call_next(request) + end_time = time.time() - if response.status_code == 200: - logger.info( - f"Request completed: {request.url.path}, content: {content}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" - ) - else: + if response.status_code == 200: + logger.info( + f"Request completed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + ) + else: + logger.error( + f"Request Failed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + ) + except Exception as e: + end_time = time.time() logger.error( - f"Request Failed: {request.url.path}, content: {content}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + f"Request Exception Error: {e}, cost: {(end_time - start_time) * 1000:.2f}ms" ) + raise e return response diff --git a/src/memos/context/context.py b/src/memos/context/context.py index 2eecca92c..d6a0f3bf1 100644 --- a/src/memos/context/context.py +++ b/src/memos/context/context.py @@ -133,7 +133,7 @@ def get_current_user_type() -> str | None: context = _request_context.get() if context: return context.get("user_type") - return "normal" + return "opensource" def get_current_user_name() -> str | None: @@ -143,7 +143,7 @@ def get_current_user_name() -> str | None: context = _request_context.get() if context: return context.get("user_name") - return "unknown" + return "memos" def get_current_context() -> RequestContext | None: diff --git a/src/memos/log.py b/src/memos/log.py index 8113f098e..dac79b4af 100644 --- a/src/memos/log.py +++ b/src/memos/log.py @@ -171,7 +171,7 @@ def close(self): "disable_existing_loggers": False, "formatters": { "standard": { - "format": 
"%(asctime)s | %(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s |%(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(asctime)s | %(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "no_datetime": { "format": "%(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" From 7fc8c052fc95b4ace4b32b68f02dc51d393df7b8 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Fri, 24 Oct 2025 15:38:35 +0800 Subject: [PATCH 16/36] feat: update log --- src/memos/log.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/memos/log.py b/src/memos/log.py index dac79b4af..d46bfa7f5 100644 --- a/src/memos/log.py +++ b/src/memos/log.py @@ -50,6 +50,7 @@ def filter(self, record): record.env = get_current_env() record.user_type = get_current_user_type() record.user_name = get_current_user_name() + record.api_path = get_current_api_path() except Exception: record.trace_id = "trace-id" record.env = "prod" @@ -171,13 +172,13 @@ def close(self): "disable_existing_loggers": False, "formatters": { "standard": { - "format": "%(asctime)s | %(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(asctime)s | %(trace_id)s | %(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "no_datetime": { - "format": "%(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(trace_id)s | %(api_path)s | env=%(env)s | user_type=%(user_type)s | 
user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "simplified": { - "format": "%(asctime)s | %(trace_id)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | % %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" + "format": "%(asctime)s | %(trace_id)s | %(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | % %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" }, }, "filters": { From 185ed93fa12309e099fb66b63bbf23b77d9d2a5c Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sun, 26 Oct 2025 11:20:21 +0800 Subject: [PATCH 17/36] feat: add arms --- src/memos/api/exceptions.py | 27 ++++++++++++++- src/memos/api/middleware/request_context.py | 38 ++++++++++++++++----- src/memos/api/routers/server_router.py | 5 +++ src/memos/api/server_api.py | 10 ++++-- src/memos/log.py | 2 +- 5 files changed, 70 insertions(+), 12 deletions(-) diff --git a/src/memos/api/exceptions.py b/src/memos/api/exceptions.py index 2fd22ad52..10a14b4d1 100644 --- a/src/memos/api/exceptions.py +++ b/src/memos/api/exceptions.py @@ -1,5 +1,6 @@ import logging +from fastapi.exceptions import HTTPException, RequestValidationError from fastapi.requests import Request from fastapi.responses import JSONResponse @@ -10,9 +11,24 @@ class APIExceptionHandler: """Centralized exception handling for MemOS APIs.""" + @staticmethod + async def validation_error_handler(request: Request, exc: RequestValidationError): + """Handle request validation errors.""" + logger.error(f"Validation error: {exc.errors()}") + return JSONResponse( + status_code=422, + content={ + "code": 422, + "message": "Parameter validation error", + "detail": exc.errors(), + "data": None, + }, + ) + @staticmethod async def value_error_handler(request: Request, exc: ValueError): """Handle ValueError exceptions globally.""" + logger.error(f"ValueError: {exc}") return JSONResponse( status_code=400, content={"code": 
400, "message": str(exc), "data": None}, @@ -21,8 +37,17 @@ async def value_error_handler(request: Request, exc: ValueError): @staticmethod async def global_exception_handler(request: Request, exc: Exception): """Handle all unhandled exceptions globally.""" - logger.exception("Unhandled error:") + logger.error(f"Exception: {exc}") return JSONResponse( status_code=500, content={"code": 500, "message": str(exc), "data": None}, ) + + @staticmethod + async def http_error_handler(request: Request, exc: HTTPException): + """Handle HTTP exceptions globally.""" + logger.error(f"HTTP error {exc.status_code}: {exc.detail}") + return JSONResponse( + status_code=exc.status_code, + content={"code": exc.status_code, "message": str(exc.detail), "data": None}, + ) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index cb41428d4..1b526481b 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -2,6 +2,9 @@ Request context middleware for automatic trace_id injection. 
""" +import os +import time + from collections.abc import Callable from starlette.middleware.base import BaseHTTPMiddleware @@ -15,6 +18,10 @@ logger = memos.log.get_logger(__name__) +print("ARMS_APP_NAME", os.environ["ARMS_APP_NAME"]) +print("ARMS_REGION_ID", os.environ["ARMS_REGION_ID"]) +print("ARMS_LICENSE_KEY", os.environ["ARMS_LICENSE_KEY"]) + def extract_trace_id_from_headers(request: Request) -> str | None: """Extract trace_id from various possible headers with priority: g-trace-id > x-trace-id > trace-id.""" @@ -38,6 +45,8 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: # Extract or generate trace_id trace_id = extract_trace_id_from_headers(request) or generate_trace_id() + start_time = time.time() + # Create and set request context context = RequestContext(trace_id=trace_id, api_path=request.url.path) set_request_context(context) @@ -49,15 +58,28 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: if request.query_params: params_log["query_params"] = dict(request.query_params) - logger.info(f"Request started: {request.method} {request.url.path}, {params_log}") + logger.info(f"Request started, params: {params_log}, headers: {request.headers}") # Process the request - response = await call_next(request) - - # Log request completion with output - logger.info(f"Request completed: {request.url.path}, status: {response.status_code}") - - # Add trace_id to response headers for debugging - response.headers["x-trace-id"] = trace_id + try: + response = await call_next(request) + end_time = time.time() + logger.info(f"response is: {response.body}") + + # 记录请求状态 + if response.status_code == 200: + logger.info( + f"Request completed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + ) + else: + logger.error( + f"Request Failed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + ) + except Exception as e: + 
end_time = time.time() + logger.error( + f"Request Exception Error: {e}, cost: {(end_time - start_time) * 1000:.2f}ms" + ) + raise e return response diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index a332de583..52d7dfbc9 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -237,6 +237,8 @@ def search_memories(search_req: APISearchRequest): } ) + logger.info(f"search_memories response is: {memories_result}") + return SearchResponse( message="Search completed successfully", data=memories_result, @@ -285,6 +287,9 @@ def add_memories(add_req: APIADDRequest): } for memory_id, memory in zip(mem_id_list, flattened_memories, strict=False) ] + + logger.info(f"add_memories response is: {response_data}") + return MemoryResponse( message="Memory added successfully", data=response_data, diff --git a/src/memos/api/server_api.py b/src/memos/api/server_api.py index 78e05ef85..24c67de48 100644 --- a/src/memos/api/server_api.py +++ b/src/memos/api/server_api.py @@ -1,6 +1,7 @@ import logging -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException +from fastapi.exceptions import RequestValidationError from memos.api.exceptions import APIExceptionHandler from memos.api.middleware.request_context import RequestContextMiddleware @@ -21,8 +22,13 @@ # Include routers app.include_router(server_router) -# Exception handlers +# Request validation failed +app.exception_handler(RequestValidationError)(APIExceptionHandler.validation_error_handler) +# Invalid business code parameters app.exception_handler(ValueError)(APIExceptionHandler.value_error_handler) +# Business layer manual exception +app.exception_handler(HTTPException)(APIExceptionHandler.http_error_handler) +# Fallback for unknown errors app.exception_handler(Exception)(APIExceptionHandler.global_exception_handler) diff --git a/src/memos/log.py b/src/memos/log.py index 339d13f26..964587b7b 100644 --- a/src/memos/log.py +++ 
b/src/memos/log.py @@ -160,7 +160,7 @@ def close(self): }, "handlers": { "console": { - "level": selected_log_level, + "level": "DEBUG", "class": "logging.StreamHandler", "stream": stdout, "formatter": "no_datetime", From d5c59a0ff7d51583a45ed27c86c00af75bd31c16 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sun, 26 Oct 2025 11:39:26 +0800 Subject: [PATCH 18/36] fix: format --- src/memos/api/server_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/api/server_api.py b/src/memos/api/server_api.py index 24c67de48..555da8c6f 100644 --- a/src/memos/api/server_api.py +++ b/src/memos/api/server_api.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) app = FastAPI( - title="MemOS Product REST APIs", + title="MemOS Product REST API", description="A REST API for managing multiple users with MemOS Product.", version="1.0.1", ) From b1444706400ba59fb7757f06a3aa091e59073f20 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sun, 26 Oct 2025 12:04:37 +0800 Subject: [PATCH 19/36] fix: format --- src/memos/api/middleware/request_context.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index def6288c2..2922ab3eb 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -2,7 +2,6 @@ Request context middleware for automatic trace_id injection. 
""" -import os import time from collections.abc import Callable @@ -18,10 +17,6 @@ logger = memos.log.get_logger(__name__) -print("ARMS_APP_NAME", os.environ["ARMS_APP_NAME"]) -print("ARMS_REGION_ID", os.environ["ARMS_REGION_ID"]) -print("ARMS_LICENSE_KEY", os.environ["ARMS_LICENSE_KEY"]) - def extract_trace_id_from_headers(request: Request) -> str | None: """Extract trace_id from various possible headers with priority: g-trace-id > x-trace-id > trace-id.""" From 33921b72443fb27206870484072e7de5a72db1f5 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sun, 26 Oct 2025 14:23:35 +0800 Subject: [PATCH 20/36] feat: add dockerfile --- Dockerfile | 23 ++++++++++++++++ docker-compose.yml | 67 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 Dockerfile create mode 100644 docker-compose.yml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..7fc81ee7f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +# Base image +FROM registry.cn-shanghai.aliyuncs.com/memtensor/memos:amd-v1.2 + +# Set Hugging Face mirror +ENV HF_ENDPOINT=https://hf-mirror.com + +RUN rm -rf /app/ + +WORKDIR /app + +COPY . /app/ + +# Set Python import path +ENV PYTHONPATH=/app/src +RUN pip install pymilvus +RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && pip config set install.trusted-host mirrors.aliyun.com +RUN aliyun-bootstrap -a install + +# Expose port +EXPOSE 9002 + +# Start the docker +CMD ["uvicorn", "memos.api.product_api:app", "--host", "0.0.0.0", "--port", "9002", "--reload"] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..d8998b6f7 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,67 @@ +name: memos-dev + +services: + memos: + container_name: memos-api-docker + build: + context: .. 
+ dockerfile: docker/Dockerfile + ports: + - "8000:8000" + env_file: + - ../.env + depends_on: + - neo4j + - qdrant + environment: + - PYTHONPATH=/app/src + - HF_ENDPOINT=https://hf-mirror.com + volumes: + - ../src:/app/src + - .:/app/docker + networks: + - memos_network + + neo4j: + image: neo4j:5.26.4 + container_name: neo4j-docker + ports: + - "7474:7474" # HTTP + - "7687:7687" # Bolt + healthcheck: + test: wget http://localhost:7687 || exit 1 + interval: 1s + timeout: 10s + retries: 20 + start_period: 3s + environment: + NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes" + NEO4J_AUTH: "neo4j/12345678" + volumes: + - neo4j_data:/data + - neo4j_logs:/logs + networks: + - memos_network + + qdrant: + image: qdrant/qdrant:v1.15.0 + container_name: qdrant-docker + ports: + - "6333:6333" # REST API + - "6334:6334" # gRPC API + volumes: + - ./qdrant_data:/qdrant/storage + environment: + QDRANT__SERVICE__GRPC_PORT: 6334 + QDRANT__SERVICE__HTTP_PORT: 6333 + restart: unless-stopped + networks: + - memos_network + +volumes: + neo4j_data: + neo4j_logs: + +networks: + memos_network: + driver: bridge From 49a90798078d53422c984c321fa1b6de68cf14af Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sun, 26 Oct 2025 14:30:05 +0800 Subject: [PATCH 21/36] feat: add dockerfile --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index 7fc81ee7f..e3be58867 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,6 +14,7 @@ COPY . 
/app/ ENV PYTHONPATH=/app/src RUN pip install pymilvus RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && pip config set install.trusted-host mirrors.aliyun.com +RUN pip install aliyun-bootstrap RUN aliyun-bootstrap -a install # Expose port From 27c49b65a6db1276297d9d6683ef831075fc5f80 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sun, 26 Oct 2025 14:48:30 +0800 Subject: [PATCH 22/36] feat: add arms config --- src/memos/api/server_api.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/memos/api/server_api.py b/src/memos/api/server_api.py index 555da8c6f..8699952e1 100644 --- a/src/memos/api/server_api.py +++ b/src/memos/api/server_api.py @@ -1,3 +1,5 @@ +from aliyun.opentelemetry.instrumentation.auto_instrumentation import sitecustomize + import logging from fastapi import FastAPI, HTTPException From 60c5dd8b4cc293b2e3543f6545cd74f01aec5c39 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sun, 26 Oct 2025 16:15:27 +0800 Subject: [PATCH 23/36] feat: update log --- src/memos/log.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/memos/log.py b/src/memos/log.py index 6da7a1da1..6a1fabf35 100644 --- a/src/memos/log.py +++ b/src/memos/log.py @@ -172,13 +172,13 @@ def close(self): "disable_existing_loggers": False, "formatters": { "standard": { - "format": "%(asctime)s | %(trace_id)s | %(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(asctime)s | %(trace_id)s | url=%(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "no_datetime": { - "format": "%(trace_id)s | %(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(trace_id)s | url=%(api_path)s | 
env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "simplified": { - "format": "%(asctime)s | %(trace_id)s | %(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | % %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" + "format": "%(asctime)s | %(trace_id)s | url=%(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | % %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" }, }, "filters": { From 3096321f8ee4976c08b2f6674b9c8474356b760b Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sun, 26 Oct 2025 17:06:21 +0800 Subject: [PATCH 24/36] feat: add sleep time --- src/memos/api/routers/server_router.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index 52d7dfbc9..77faf435e 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -1,4 +1,5 @@ import os +import time import traceback from typing import Any @@ -202,6 +203,7 @@ def search_memories(search_req: APISearchRequest): session_id=search_req.session_id or "default_session", ) logger.info(f"Search user_id is: {user_context.mem_cube_id}") + time.sleep(6) memories_result: MOSSearchResult = { "text_mem": [], "act_mem": [], @@ -254,6 +256,7 @@ def add_memories(add_req: APIADDRequest): mem_cube_id=add_req.mem_cube_id, session_id=add_req.session_id or "default_session", ) + time.sleep(6) naive_mem_cube = _create_naive_mem_cube() target_session_id = add_req.session_id if not target_session_id: From 204efef0a09a3a7512725779d88cfe3d40629818 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Sun, 26 Oct 2025 19:10:24 +0800 Subject: [PATCH 25/36] feat: add sleep time --- src/memos/api/routers/server_router.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/memos/api/routers/server_router.py 
b/src/memos/api/routers/server_router.py index 77faf435e..52d7dfbc9 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -1,5 +1,4 @@ import os -import time import traceback from typing import Any @@ -203,7 +202,6 @@ def search_memories(search_req: APISearchRequest): session_id=search_req.session_id or "default_session", ) logger.info(f"Search user_id is: {user_context.mem_cube_id}") - time.sleep(6) memories_result: MOSSearchResult = { "text_mem": [], "act_mem": [], @@ -256,7 +254,6 @@ def add_memories(add_req: APIADDRequest): mem_cube_id=add_req.mem_cube_id, session_id=add_req.session_id or "default_session", ) - time.sleep(6) naive_mem_cube = _create_naive_mem_cube() target_session_id = add_req.session_id if not target_session_id: From 33a41e8d68c9eb04b198f2f0009d76c1cfc528c4 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 28 Oct 2025 13:36:39 +0800 Subject: [PATCH 26/36] feat: update log --- src/memos/log.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/memos/log.py b/src/memos/log.py index 6a1fabf35..e1a32aa11 100644 --- a/src/memos/log.py +++ b/src/memos/log.py @@ -175,10 +175,10 @@ def close(self): "format": "%(asctime)s | %(trace_id)s | url=%(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "no_datetime": { - "format": "%(trace_id)s | url=%(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(trace_id)s | url=%(api_path)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "simplified": { - "format": "%(asctime)s | %(trace_id)s | url=%(api_path)s | env=%(env)s | user_type=%(user_type)s | user_name=%(user_name)s | % %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" + "format": "%(asctime)s | 
%(trace_id)s | url=%(api_path)s | % %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" }, }, "filters": { @@ -187,7 +187,7 @@ def close(self): }, "handlers": { "console": { - "level": "DEBUG", + "level": selected_log_level, "class": "logging.StreamHandler", "stream": stdout, "formatter": "no_datetime", From cf231740f0c6d5cbad97173a80d89be6fb316306 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 28 Oct 2025 13:42:48 +0800 Subject: [PATCH 27/36] feat: delete dockerfile --- Dockerfile | 24 ----------------- docker-compose.yml | 67 ---------------------------------------------- 2 files changed, 91 deletions(-) delete mode 100644 Dockerfile delete mode 100644 docker-compose.yml diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index e3be58867..000000000 --- a/Dockerfile +++ /dev/null @@ -1,24 +0,0 @@ -# Base image -FROM registry.cn-shanghai.aliyuncs.com/memtensor/memos:amd-v1.2 - -# Set Hugging Face mirror -ENV HF_ENDPOINT=https://hf-mirror.com - -RUN rm -rf /app/ - -WORKDIR /app - -COPY . /app/ - -# Set Python import path -ENV PYTHONPATH=/app/src -RUN pip install pymilvus -RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && pip config set install.trusted-host mirrors.aliyun.com -RUN pip install aliyun-bootstrap -RUN aliyun-bootstrap -a install - -# Expose port -EXPOSE 9002 - -# Start the docker -CMD ["uvicorn", "memos.api.product_api:app", "--host", "0.0.0.0", "--port", "9002", "--reload"] diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index d8998b6f7..000000000 --- a/docker-compose.yml +++ /dev/null @@ -1,67 +0,0 @@ -name: memos-dev - -services: - memos: - container_name: memos-api-docker - build: - context: .. 
- dockerfile: docker/Dockerfile - ports: - - "8000:8000" - env_file: - - ../.env - depends_on: - - neo4j - - qdrant - environment: - - PYTHONPATH=/app/src - - HF_ENDPOINT=https://hf-mirror.com - volumes: - - ../src:/app/src - - .:/app/docker - networks: - - memos_network - - neo4j: - image: neo4j:5.26.4 - container_name: neo4j-docker - ports: - - "7474:7474" # HTTP - - "7687:7687" # Bolt - healthcheck: - test: wget http://localhost:7687 || exit 1 - interval: 1s - timeout: 10s - retries: 20 - start_period: 3s - environment: - NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes" - NEO4J_AUTH: "neo4j/12345678" - volumes: - - neo4j_data:/data - - neo4j_logs:/logs - networks: - - memos_network - - qdrant: - image: qdrant/qdrant:v1.15.0 - container_name: qdrant-docker - ports: - - "6333:6333" # REST API - - "6334:6334" # gRPC API - volumes: - - ./qdrant_data:/qdrant/storage - environment: - QDRANT__SERVICE__GRPC_PORT: 6334 - QDRANT__SERVICE__HTTP_PORT: 6333 - restart: unless-stopped - networks: - - memos_network - -volumes: - neo4j_data: - neo4j_logs: - -networks: - memos_network: - driver: bridge From 18e2edaef355bc821b3dbe8afee8e73a8310c673 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 28 Oct 2025 13:56:30 +0800 Subject: [PATCH 28/36] feat: delete dockerfile --- Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index e3be58867..e6aa96458 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,8 +12,9 @@ COPY . 
/app/ # Set Python import path ENV PYTHONPATH=/app/src -RUN pip install pymilvus RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && pip config set install.trusted-host mirrors.aliyun.com +RUN pip install pymilvus +RUN pip install gunicorn RUN pip install aliyun-bootstrap RUN aliyun-bootstrap -a install From f9a18a5132c483b1099f04d611406aafe87f3baf Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 28 Oct 2025 15:11:50 +0800 Subject: [PATCH 29/36] feat: update dockerfile --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index e6aa96458..ba618badc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,7 @@ RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && p RUN pip install pymilvus RUN pip install gunicorn RUN pip install aliyun-bootstrap +RUN pip install "sglang>=0.4.0" RUN aliyun-bootstrap -a install # Expose port From 1d4f3d1acceff352bc9001a8b8b162084d44d0a5 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 28 Oct 2025 15:38:34 +0800 Subject: [PATCH 30/36] fix: conflict --- src/memos/api/server_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/api/server_api.py b/src/memos/api/server_api.py index 555da8c6f..24c67de48 100644 --- a/src/memos/api/server_api.py +++ b/src/memos/api/server_api.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) app = FastAPI( - title="MemOS Product REST API", + title="MemOS Product REST APIs", description="A REST API for managing multiple users with MemOS Product.", version="1.0.1", ) From 92be50b31c81526402f4f9af3e1715250c576619 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 28 Oct 2025 15:44:26 +0800 Subject: [PATCH 31/36] feat: replace ThreadPool to context --- src/memos/api/routers/server_router.py | 6 +++--- src/memos/mem_os/core.py | 6 +++--- src/memos/mem_scheduler/general_scheduler.py | 7 ++++--- src/memos/memories/textual/prefer_text_memory/adder.py | 7 ++++--- 
src/memos/memories/textual/prefer_text_memory/extractor.py | 5 +++-- .../memories/textual/prefer_text_memory/retrievers.py | 4 ++-- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index 1331094a8..796f56e24 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -2,7 +2,6 @@ import os import traceback -from concurrent.futures import ThreadPoolExecutor from typing import Any from fastapi import APIRouter, HTTPException @@ -23,6 +22,7 @@ from memos.configs.mem_scheduler import SchedulerConfigFactory from memos.configs.reranker import RerankerConfigFactory from memos.configs.vec_db import VectorDBConfigFactory +from memos.context.context import ContextThreadPoolExecutor from memos.embedders.factory import EmbedderFactory from memos.graph_dbs.factory import GraphStoreFactory from memos.llms.factory import LLMFactory @@ -370,7 +370,7 @@ def _search_pref(): ) return [_format_memory_item(data) for data in results] - with ThreadPoolExecutor(max_workers=2) as executor: + with ContextThreadPoolExecutor(max_workers=2) as executor: text_future = executor.submit(_search_text) pref_future = executor.submit(_search_pref) text_formatted_memories = text_future.result() @@ -616,7 +616,7 @@ def _process_pref_mem() -> list[dict[str, str]]: for memory_id, memory in zip(pref_ids_local, pref_memories_local, strict=False) ] - with ThreadPoolExecutor(max_workers=2) as executor: + with ContextThreadPoolExecutor(max_workers=2) as executor: text_future = executor.submit(_process_text_mem) pref_future = executor.submit(_process_pref_mem) text_response_data = text_future.result() diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py index ec8a673d7..939b0c68d 100644 --- a/src/memos/mem_os/core.py +++ b/src/memos/mem_os/core.py @@ -2,13 +2,13 @@ import os import time -from concurrent.futures import ThreadPoolExecutor from datetime import datetime 
from pathlib import Path from threading import Lock from typing import Any, Literal from memos.configs.mem_os import MOSConfig +from memos.context.context import ContextThreadPoolExecutor from memos.llms.factory import LLMFactory from memos.log import get_logger from memos.mem_cube.general import GeneralMemCube @@ -665,7 +665,7 @@ def search_preference_memory(cube_id, cube): return None # Execute both search functions in parallel - with ThreadPoolExecutor(max_workers=2) as executor: + with ContextThreadPoolExecutor(max_workers=2) as executor: text_future = executor.submit(search_textual_memory, mem_cube_id, mem_cube) pref_future = executor.submit(search_preference_memory, mem_cube_id, mem_cube) @@ -824,7 +824,7 @@ def process_preference_memory(): self.mem_scheduler.submit_messages(messages=[message_item]) # Execute both memory processing functions in parallel - with ThreadPoolExecutor(max_workers=2) as executor: + with ContextThreadPoolExecutor(max_workers=2) as executor: text_future = executor.submit(process_textual_memory) pref_future = executor.submit(process_preference_memory) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index d84ebb242..434cef3e9 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -3,6 +3,7 @@ import traceback from memos.configs.mem_scheduler import GeneralSchedulerConfig +from memos.context.context import ContextThreadPoolExecutor from memos.log import get_logger from memos.mem_cube.general import GeneralMemCube from memos.mem_scheduler.base_scheduler import BaseScheduler @@ -281,7 +282,7 @@ def process_message(message: ScheduleMessageItem): except Exception as e: logger.error(f"Error processing mem_read message: {e}", exc_info=True) - with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: + with ContextThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: futures = 
[executor.submit(process_message, msg) for msg in messages] for future in concurrent.futures.as_completed(futures): try: @@ -413,7 +414,7 @@ def process_message(message: ScheduleMessageItem): except Exception as e: logger.error(f"Error processing mem_read message: {e}", exc_info=True) - with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: + with ContextThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: futures = [executor.submit(process_message, msg) for msg in messages] for future in concurrent.futures.as_completed(futures): try: @@ -506,7 +507,7 @@ def process_message(message: ScheduleMessageItem): except Exception as e: logger.error(f"Error processing pref_add message: {e}", exc_info=True) - with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: + with ContextThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: futures = [executor.submit(process_message, msg) for msg in messages] for future in concurrent.futures.as_completed(futures): try: diff --git a/src/memos/memories/textual/prefer_text_memory/adder.py b/src/memos/memories/textual/prefer_text_memory/adder.py index 390f048ef..eb284cd6d 100644 --- a/src/memos/memories/textual/prefer_text_memory/adder.py +++ b/src/memos/memories/textual/prefer_text_memory/adder.py @@ -1,9 +1,10 @@ import json from abc import ABC, abstractmethod -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import as_completed from typing import Any +from memos.context.context import ContextThreadPoolExecutor from memos.log import get_logger from memos.memories.textual.item import TextualMemoryItem from memos.templates.prefer_complete_prompt import ( @@ -162,7 +163,7 @@ def execute_op(op): self.vector_db.delete(collection_name, [op["target_id"]]) return None - with ThreadPoolExecutor(max_workers=min(len(rsp["trace"]), 5)) as executor: + with 
ContextThreadPoolExecutor(max_workers=min(len(rsp["trace"]), 5)) as executor: future_to_op = {executor.submit(execute_op, op): op for op in rsp["trace"]} added_ids = [] for future in as_completed(future_to_op): @@ -263,7 +264,7 @@ def add( return [] added_ids = [] - with ThreadPoolExecutor(max_workers=min(max_workers, len(memories))) as executor: + with ContextThreadPoolExecutor(max_workers=min(max_workers, len(memories))) as executor: future_to_memory = { executor.submit(self._process_single_memory, memory): memory for memory in memories } diff --git a/src/memos/memories/textual/prefer_text_memory/extractor.py b/src/memos/memories/textual/prefer_text_memory/extractor.py index 460b31f4f..41d90d10e 100644 --- a/src/memos/memories/textual/prefer_text_memory/extractor.py +++ b/src/memos/memories/textual/prefer_text_memory/extractor.py @@ -2,10 +2,11 @@ import uuid from abc import ABC, abstractmethod -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import as_completed from datetime import datetime from typing import Any +from memos.context.context import ContextThreadPoolExecutor from memos.log import get_logger from memos.memories.textual.item import PreferenceTextualMemoryMetadata, TextualMemoryItem from memos.memories.textual.prefer_text_memory.spliter import Splitter @@ -150,7 +151,7 @@ def extract( return [] memories = [] - with ThreadPoolExecutor(max_workers=min(max_workers, len(chunks))) as executor: + with ContextThreadPoolExecutor(max_workers=min(max_workers, len(chunks))) as executor: futures = { executor.submit(self._process_single_chunk_explicit, chunk, msg_type, info): ( "explicit", diff --git a/src/memos/memories/textual/prefer_text_memory/retrievers.py b/src/memos/memories/textual/prefer_text_memory/retrievers.py index 7f70bac3b..807a8b55e 100644 --- a/src/memos/memories/textual/prefer_text_memory/retrievers.py +++ b/src/memos/memories/textual/prefer_text_memory/retrievers.py @@ -1,7 +1,7 @@ from abc import ABC, 
abstractmethod -from concurrent.futures import ThreadPoolExecutor from typing import Any +from memos.context.context import ContextThreadPoolExecutor from memos.memories.textual.item import PreferenceTextualMemoryMetadata, TextualMemoryItem @@ -42,7 +42,7 @@ def retrieve( query_embedding = query_embeddings[0] # Get the first (and only) embedding # Use thread pool to parallelize the searches - with ThreadPoolExecutor(max_workers=2) as executor: + with ContextThreadPoolExecutor(max_workers=2) as executor: # Submit all search tasks future_explicit = executor.submit( self.vector_db.search, query_embedding, "explicit_preference", top_k * 2, info From 8a1fd64108e67ea4f61a2b5b4cef91c49502fb05 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 28 Oct 2025 16:16:34 +0800 Subject: [PATCH 32/36] feat: add timed log --- src/memos/embedders/universal_api.py | 23 ++++-- src/memos/graph_dbs/polardb.py | 98 ++++++++++++------------- src/memos/llms/openai.py | 3 + src/memos/log.py | 8 +- src/memos/reranker/http_bge.py | 4 +- src/memos/reranker/http_bge_strategy.py | 2 + src/memos/utils.py | 30 +++++--- 7 files changed, 94 insertions(+), 74 deletions(-) diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index 72116cf05..fc51cf073 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -3,6 +3,11 @@ from memos.configs.embedder import UniversalAPIEmbedderConfig from memos.embedders.base import BaseEmbedder +from memos.log import get_logger +from memos.utils import timed + + +logger = get_logger(__name__) class UniversalAPIEmbedder(BaseEmbedder): @@ -19,14 +24,18 @@ def __init__(self, config: UniversalAPIEmbedderConfig): api_key=config.api_key, ) else: - raise ValueError(f"Unsupported provider: {self.provider}") + raise ValueError(f"Embeddings unsupported provider: {self.provider}") + @timed(log=True, log_prefix="EmbedderAPI") def embed(self, texts: list[str]) -> list[list[float]]: if self.provider == 
"openai" or self.provider == "azure": - response = self.client.embeddings.create( - model=getattr(self.config, "model_name_or_path", "text-embedding-3-large"), - input=texts, - ) - return [r.embedding for r in response.data] + try: + response = self.client.embeddings.create( + model=getattr(self.config, "model_name_or_path", "text-embedding-3-large"), + input=texts, + ) + return [r.embedding for r in response.data] + except Exception as e: + raise Exception(f"Embeddings request ended with error: {e}") from e else: - raise ValueError(f"Unsupported provider: {self.provider}") + raise ValueError(f"Embeddings unsupported provider: {self.provider}") diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 38e71298f..45957a10e 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -1,18 +1,18 @@ import json -import time import random + from datetime import datetime from typing import Any, Literal import numpy as np - from memos.configs.graph_db import PolarDBGraphDBConfig from memos.dependency import require_python_package from memos.graph_dbs.base import BaseGraphDB from memos.log import get_logger from memos.utils import timed + logger = get_logger(__name__) # Graph database configuration @@ -200,31 +200,31 @@ def _create_graph(self): # Add embedding column if it doesn't exist (using JSONB for compatibility) try: cursor.execute(f""" - ALTER TABLE "{self.db_name}_graph"."Memory" + ALTER TABLE "{self.db_name}_graph"."Memory" ADD COLUMN IF NOT EXISTS embedding JSONB; """) - logger.info(f"Embedding column added to Memory table.") + logger.info("Embedding column added to Memory table.") except Exception as e: logger.warning(f"Failed to add embedding column: {e}") # Create indexes cursor.execute(f""" - CREATE INDEX IF NOT EXISTS idx_memory_properties + CREATE INDEX IF NOT EXISTS idx_memory_properties ON "{self.db_name}_graph"."Memory" USING GIN (properties); """) # Create vector index for embedding field try: 
cursor.execute(f""" - CREATE INDEX IF NOT EXISTS idx_memory_embedding + CREATE INDEX IF NOT EXISTS idx_memory_embedding ON "{self.db_name}_graph"."Memory" USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100); """) - logger.info(f"Vector index created for Memory table.") + logger.info("Vector index created for Memory table.") except Exception as e: logger.warning(f"Vector index creation failed (might not be supported): {e}") - logger.info(f"Indexes created for Memory table.") + logger.info("Indexes created for Memory table.") except Exception as e: logger.error(f"Failed to create graph schema: {e}") @@ -246,20 +246,20 @@ def create_index( # Create indexes on the underlying PostgreSQL tables # Apache AGE stores data in regular PostgreSQL tables cursor.execute(f""" - CREATE INDEX IF NOT EXISTS idx_memory_properties + CREATE INDEX IF NOT EXISTS idx_memory_properties ON "{self.db_name}_graph"."Memory" USING GIN (properties); """) # Try to create vector index, but don't fail if it doesn't work try: cursor.execute(f""" - CREATE INDEX IF NOT EXISTS idx_memory_embedding + CREATE INDEX IF NOT EXISTS idx_memory_embedding ON "{self.db_name}_graph"."Memory" USING ivfflat (embedding vector_cosine_ops); """) except Exception as ve: logger.warning(f"Vector index creation failed (might not be supported): {ve}") - logger.debug(f"Indexes created successfully.") + logger.debug("Indexes created successfully.") except Exception as e: logger.warning(f"Failed to create indexes: {e}") @@ -267,8 +267,8 @@ def get_memory_count(self, memory_type: str, user_name: str | None = None) -> in """Get count of memory nodes by type.""" user_name = user_name if user_name else self._get_config_value("user_name") query = f""" - SELECT COUNT(*) - FROM "{self.db_name}_graph"."Memory" + SELECT COUNT(*) + FROM "{self.db_name}_graph"."Memory" WHERE ag_catalog.agtype_access_operator(properties, '"memory_type"'::agtype) = %s::agtype """ query += "\nAND ag_catalog.agtype_access_operator(properties, 
'\"user_name\"'::agtype) = %s::agtype" @@ -290,8 +290,8 @@ def node_not_exist(self, scope: str, user_name: str | None = None) -> int: """Check if a node with given scope exists.""" user_name = user_name if user_name else self._get_config_value("user_name") query = f""" - SELECT id - FROM "{self.db_name}_graph"."Memory" + SELECT id + FROM "{self.db_name}_graph"."Memory" WHERE ag_catalog.agtype_access_operator(properties, '"memory_type"'::agtype) = %s::agtype """ query += "\nAND ag_catalog.agtype_access_operator(properties, '\"user_name\"'::agtype) = %s::agtype" @@ -327,15 +327,13 @@ def remove_oldest_memory( # Use actual OFFSET logic, consistent with nebular.py # First find IDs to delete, then delete them select_query = f""" - SELECT id FROM "{self.db_name}_graph"."Memory" + SELECT id FROM "{self.db_name}_graph"."Memory" WHERE ag_catalog.agtype_access_operator(properties, '"memory_type"'::agtype) = %s::agtype AND ag_catalog.agtype_access_operator(properties, '"user_name"'::agtype) = %s::agtype - ORDER BY ag_catalog.agtype_access_operator(properties, '"updated_at"'::agtype) DESC + ORDER BY ag_catalog.agtype_access_operator(properties, '"updated_at"'::agtype) DESC OFFSET %s """ select_params = [f'"{memory_type}"', f'"{user_name}"', keep_latest] - print(f"[remove_oldest_memory] Select query: {select_query}") - print(f"[remove_oldest_memory] Select params: {select_params}") try: with self.connection.cursor() as cursor: @@ -403,14 +401,14 @@ def update_node(self, id: str, fields: dict[str, Any], user_name: str | None = N # Build update query if embedding_vector is not None: query = f""" - UPDATE "{self.db_name}_graph"."Memory" + UPDATE "{self.db_name}_graph"."Memory" SET properties = %s, embedding = %s WHERE ag_catalog.agtype_access_operator(properties, '"id"'::agtype) = %s::agtype """ params = [json.dumps(properties), json.dumps(embedding_vector), f'"{id}"'] else: query = f""" - UPDATE "{self.db_name}_graph"."Memory" + UPDATE "{self.db_name}_graph"."Memory" SET 
properties = %s WHERE ag_catalog.agtype_access_operator(properties, '"id"'::agtype) = %s::agtype """ @@ -438,7 +436,7 @@ def delete_node(self, id: str, user_name: str | None = None) -> None: user_name (str, optional): User name for filtering in non-multi-db mode """ query = f""" - DELETE FROM "{self.db_name}_graph"."Memory" + DELETE FROM "{self.db_name}_graph"."Memory" WHERE ag_catalog.agtype_access_operator(properties, '"id"'::agtype) = %s::agtype """ params = [f'"{id}"'] @@ -462,7 +460,7 @@ def create_extension(self): try: with self.connection.cursor() as cursor: # Ensure in the correct database context - cursor.execute(f"SELECT current_database();") + cursor.execute("SELECT current_database();") current_db = cursor.fetchone()[0] print(f"Current database context: {current_db}") @@ -487,7 +485,7 @@ def create_graph(self): try: with self.connection.cursor() as cursor: cursor.execute(f""" - SELECT COUNT(*) FROM ag_catalog.ag_graph + SELECT COUNT(*) FROM ag_catalog.ag_graph WHERE name = '{self.db_name}_graph'; """) graph_exists = cursor.fetchone()[0] > 0 @@ -664,11 +662,11 @@ def edge_exists( # Prepare the match pattern with direction if direction == "OUTGOING": - pattern = f"(a:Memory)-[r]->(b:Memory)" + pattern = "(a:Memory)-[r]->(b:Memory)" elif direction == "INCOMING": - pattern = f"(a:Memory)<-[r]-(b:Memory)" + pattern = "(a:Memory)<-[r]-(b:Memory)" elif direction == "ANY": - pattern = f"(a:Memory)-[r]-(b:Memory)" + pattern = "(a:Memory)-[r]-(b:Memory)" else: raise ValueError( f"Invalid direction: {direction}. Must be 'OUTGOING', 'INCOMING', or 'ANY'." 
@@ -720,7 +718,7 @@ def format_param_value(value: str) -> str: query = f""" SELECT {select_fields} - FROM "{self.db_name}_graph"."Memory" + FROM "{self.db_name}_graph"."Memory" WHERE ag_catalog.agtype_access_operator(properties, '"id"'::agtype) = %s::agtype """ params = [format_param_value(id)] @@ -806,7 +804,7 @@ def get_nodes( query = f""" SELECT id, properties, embedding - FROM "{self.db_name}_graph"."Memory" + FROM "{self.db_name}_graph"."Memory" WHERE ({where_clause}) """ @@ -893,15 +891,15 @@ def get_edges_old( # Create indexes cursor.execute(f""" - CREATE INDEX IF NOT EXISTS idx_edges_source + CREATE INDEX IF NOT EXISTS idx_edges_source ON "{self.db_name}_graph"."Edges" (source_id); """) cursor.execute(f""" - CREATE INDEX IF NOT EXISTS idx_edges_target + CREATE INDEX IF NOT EXISTS idx_edges_target ON "{self.db_name}_graph"."Edges" (target_id); """) cursor.execute(f""" - CREATE INDEX IF NOT EXISTS idx_edges_type + CREATE INDEX IF NOT EXISTS idx_edges_type ON "{self.db_name}_graph"."Edges" (edge_type); """) except Exception as e: @@ -998,7 +996,7 @@ def get_neighbors_by_tag_old( # Get all candidate nodes query = f""" SELECT id, properties, embedding - FROM "{self.db_name}_graph"."Memory" + FROM "{self.db_name}_graph"."Memory" WHERE {where_clause} """ @@ -1061,7 +1059,7 @@ def get_children_with_embeddings( SELECT * FROM cypher('{self.db_name}_graph', $$ MATCH (p:Memory)-[r:PARENT]->(c:Memory) - WHERE p.id = '{id}' {where_user} + WHERE p.id = '{id}' {where_user} RETURN id(c) as cid, c.id AS id, c.memory AS memory $$) as (cid agtype, id agtype, memory agtype) ) @@ -1518,7 +1516,7 @@ def get_grouped_counts1( MATCH (n:Memory) {where_clause} RETURN {group_fields_cypher}, COUNT(n) AS count1 - $$ ) as ({group_fields_cypher_polardb}, count1 agtype); + $$ ) as ({group_fields_cypher_polardb}, count1 agtype); """ print("get_grouped_counts:" + query) try: @@ -1673,8 +1671,8 @@ def clear(self, user_name: str | None = None) -> None: try: query = f""" SELECT * FROM 
cypher('{self.db_name}_graph', $$ - MATCH (n:Memory) - WHERE n.user_name = '{user_name}' + MATCH (n:Memory) + WHERE n.user_name = '{user_name}' DETACH DELETE n $$) AS (result agtype) """ @@ -1765,7 +1763,7 @@ def export_graph( SELECT * FROM cypher('{self.db_name}_graph', $$ MATCH (a:Memory)-[r]->(b:Memory) WHERE a.user_name = '{user_name}' AND b.user_name = '{user_name}' - RETURN a.id AS source, b.id AS target, type(r) as edge + RETURN a.id AS source, b.id AS target, type(r) as edge $$) AS (source agtype, target agtype, edge agtype) """ @@ -1803,7 +1801,7 @@ def count_nodes(self, scope: str, user_name: str | None = None) -> int: query = f""" SELECT * FROM cypher('{self.db_name}_graph', $$ MATCH (n:Memory) - WHERE n.memory_type = '{scope}' + WHERE n.memory_type = '{scope}' AND n.user_name = '{user_name}' RETURN count(n) $$) AS (count agtype) @@ -1842,8 +1840,8 @@ def get_all_memory_items( LIMIT 100 $$) AS (id1 agtype,n agtype) ) - SELECT - m.embedding, + SELECT + m.embedding, t.n FROM t, {self.db_name}_graph."Memory" m @@ -1939,8 +1937,8 @@ def get_all_memory_items_old( LIMIT 100 $$) AS (id1 agtype,n agtype) ) - SELECT - m.embedding, + SELECT + m.embedding, t.n FROM t, {self.db_name}_graph."Memory" m @@ -2107,8 +2105,8 @@ def get_structure_optimization_candidates( WITH t as ( {cypher_query} ) - SELECT - m.embedding, + SELECT + m.embedding, t.n FROM t, {self.db_name}_graph."Memory" m @@ -2321,7 +2319,7 @@ def add_node( with self.connection.cursor() as cursor: # Delete existing record first (if any) delete_query = f""" - DELETE FROM {self.db_name}_graph."Memory" + DELETE FROM {self.db_name}_graph."Memory" WHERE id = ag_catalog._make_graph_id('{self.db_name}_graph'::name, 'Memory'::name, %s::text::cstring) """ cursor.execute(delete_query, (id,)) @@ -2456,7 +2454,7 @@ def get_neighbors_by_tag( # Fetch all candidate nodes query = f""" SELECT id, properties, embedding - FROM "{self.db_name}_graph"."Memory" + FROM "{self.db_name}_graph"."Memory" WHERE {where_clause} """ @@ 
-2732,13 +2730,13 @@ def get_edges( user_name = user_name if user_name else self._get_config_value("user_name") if direction == "OUTGOING": - pattern = f"(a:Memory)-[r]->(b:Memory)" + pattern = "(a:Memory)-[r]->(b:Memory)" where_clause = f"a.id = '{id}'" elif direction == "INCOMING": - pattern = f"(a:Memory)<-[r]-(b:Memory)" + pattern = "(a:Memory)<-[r]-(b:Memory)" where_clause = f"a.id = '{id}'" elif direction == "ANY": - pattern = f"(a:Memory)-[r]-(b:Memory)" + pattern = "(a:Memory)-[r]-(b:Memory)" where_clause = f"a.id = '{id}' OR b.id = '{id}'" else: raise ValueError("Invalid direction. Must be 'OUTGOING', 'INCOMING', or 'ANY'.") diff --git a/src/memos/llms/openai.py b/src/memos/llms/openai.py index 698bc3265..ca1df5c1f 100644 --- a/src/memos/llms/openai.py +++ b/src/memos/llms/openai.py @@ -11,6 +11,7 @@ from memos.llms.utils import remove_thinking_tags from memos.log import get_logger from memos.types import MessageList +from memos.utils import timed logger = get_logger(__name__) @@ -56,6 +57,7 @@ def clear_cache(cls): cls._instances.clear() logger.info("OpenAI LLM instance cache cleared") + @timed(log=True, log_prefix="OpenAI LLM") def generate(self, messages: MessageList) -> str: """Generate a response from OpenAI LLM.""" response = self.client.chat.completions.create( @@ -73,6 +75,7 @@ def generate(self, messages: MessageList) -> str: else: return response_content + @timed(log=True, log_prefix="OpenAI LLM") def generate_stream(self, messages: MessageList, **kwargs) -> Generator[str, None, None]: """Stream response from OpenAI LLM with optional reasoning support.""" response = self.client.chat.completions.create( diff --git a/src/memos/log.py b/src/memos/log.py index e1a32aa11..2a538fdde 100644 --- a/src/memos/log.py +++ b/src/memos/log.py @@ -172,13 +172,13 @@ def close(self): "disable_existing_loggers": False, "formatters": { "standard": { - "format": "%(asctime)s | %(trace_id)s | url=%(api_path)s | env=%(env)s | user_type=%(user_type)s | 
user_name=%(user_name)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "no_datetime": { - "format": "%(trace_id)s | url=%(api_path)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" + "format": "%(trace_id)s | path=%(api_path)s | %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s" }, "simplified": { - "format": "%(asctime)s | %(trace_id)s | url=%(api_path)s | % %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" + "format": "%(asctime)s | %(trace_id)s | path=%(api_path)s | %(levelname)s | %(filename)s:%(lineno)d: %(funcName)s | %(message)s" }, }, "filters": { @@ -187,7 +187,7 @@ def close(self): }, "handlers": { "console": { - "level": selected_log_level, + "level": "DEBUG", "class": "logging.StreamHandler", "stream": stdout, "formatter": "no_datetime", diff --git a/src/memos/reranker/http_bge.py b/src/memos/reranker/http_bge.py index 2c423e6b6..41011df14 100644 --- a/src/memos/reranker/http_bge.py +++ b/src/memos/reranker/http_bge.py @@ -9,10 +9,10 @@ import requests from memos.log import get_logger +from memos.utils import timed from .base import BaseReranker from .concat import concat_original_source -from memos.utils import timed logger = get_logger(__name__) @@ -119,7 +119,7 @@ def __init__( self.warn_unknown_filter_keys = bool(warn_unknown_filter_keys) self._warned_missing_keys: set[str] = set() - @timed + @timed(log=True, log_prefix="RerankerAPI") def rerank( self, query: str, diff --git a/src/memos/reranker/http_bge_strategy.py b/src/memos/reranker/http_bge_strategy.py index 8cbf633a6..b0567698c 100644 --- a/src/memos/reranker/http_bge_strategy.py +++ b/src/memos/reranker/http_bge_strategy.py @@ -10,6 +10,7 @@ from memos.log import 
get_logger from memos.reranker.strategies import RerankerStrategyFactory +from memos.utils import timed from .base import BaseReranker @@ -119,6 +120,7 @@ def __init__( self._warned_missing_keys: set[str] = set() self.reranker_strategy = RerankerStrategyFactory.from_config(reranker_strategy) + @timed(log=True, log_prefix="RerankerStrategy") def rerank( self, query: str, diff --git a/src/memos/utils.py b/src/memos/utils.py index 5801bc2d2..08934ed34 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -1,21 +1,29 @@ import time -from memos import settings from memos.log import get_logger logger = get_logger(__name__) -def timed(func): - """Decorator to measure and log time of retrieval steps.""" +def timed(func=None, *, log=False, log_prefix=""): + """Decorator to measure and optionally log time of retrieval steps. - def wrapper(*args, **kwargs): - start = time.perf_counter() - result = func(*args, **kwargs) - elapsed = time.perf_counter() - start - if settings.DEBUG: - logger.info(f"[TIMER] {func.__name__} took {elapsed:.2f} s") - return result + Can be used as @timed or @timed(log=True) + """ - return wrapper + def decorator(fn): + def wrapper(*args, **kwargs): + start = time.perf_counter() + result = fn(*args, **kwargs) + elapsed = time.perf_counter() - start + if log: + logger.info(f"[TIMER] {log_prefix or fn.__name__} took {elapsed:.2f} seconds") + return result + + return wrapper + + # Handle both @timed and @timed(log=True) cases + if func is None: + return decorator + return decorator(func) From 9fea59b7b06ee21824856497fa9b39f6d38e10c7 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Mon, 3 Nov 2025 14:28:35 +0800 Subject: [PATCH 33/36] feat: add request log --- src/memos/api/middleware/request_context.py | 58 ++++++++++++++++++--- src/memos/api/routers/server_router.py | 5 ++ 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index 
2922ab3eb..90fa9f757 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -2,6 +2,7 @@ Request context middleware for automatic trace_id injection. """ +import json import time from collections.abc import Callable @@ -26,6 +27,43 @@ def extract_trace_id_from_headers(request: Request) -> str | None: return None +async def get_request_params(request: Request) -> tuple[dict, bytes | None]: + """ + Extract request parameters (query params and body) for logging. + + Args: + request: The incoming request object + + Returns: + Tuple of (params_dict, body_bytes). body_bytes is None if body was not read. + """ + params_log = {} + + # Get query parameters + if request.query_params: + params_log["query_params"] = dict(request.query_params) + + # Get request body for requests with body + body_bytes = None + content_type = request.headers.get("content-type", "") + if request.method in ("POST", "PUT", "PATCH", "DELETE") and content_type: + try: + body_bytes = await request.body() + if body_bytes: + if "application/json" in content_type.lower(): + try: + params_log["body"] = json.loads(body_bytes) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + params_log["body"] = f"" + else: + # For non-JSON requests, log body size only + params_log["body_size"] = len(body_bytes) + except Exception as e: + logger.error(f"Failed to read request body: {e}") + + return params_log, body_bytes + + class RequestContextMiddleware(BaseHTTPMiddleware): """ Middleware to automatically inject request context for every HTTP request. 
@@ -55,14 +93,22 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: ) set_request_context(context) - # Log request start with parameters - params_log = {} + # Get request parameters for logging + params_log, body_bytes = await get_request_params(request) - # Get query parameters - if request.query_params: - params_log["query_params"] = dict(request.query_params) + # Re-create the request receive function if body was read + # This ensures downstream handlers can still read the body + if body_bytes is not None: - logger.info(f"Request started, params: {params_log}, headers: {request.headers}") + async def receive(): + return {"type": "http.request", "body": body_bytes, "more_body": False} + + request._receive = receive + + logger.info( + f"Request started, method: {request.method}, path: {request.url.path}, " + f"request params: {params_log}, headers: {request.headers}" + ) # Process the request try: diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index 3ba12c1ce..e2609cfef 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -387,6 +387,8 @@ def _search_pref(): memories_result, pref_formatted_memories, search_req.mem_cube_id, search_req.handle_pref_mem ) + logger.info(f"Search memories result: {memories_result}") + return SearchResponse( message="Search completed successfully", data=memories_result, @@ -538,6 +540,9 @@ def _process_pref_mem() -> list[dict[str, str]]: text_response_data = text_future.result() pref_response_data = pref_future.result() + logger.info(f"add_memories Text response data: {text_response_data}") + logger.info(f"add_memories Pref response data: {pref_response_data}") + return MemoryResponse( message="Memory added successfully", data=text_response_data + pref_response_data, From 4b72a63a5d13480ca8c73c255d533c2632e49404 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Mon, 3 Nov 2025 14:49:48 +0800 Subject: [PATCH 34/36] 
feat: add request log --- src/memos/api/middleware/request_context.py | 143 ++++++++++++++++---- 1 file changed, 118 insertions(+), 25 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index 90fa9f757..74865b66f 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -3,6 +3,7 @@ """ import json +import os import time from collections.abc import Callable @@ -18,6 +19,9 @@ logger = memos.log.get_logger(__name__) +# Maximum body size to read for logging (in bytes) - bodies larger than this will be skipped +MAX_BODY_LOG_SIZE = int(os.getenv("MAX_BODY_LOG_SIZE", 10 * 1024)) + def extract_trace_id_from_headers(request: Request) -> str | None: """Extract trace_id from various possible headers with priority: g-trace-id > x-trace-id > trace-id.""" @@ -27,41 +31,125 @@ def extract_trace_id_from_headers(request: Request) -> str | None: return None +def _is_json_request(request: Request) -> tuple[bool, str]: + """ + Check if request is a JSON request. + + Args: + request: The request object + + Returns: + Tuple of (is_json, content_type) + """ + if request.method not in ("POST", "PUT", "PATCH", "DELETE"): + return False, "" + + content_type = request.headers.get("content-type", "") + if not content_type: + return False, "" + + is_json = "application/json" in content_type.lower() + return is_json, content_type + + +def _should_read_body(content_length: str | None) -> tuple[bool, int | None]: + """ + Check if body should be read based on content-length header. + + Args: + content_length: Content-Length header value + + Returns: + Tuple of (should_read, body_size). body_size is None if header is invalid. 
+ """ + if not content_length: + return True, None + + try: + body_size = int(content_length) + return body_size <= MAX_BODY_LOG_SIZE, body_size + except ValueError: + return True, None + + +def _create_body_info(content_type: str, body_size: int) -> dict: + """Create body_info dict for large bodies that are skipped.""" + return { + "content_type": content_type, + "content_length": body_size, + "note": f"body too large ({body_size} bytes), skipping read", + } + + +def _parse_json_body(body_bytes: bytes) -> dict | str: + """ + Parse JSON body bytes. + + Args: + body_bytes: Raw body bytes + + Returns: + Parsed JSON dict, or error message string if parsing fails + """ + try: + return json.loads(body_bytes) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + return f"" + + async def get_request_params(request: Request) -> tuple[dict, bytes | None]: """ Extract request parameters (query params and body) for logging. + Only reads body for application/json requests that are within size limits. + + This function is wrapped with exception handling to ensure logging failures + don't affect the actual request processing. + Args: request: The incoming request object Returns: Tuple of (params_dict, body_bytes). body_bytes is None if body was not read. + Returns empty dict and None on any error. 
""" - params_log = {} + try: + params_log = {} - # Get query parameters - if request.query_params: - params_log["query_params"] = dict(request.query_params) + # Check if this is a JSON request + is_json, content_type = _is_json_request(request) + if not is_json: + return params_log, None - # Get request body for requests with body - body_bytes = None - content_type = request.headers.get("content-type", "") - if request.method in ("POST", "PUT", "PATCH", "DELETE") and content_type: - try: - body_bytes = await request.body() - if body_bytes: - if "application/json" in content_type.lower(): - try: - params_log["body"] = json.loads(body_bytes) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - params_log["body"] = f"" - else: - # For non-JSON requests, log body size only - params_log["body_size"] = len(body_bytes) - except Exception as e: - logger.error(f"Failed to read request body: {e}") + # Pre-check body size using content-length header + content_length = request.headers.get("content-length") + should_read, body_size = _should_read_body(content_length) + + if not should_read and body_size is not None: + params_log["body_info"] = _create_body_info(content_type, body_size) + return params_log, None + + # Read body + body_bytes = await request.body() + + if not body_bytes: + return params_log, None + + # Post-check: verify actual size (content-length might be missing or wrong) + actual_size = len(body_bytes) + if actual_size > MAX_BODY_LOG_SIZE: + params_log["body_info"] = _create_body_info(content_type, actual_size) + return params_log, None + + # Parse JSON body + params_log["body"] = _parse_json_body(body_bytes) + return params_log, body_bytes - return params_log, body_bytes + except Exception as e: + # Catch-all for any unexpected errors + logger.error(f"Unexpected error in get_request_params: {e}", exc_info=True) + # Return empty dict to ensure request can continue + return {}, None class RequestContextMiddleware(BaseHTTPMiddleware): @@ -94,16 +182,21 @@ 
async def dispatch(self, request: Request, call_next: Callable) -> Response: set_request_context(context) # Get request parameters for logging + # Wrap in try-catch to ensure logging failures don't break the request params_log, body_bytes = await get_request_params(request) # Re-create the request receive function if body was read # This ensures downstream handlers can still read the body if body_bytes is not None: + try: - async def receive(): - return {"type": "http.request", "body": body_bytes, "more_body": False} + async def receive(): + return {"type": "http.request", "body": body_bytes, "more_body": False} - request._receive = receive + request._receive = receive + except Exception as e: + logger.error(f"Failed to recreate request receive function: {e}") + # Continue without restoring body, downstream handlers will handle it logger.info( f"Request started, method: {request.method}, path: {request.url.path}, " From 258ea927b489580ef764e75849c9ffa8163425aa Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Mon, 3 Nov 2025 16:19:57 +0800 Subject: [PATCH 35/36] feat: add source in request --- src/memos/api/middleware/request_context.py | 23 +++++++++++++++++---- src/memos/api/product_api.py | 2 +- src/memos/api/server_api.py | 2 +- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index 74865b66f..6cbf6d549 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -162,6 +162,17 @@ class RequestContextMiddleware(BaseHTTPMiddleware): 3. Ensures the context is available throughout the request lifecycle """ + def __init__(self, app, source: str | None = None): + """ + Initialize the middleware. 
+ + Args: + app: The ASGI application + source: Source identifier (e.g., 'product' or 'server') to distinguish request origin + """ + super().__init__(app) + self.source = source + async def dispatch(self, request: Request, call_next: Callable) -> Response: # Extract or generate trace_id trace_id = extract_trace_id_from_headers(request) or generate_trace_id() @@ -178,6 +189,7 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response: env=env, user_type=user_type, user_name=user_name, + source=self.source, ) set_request_context(context) @@ -198,8 +210,9 @@ async def receive(): logger.error(f"Failed to recreate request receive function: {e}") # Continue without restoring body, downstream handlers will handle it + source_info = f", source: {self.source}" if self.source else "" logger.info( - f"Request started, method: {request.method}, path: {request.url.path}, " + f"Request started, method: {request.method}, path: {request.url.path}{source_info}, " f"request params: {params_log}, headers: {request.headers}" ) @@ -207,18 +220,20 @@ async def receive(): try: response = await call_next(request) end_time = time.time() + source_info = f", source: {self.source}" if self.source else "" if response.status_code == 200: logger.info( - f"Request completed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + f"Request completed: {request.url.path}{source_info}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" ) else: logger.error( - f"Request Failed: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" + f"Request Failed: {request.url.path}{source_info}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms" ) except Exception as e: end_time = time.time() + source_info = f", source: {self.source}" if self.source else "" logger.error( - f"Request Exception Error: {e}, cost: {(end_time - start_time) * 1000:.2f}ms" + 
f"Request Exception Error: {e}{source_info}, cost: {(end_time - start_time) * 1000:.2f}ms" ) raise e diff --git a/src/memos/api/product_api.py b/src/memos/api/product_api.py index 709ad74fb..ec5cccae1 100644 --- a/src/memos/api/product_api.py +++ b/src/memos/api/product_api.py @@ -17,7 +17,7 @@ version="1.0.1", ) -app.add_middleware(RequestContextMiddleware) +app.add_middleware(RequestContextMiddleware, source="product_api") # Include routers app.include_router(product_router) diff --git a/src/memos/api/server_api.py b/src/memos/api/server_api.py index 24c67de48..0dfef99d9 100644 --- a/src/memos/api/server_api.py +++ b/src/memos/api/server_api.py @@ -18,7 +18,7 @@ version="1.0.1", ) -app.add_middleware(RequestContextMiddleware) +app.add_middleware(RequestContextMiddleware, source="server_api") # Include routers app.include_router(server_router) From 76ce57e5bb7516953b5f27f279cd458ba659aa02 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Mon, 3 Nov 2025 16:50:47 +0800 Subject: [PATCH 36/36] feat: source --- src/memos/api/middleware/request_context.py | 2 +- src/memos/context/context.py | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/memos/api/middleware/request_context.py b/src/memos/api/middleware/request_context.py index c3e1d50b5..488f59625 100644 --- a/src/memos/api/middleware/request_context.py +++ b/src/memos/api/middleware/request_context.py @@ -171,7 +171,7 @@ def __init__(self, app, source: str | None = None): source: Source identifier (e.g., 'product' or 'server') to distinguish request origin """ super().__init__(app) - self.source = source + self.source = source or "api" async def dispatch(self, request: Request, call_next: Callable) -> Response: # Extract or generate trace_id diff --git a/src/memos/context/context.py b/src/memos/context/context.py index d6a0f3bf1..b5d4c24fe 100644 --- a/src/memos/context/context.py +++ b/src/memos/context/context.py @@ -36,12 +36,14 @@ def __init__( env: str | None = None, 
user_type: str | None = None, user_name: str | None = None, + source: str | None = None, ): self.trace_id = trace_id or "trace-id" self.api_path = api_path self.env = env self.user_type = user_type self.user_name = user_name + self.source = source self._data: dict[str, Any] = {} def set(self, key: str, value: Any) -> None: @@ -59,6 +61,7 @@ def __setattr__(self, name: str, value: Any) -> None: "env", "user_type", "user_name", + "source", ): super().__setattr__(name, value) else: @@ -80,6 +83,7 @@ def to_dict(self) -> dict[str, Any]: "env": self.env, "user_type": self.user_type, "user_name": self.user_name, + "source": self.source, "data": self._data.copy(), } @@ -146,6 +150,16 @@ def get_current_user_name() -> str | None: return "memos" +def get_current_source() -> str | None: + """ + Get the current request's source (e.g., 'product_api' or 'server_api'). + """ + context = _request_context.get() + if context: + return context.get("source") + return None + + def get_current_context() -> RequestContext | None: """ Get the current request context. @@ -161,6 +175,7 @@ def get_current_context() -> RequestContext | None: env=context_dict.get("env"), user_type=context_dict.get("user_type"), user_name=context_dict.get("user_name"), + source=context_dict.get("source"), ) ctx._data = context_dict.get("data", {}).copy() return ctx