From 83d157689c5d1dc4a2a10d8176e81dff97e2b9c6 Mon Sep 17 00:00:00 2001 From: chentang Date: Thu, 18 Dec 2025 14:38:19 +0800 Subject: [PATCH 1/5] fix bugs: try to fix bugs in _submit_web_logs --- src/memos/mem_scheduler/base_scheduler.py | 37 ++++++++++------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 81defaa0f..9ab356f1d 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -846,28 +846,23 @@ def _submit_web_logs( f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}" ) - if self.rabbitmq_config is None: - logger.info( - "[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish." - ) - return - - for message in messages: - message_info = message.debug_info() - logger.info(f"[DIAGNOSTIC] base_scheduler._submit_web_logs: submitted {message_info}") + try: + for message in messages: + # Always call publish; the publisher now caches when offline and flushes after reconnect + logger.info( + f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message.model_dump_json(indent=2)}" + ) + self.rabbitmq_publish_message(message=message.to_dict()) + logger.info( + "[DIAGNOSTIC] base_scheduler._submit_web_logs: publish dispatched " + "item_id=%s task_id=%s label=%s", + message.item_id, + message.task_id, + message.label, + ) + except Exception as e: + logger.error(f"[DIAGNOSTIC] base_scheduler._submit_web_logs failed: {e}", exc_info=True) - # Always call publish; the publisher now caches when offline and flushes after reconnect - logger.info( - f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}" - ) - self.rabbitmq_publish_message(message=message.to_dict()) - logger.info( - "[DIAGNOSTIC] base_scheduler._submit_web_logs: publish dispatched " - "item_id=%s task_id=%s label=%s", - message.item_id, - message.task_id, - message.label, - ) logger.debug( f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}" ) From e50c56cf817cb6d63b8e8e882aeaa4de12c444b8 Mon Sep 17 00:00:00 2001 From: chentang Date: Thu, 18 Dec 2025 15:00:28 +0800 Subject: [PATCH 2/5] fix bugs: try to address bugs --- src/memos/mem_scheduler/base_scheduler.py | 13 +++++-------- .../webservice_modules/rabbitmq_service.py | 6 ++++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 9ab356f1d..1e0ecaadb 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -842,12 +842,7 @@ def _submit_web_logs( messages = [messages] # transform single message to list for message in messages: - logger.info( - f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}" - ) - - try: - for message in messages: + try: # Always call publish; the publisher now caches when offline and flushes after reconnect logger.info( f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message.model_dump_json(indent=2)}" @@ -860,8 +855,10 @@ def _submit_web_logs( message.task_id, message.label, ) - except Exception as e: - logger.error(f"[DIAGNOSTIC] base_scheduler._submit_web_logs failed: {e}", exc_info=True) + except Exception as e: + logger.error( + f"[DIAGNOSTIC] base_scheduler._submit_web_logs failed: {e}", exc_info=True + ) logger.debug( f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}" diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index a8a09760c..db8320879 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -7,6 +7,8 @@ from pathlib import Path from queue import Empty +from pyglet.libs.win32.constants import FALSE + from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig from memos.context.context import ContextThread from memos.dependency import require_python_package @@ -325,14 +327,14 @@ def rabbitmq_publish_message(self, message: dict): f"[DIAGNOSTIC] Publishing {label} message in Cloud Env. " f"Exchange: {exchange_name}, Routing Key: '{routing_key}'." ) - logger.info(f" - Message Content: {json.dumps(message, indent=2)}") + logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=FALSE)}") elif label == "knowledgeBaseUpdate": # Original diagnostic logging for knowledgeBaseUpdate if NOT in cloud env logger.info( f"[DIAGNOSTIC] Publishing knowledgeBaseUpdate message (Local Env). " f"Current configured Exchange: {exchange_name}, Routing Key: '{routing_key}'." ) - logger.info(f" - Message Content: {json.dumps(message, indent=2)}") + logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=FALSE)}") with self._rabbitmq_lock: logger.info( From 58eb6b81af34437677e929e629f25dd3ddf0c1ff Mon Sep 17 00:00:00 2001 From: chentang Date: Thu, 18 Dec 2025 15:13:21 +0800 Subject: [PATCH 3/5] fix bugs --- .../mem_scheduler/webservice_modules/rabbitmq_service.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index db8320879..43d24c5b9 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -7,8 +7,6 @@ from pathlib import Path from queue import Empty -from pyglet.libs.win32.constants import FALSE - from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig from memos.context.context import ContextThread from memos.dependency import require_python_package @@ -327,14 +325,14 @@ def rabbitmq_publish_message(self, message: dict): f"[DIAGNOSTIC] Publishing {label} message in Cloud Env. " f"Exchange: {exchange_name}, Routing Key: '{routing_key}'." ) - logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=FALSE)}") + logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}") elif label == "knowledgeBaseUpdate": # Original diagnostic logging for knowledgeBaseUpdate if NOT in cloud env logger.info( f"[DIAGNOSTIC] Publishing knowledgeBaseUpdate message (Local Env). " f"Current configured Exchange: {exchange_name}, Routing Key: '{routing_key}'." ) - logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=FALSE)}") + logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}") with self._rabbitmq_lock: logger.info( From 0d72ce7669f3a9b30aa6849893a0e6ec6f991063 Mon Sep 17 00:00:00 2001 From: chentang Date: Thu, 18 Dec 2025 15:59:20 +0800 Subject: [PATCH 4/5] refactor: modify examples --- examples/mem_scheduler/memos_w_scheduler.py | 40 --------------------- 1 file changed, 40 deletions(-) diff --git a/examples/mem_scheduler/memos_w_scheduler.py b/examples/mem_scheduler/memos_w_scheduler.py index 09aec4cba..ef7d853df 100644 --- a/examples/mem_scheduler/memos_w_scheduler.py +++ b/examples/mem_scheduler/memos_w_scheduler.py @@ -4,7 +4,6 @@ from datetime import datetime from pathlib import Path -from queue import Queue from memos.configs.mem_cube import GeneralMemCubeConfig from memos.configs.mem_os import MOSConfig @@ -12,7 +11,6 @@ from memos.log import get_logger from memos.mem_cube.general import GeneralMemCube from memos.mem_os.main import MOS -from memos.mem_scheduler.general_scheduler import GeneralScheduler from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem from memos.mem_scheduler.schemas.task_schemas import ( ADD_TASK_LABEL, @@ -160,42 +158,6 @@ def _first_content() -> str: return title, _truncate_with_rules(_first_content()) -def show_web_logs(mem_scheduler: GeneralScheduler): - """Display all web log entries from the scheduler's log queue. - - Args: - mem_scheduler: The scheduler instance containing web logs to display - """ - if mem_scheduler._web_log_message_queue.empty(): - print("Web log queue is currently empty.") - return - - print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50) - - # Create a temporary queue to preserve the original queue contents - temp_queue = Queue() - collected: list[ScheduleLogForWebItem] = [] - - while not mem_scheduler._web_log_message_queue.empty(): - log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get() - collected.append(log_item) - temp_queue.put(log_item) - - for idx, log_item in enumerate(sorted(collected, key=lambda x: x.timestamp, reverse=True), 1): - title, content = _format_entry(log_item) - print(f"\nLog Entry #{idx}:") - print(title) - print(content) - print("-" * 50) - - # Restore items back to the original queue - while not temp_queue.empty(): - mem_scheduler._web_log_message_queue.put(temp_queue.get()) - - print(f"\nTotal {len(collected)} web log entries displayed.") - print("=" * 110 + "\n") - - def run_with_scheduler_init(): print("==== run_with_automatic_scheduler_init ====") conversations, questions = init_task() @@ -253,8 +215,6 @@ def run_with_scheduler_init(): response = mos.chat(query=query, user_id=user_id) print(f"Answer:\n {response}\n") - show_web_logs(mem_scheduler=mos.mem_scheduler) - mos.mem_scheduler.stop() From 2fe965be240ea0e68c511b5573d88e9599b7cbd2 Mon Sep 17 00:00:00 2001 From: chentang Date: Thu, 18 Dec 2025 20:06:40 +0800 Subject: [PATCH 5/5] revise add operation and fix an unbelievable bug --- .../mem_scheduler/try_schedule_modules.py | 47 ------------------- src/memos/mem_reader/simple_struct.py | 2 +- .../webservice_modules/rabbitmq_service.py | 3 +- src/memos/templates/mem_reader_prompts.py | 39 ++++++++------- 4 files changed, 21 insertions(+), 70 deletions(-) diff --git a/examples/mem_scheduler/try_schedule_modules.py b/examples/mem_scheduler/try_schedule_modules.py index a5c5bc737..d942aad4e 100644 --- a/examples/mem_scheduler/try_schedule_modules.py +++ b/examples/mem_scheduler/try_schedule_modules.py @@ -1,8 +1,6 @@ import sys from pathlib import Path -from queue import Queue -from typing import TYPE_CHECKING from tqdm import tqdm @@ -11,18 +9,11 @@ ) from memos.log import get_logger from memos.mem_scheduler.analyzer.api_analyzer import DirectSearchMemoriesAnalyzer -from memos.mem_scheduler.base_scheduler import BaseScheduler from memos.mem_scheduler.optimized_scheduler import OptimizedScheduler from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.task_schemas import MEM_UPDATE_TASK_LABEL -if TYPE_CHECKING: - from memos.mem_scheduler.schemas import ( - ScheduleLogForWebItem, - ) - - FILE_PATH = Path(__file__).absolute() BASE_DIR = FILE_PATH.parent.parent.parent sys.path.insert(0, str(BASE_DIR)) # Enable execution from any working directory @@ -105,41 +96,6 @@ def init_task(): return conversations, questions -def show_web_logs(mem_scheduler: BaseScheduler): - """Display all web log entries from the scheduler's log queue. - - Args: - mem_scheduler: The scheduler instance containing web logs to display - """ - if mem_scheduler._web_log_message_queue.empty(): - print("Web log queue is currently empty.") - return - - print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50) - - # Create a temporary queue to preserve the original queue contents - temp_queue = Queue() - log_count = 0 - - while not mem_scheduler._web_log_message_queue.empty(): - log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get() - temp_queue.put(log_item) - log_count += 1 - - # Print log entry details - print(f"\nLog Entry #{log_count}:") - print(f'- "{log_item.label}" log: {log_item}') - - print("-" * 50) - - # Restore items back to the original queue - while not temp_queue.empty(): - mem_scheduler._web_log_message_queue.put(temp_queue.get()) - - print(f"\nTotal {log_count} web log entries displayed.") - print("=" * 110 + "\n") - - class ScheduleModulesRunner(DirectSearchMemoriesAnalyzer): def __init__(self): super().__init__() @@ -215,6 +171,3 @@ def add_msgs( mem_scheduler._memory_update_consumer( messages=[message], ) - - # Show accumulated web logs - show_web_logs(mem_scheduler) diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py index ac79c246b..b870bf70a 100644 --- a/src/memos/mem_reader/simple_struct.py +++ b/src/memos/mem_reader/simple_struct.py @@ -522,7 +522,7 @@ def filter_hallucination_in_memories( raw = self.llm.generate([{"role": "user", "content": prompt}]) success, parsed = self._parse_hallucination_filter_response(raw) logger.info( - f"[filter_hallucination_in_memories] Hallucination filter parsed successfully: {success}" + f"[filter_hallucination_in_memories] Hallucination filter parsed successfully: {success};prompt: {prompt}" ) if success: logger.info(f"Hallucination filter result: {parsed}") diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index 43d24c5b9..46b2ad3d1 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -108,8 +108,7 @@ def initialize_rabbitmq( elif Path(config_path).exists(): auth_config = AuthConfig.from_local_config(config_path=config_path) else: - logger.error("Fail to initialize auth_config") - return + auth_config = AuthConfig.from_local_env() self.rabbitmq_config = auth_config.rabbitmq elif isinstance(config, RabbitMQConfig): self.rabbitmq_config = config diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index 12c445df7..fef3ee6c0 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -625,21 +625,20 @@ SIMPLE_STRUCT_HALLUCINATION_FILTER_PROMPT = """ You are a strict, language-preserving memory validator and rewriter. -Your task is to compare each memory against the provided user messages (the ground truth) and produce a corrected version only when necessary. Always preserve the original language of the memory—do not translate. +Your task is to eliminate hallucinations and tighten memories by grounding them strictly in the user’s explicit messages. Memories must be factual, unambiguous, and free of any inferred or speculative content. Rules: -1. **Language Consistency**: The rewritten memory must be in the exact same language as the original input memory. Never switch languages. -2. **Strict Grounding**: Only use information explicitly stated in the user messages. Do not introduce external facts, assumptions, or common sense. -3. **Ambiguity Resolution**: - - Replace vague pronouns (e.g., "he", "it", "they") or unclear references with specific, unambiguous entities based solely on the messages. - - Convert relative time expressions (e.g., "yesterday", "last week", "in two days") into absolute dates or times **only if the messages provide enough context** (e.g., current date is known or implied). -4. **Handling Assistant Inferences**: - - If a memory contains any content **not directly stated by the user**—such as interpretations, summaries, emotional attributions, predictions, causal claims, or generalizations—this is considered an assistant inference. - - In such cases, you **must** set `need_rewrite = true`. - - The `rewritten` text **must explicitly indicate that the statement is an inference**, using a clear and natural prefix in the memory’s language. For English memories, use: - > "The assistant inferred that [rest of the memory]." - - Do **not** present inferred content as factual user statements. -5. **No Rewrite Needed**: If the memory is factually accurate, fully grounded in the messages, unambiguous, and contains no unsupported content, set `need_rewrite = false` and copy the original memory exactly. +1. **Language Consistency**: Keep the exact original language of each memory—no translation or language switching. +2. **Strict Factual Grounding**: Include only what the user explicitly stated. Remove or flag anything not directly present in the messages—no assumptions, interpretations, predictions, emotional labels, summaries, or generalizations. +3. **Ambiguity Elimination**: + - Replace vague pronouns (e.g., “he”, “it”, “they”) with clear, specific entities **only if** the messages identify them. + - Convert relative time expressions (e.g., “yesterday”) to absolute dates **only if** the messages provide enough temporal context. +4. **Hallucination Removal**: + - If a memory contains **any content not verbatim or directly implied by the user**, it must be rewritten. + - Do **not** rephrase inferences as facts. Instead, either: + - Remove the unsupported part and retain only the grounded core, or + - If the entire memory is ungrounded, mark it for rewrite and make the lack of user support explicit. +5. **No Change if Fully Grounded**: If the memory is concise, unambiguous, and fully supported by the user’s messages, keep it unchanged. Inputs: messages: @@ -649,15 +648,15 @@ {memories_inline} Output Format: -- Return a JSON object with string keys ("0", "1", "2", ...) corresponding to the input memory indices. +- Return a JSON object with string keys ("0", "1", "2", ...) matching input memory indices. - Each value must be: {{ "need_rewrite": boolean, "rewritten": string, "reason": string }} -- The "reason" should be concise and specific, e.g.: - - "contains assistant inference not stated by user" - - "pronoun 'it' has no clear referent in messages" - - "relative time 'yesterday' converted to 2025-12-16" - - "accurate and directly supported by user message" +- The "reason" must be brief and precise, e.g.: + - "contains unsupported inference" + - "vague pronoun with no referent in messages" + - "relative time resolved to 2025-12-16" + - "fully grounded and concise" -Important: Output **only** the JSON. No additional text, explanations, markdown, or fields. +Important: Output **only** the JSON. No extra text, explanations, markdown, or fields. """