diff --git a/examples/mem_feedback/example_feedback.py b/examples/mem_feedback/example_feedback.py index 8f4446863..794ddf111 100644 --- a/examples/mem_feedback/example_feedback.py +++ b/examples/mem_feedback/example_feedback.py @@ -144,7 +144,7 @@ def init_components(): mem_reader=mem_reader, searcher=searcher, reranker=mem_reranker, - pref_mem=None, + pref_feedback=True, ) return feedback_server, memory_manager, embedder diff --git a/src/memos/api/handlers/__init__.py b/src/memos/api/handlers/__init__.py index 90347768c..bd4c9f4b0 100644 --- a/src/memos/api/handlers/__init__.py +++ b/src/memos/api/handlers/__init__.py @@ -32,7 +32,6 @@ ) from memos.api.handlers.formatters_handler import ( format_memory_item, - post_process_pref_mem, to_iter, ) @@ -54,7 +53,6 @@ "formatters_handler", "init_server", "memory_handler", - "post_process_pref_mem", "scheduler_handler", "search_handler", "suggestion_handler", diff --git a/src/memos/api/handlers/add_handler.py b/src/memos/api/handlers/add_handler.py index 3cdbedabf..e9ed4f955 100644 --- a/src/memos/api/handlers/add_handler.py +++ b/src/memos/api/handlers/add_handler.py @@ -22,7 +22,7 @@ class AddHandler(BaseHandler): """ Handler for memory addition operations. - Handles both text and preference memory additions with sync/async support. + Handles text memory additions with sync/async support. """ def __init__(self, dependencies: HandlerDependencies): @@ -41,7 +41,7 @@ def handle_add_memories(self, add_req: APIADDRequest) -> MemoryResponse: """ Main handler for add memories endpoint. - Orchestrates the addition of both text and preference memories, + Orchestrates the addition of text memories, supporting concurrent processing. Args: diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py index ba527d602..aa2525878 100644 --- a/src/memos/api/handlers/component_init.py +++ b/src/memos/api/handlers/component_init.py @@ -19,11 +19,7 @@ build_llm_config, build_mem_reader_config, build_nli_client_config, - build_pref_adder_config, - build_pref_extractor_config, - build_pref_retriever_config, build_reranker_config, - build_vec_db_config, ) from memos.configs.mem_scheduler import SchedulerConfigFactory from memos.embedders.factory import EmbedderFactory @@ -36,12 +32,6 @@ from memos.mem_reader.factory import MemReaderFactory from memos.mem_scheduler.orm_modules.base_model import BaseDBManager from memos.mem_scheduler.scheduler_factory import SchedulerFactory -from memos.memories.textual.prefer_text_memory.factory import ( - AdderFactory, - ExtractorFactory, - RetrieverFactory, -) -from memos.memories.textual.simple_preference import SimplePreferenceTextMemory from memos.memories.textual.simple_tree import SimpleTreeTextMemory from memos.memories.textual.tree_text_memory.organize.history_manager import MemoryHistoryManager from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager @@ -56,7 +46,6 @@ InternetRetrieverFactory, ) from memos.reranker.factory import RerankerFactory -from memos.vec_dbs.factory import VecDBFactory if TYPE_CHECKING: @@ -125,7 +114,7 @@ def init_server() -> dict[str, Any]: required by the MemOS server, including: - Database connections (graph DB, vector DB) - Language models and embedders - - Memory systems (text, preference) + - Memory systems (text) - Scheduler and related modules Returns: @@ -169,20 +158,11 @@ def init_server() -> dict[str, Any]: reranker_config = build_reranker_config() feedback_reranker_config = build_feedback_reranker_config() internet_retriever_config = build_internet_retriever_config() - vector_db_config = build_vec_db_config() - pref_extractor_config = build_pref_extractor_config() - pref_adder_config = build_pref_adder_config() - pref_retriever_config = build_pref_retriever_config() logger.debug("Component configurations built successfully") # Create component instances graph_db = GraphStoreFactory.from_config(graph_db_config) - vector_db = ( - VecDBFactory.from_config(vector_db_config) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) llm = LLMFactory.from_config(llm_config) chat_llms = ( _init_chat_llms(chat_llm_config) @@ -231,61 +211,6 @@ def init_server() -> dict[str, Any]: logger.debug("Text memory initialized") - # Initialize preference memory components - pref_extractor = ( - ExtractorFactory.from_config( - config_factory=pref_extractor_config, - llm_provider=llm, - embedder=embedder, - vector_db=vector_db, - ) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) - - pref_adder = ( - AdderFactory.from_config( - config_factory=pref_adder_config, - llm_provider=llm, - embedder=embedder, - vector_db=vector_db, - text_mem=text_mem, - ) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) - - pref_retriever = ( - RetrieverFactory.from_config( - config_factory=pref_retriever_config, - llm_provider=llm, - embedder=embedder, - reranker=feedback_reranker, - vector_db=vector_db, - ) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) - - logger.debug("Preference memory components initialized") - - # Initialize preference memory - pref_mem = ( - SimplePreferenceTextMemory( - extractor_llm=llm, - vector_db=vector_db, - embedder=embedder, - reranker=feedback_reranker, - extractor=pref_extractor, - adder=pref_adder, - retriever=pref_retriever, - ) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) - - logger.debug("Preference memory initialized") - # Initialize MOS Server mos_server = MOSServer( mem_reader=mem_reader, @@ -298,7 +223,6 @@ def init_server() -> dict[str, Any]: # Create MemCube with pre-initialized memory instances naive_mem_cube = NaiveMemCube( text_mem=text_mem, - pref_mem=pref_mem, act_mem=None, para_mem=None, ) @@ -325,7 +249,7 @@ def init_server() -> dict[str, Any]: mem_reader=mem_reader, searcher=searcher, reranker=feedback_reranker, - pref_mem=pref_mem, + pref_feedback=True, ) # Initialize Scheduler @@ -384,12 +308,7 @@ def init_server() -> dict[str, Any]: "naive_mem_cube": naive_mem_cube, "searcher": searcher, "api_module": api_module, - "vector_db": vector_db, - "pref_extractor": pref_extractor, - "pref_adder": pref_adder, - "pref_retriever": pref_retriever, "text_mem": text_mem, - "pref_mem": pref_mem, "online_bot": online_bot, "feedback_server": feedback_server, "redis_client": redis_client, diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py index 06c4fd223..ee88ae639 100644 --- a/src/memos/api/handlers/formatters_handler.py +++ b/src/memos/api/handlers/formatters_handler.py @@ -65,49 +65,14 @@ def format_memory_item( return memory -def post_process_pref_mem( - memories_result: dict[str, Any], - pref_formatted_mem: list[dict[str, Any]], - mem_cube_id: str, - include_preference: bool, -) -> dict[str, Any]: - """ - Post-process preference memory results. - - Adds formatted preference memories to the result dictionary and generates - instruction completion strings if preferences are included. - - Args: - memories_result: Result dictionary to update - pref_formatted_mem: List of formatted preference memories - mem_cube_id: Memory cube ID - include_preference: Whether to include preferences in result - - Returns: - Updated memories_result dictionary - """ - if include_preference: - memories_result["pref_mem"].append( - { - "cube_id": mem_cube_id, - "memories": pref_formatted_mem, - "total_nodes": len(pref_formatted_mem), - } - ) - pref_instruction, pref_note = instruct_completion(pref_formatted_mem) - memories_result["pref_string"] = pref_instruction - memories_result["pref_note"] = pref_note - - return memories_result - - def post_process_textual_mem( memories_result: dict[str, Any], text_formatted_mem: list[dict[str, Any]], mem_cube_id: str, ) -> dict[str, Any]: """ - Post-process text and tool memory results. + Post-process text, tool, skill and preference memory results. + Now automatically handles preference memories. """ fact_mem = [ mem @@ -124,6 +89,11 @@ def post_process_textual_mem( mem for mem in text_formatted_mem if mem["metadata"]["memory_type"] == "SkillMemory" ] + # Extract preference memories + pref_mem = [ + mem for mem in text_formatted_mem if mem["metadata"]["memory_type"] == "PreferenceMemory" + ] + memories_result["text_mem"].append( { "cube_id": mem_cube_id, @@ -145,6 +115,19 @@ def post_process_textual_mem( "total_nodes": len(skill_mem), } ) + + memories_result["pref_mem"].append( + { + "cube_id": mem_cube_id, + "memories": pref_mem, + "total_nodes": len(pref_mem), + } + ) + if pref_mem: + pref_instruction, pref_note = instruct_completion(pref_mem) + memories_result["pref_string"] = pref_instruction + memories_result["pref_note"] = pref_note + return memories_result diff --git a/src/memos/api/handlers/memory_handler.py b/src/memos/api/handlers/memory_handler.py index ef56c7489..2ab8f50c7 100644 --- a/src/memos/api/handlers/memory_handler.py +++ b/src/memos/api/handlers/memory_handler.py @@ -4,12 +4,8 @@ This module handles retrieving all memories or specific subgraphs based on queries. """ -from typing import TYPE_CHECKING, Any, Literal +from typing import Any, Literal -from memos.api.handlers.formatters_handler import ( - format_memory_item, - post_process_pref_mem, -) from memos.api.product_models import ( DeleteMemoryRequest, DeleteMemoryResponse, @@ -29,10 +25,6 @@ ) -if TYPE_CHECKING: - from memos.memories.textual.preference import TextualMemoryItem - - logger = get_logger(__name__) @@ -171,8 +163,7 @@ def handle_get_subgraph( def handle_get_memory(memory_id: str, naive_mem_cube: NaiveMemCube) -> GetMemoryResponse: """ Handler for getting a single memory by its ID. - - Tries to retrieve from text memory first, then preference memory if not found. + Now unified to retrieve from text_mem only (includes preferences). Args: memory_id: The ID of the memory to retrieve @@ -184,37 +175,12 @@ def handle_get_memory(memory_id: str, naive_mem_cube: NaiveMemCube) -> GetMemory try: memory = naive_mem_cube.text_mem.get(memory_id) - except Exception: + except Exception as e: + logger.error(f"Failed to get memory {memory_id}: {e}") memory = None - # If not found in text memory, try preference memory - pref = None - if memory is None and naive_mem_cube.pref_mem is not None: - collection_names = ["explicit_preference", "implicit_preference"] - for collection_name in collection_names: - try: - pref = naive_mem_cube.pref_mem.get_with_collection_name(collection_name, memory_id) - if pref is not None: - break - except Exception: - continue - - # Get the data from whichever memory source succeeded - data = (memory or pref).model_dump() if (memory or pref) else None - - if data is not None: - # For each returned item, tackle with metadata.info project_id / - # operation / manager_user_id - metadata = data.get("metadata", None) - if metadata is not None and isinstance(metadata, dict): - info = metadata.get("info", None) - if info is not None and isinstance(info, dict): - for key in ("project_id", "operation", "manager_user_id"): - if key not in info: - continue - value = info.pop(key) - if key not in metadata: - metadata[key] = value + # Get the data + data = memory.model_dump() if memory else None return GetMemoryResponse( message="Memory retrieved successfully" @@ -230,50 +196,20 @@ def handle_get_memory_by_ids( ) -> GetMemoryResponse: """ Handler for getting multiple memories by their IDs. + Now unified to retrieve from text_mem only (includes preferences). Retrieves multiple memories and formats them as a list of dictionaries. """ try: memories = naive_mem_cube.text_mem.get_by_ids(memory_ids=memory_ids) - except Exception: + except Exception as e: + logger.error(f"Failed to get memories: {e}") memories = [] # Ensure memories is not None if memories is None: memories = [] - if naive_mem_cube.pref_mem is not None: - collection_names = ["explicit_preference", "implicit_preference"] - for collection_name in collection_names: - try: - result = naive_mem_cube.pref_mem.get_by_ids_with_collection_name( - collection_name, memory_ids - ) - if result is not None: - result = [format_memory_item(item, save_sources=False) for item in result] - memories.extend(result) - except Exception: - continue - - # For each returned item, tackle with metadata.info project_id / - # operation / manager_user_id - for item in memories: - if not isinstance(item, dict): - continue - metadata = item.get("metadata") - if not isinstance(metadata, dict): - continue - info = metadata.get("info") - if not isinstance(info, dict): - continue - - for key in ("project_id", "operation", "manager_user_id"): - if key not in info: - continue - value = info.pop(key) - if key not in metadata: - metadata[key] = value - return GetMemoryResponse( message="Memories retrieved successfully", code=200, data={"memories": memories} ) @@ -343,67 +279,31 @@ def handle_get_memories( "total_nodes": total_skill_nodes, } ] - preferences: list[TextualMemoryItem] = [] - total_preference_nodes = 0 - format_preferences = [] - if get_mem_req.include_preference and naive_mem_cube.pref_mem is not None: - filter_params: dict[str, Any] = {} - if get_mem_req.user_id is not None: - filter_params["user_id"] = get_mem_req.user_id - if get_mem_req.mem_cube_id is not None: - filter_params["mem_cube_id"] = get_mem_req.mem_cube_id - if get_mem_req.filter is not None: - # Check and remove user_id/mem_cube_id from filter if present - filter_copy = get_mem_req.filter.copy() - removed_fields = [] - - if "user_id" in filter_copy: - filter_copy.pop("user_id") - removed_fields.append("user_id") - if "mem_cube_id" in filter_copy: - filter_copy.pop("mem_cube_id") - removed_fields.append("mem_cube_id") - - if removed_fields: - logger.warning( - f"Fields {removed_fields} found in filter will be ignored. " - f"Use request-level user_id/mem_cube_id parameters instead." - ) - - filter_params.update(filter_copy) - - preferences, total_preference_nodes = naive_mem_cube.pref_mem.get_memory_by_filter( - filter_params, page=get_mem_req.page, page_size=get_mem_req.page_size + # Get preference memories (same pattern as other memory types) + if get_mem_req.include_preference: + pref_memories_info = naive_mem_cube.text_mem.get_all( + user_name=get_mem_req.mem_cube_id, + user_id=get_mem_req.user_id, + page=get_mem_req.page, + page_size=get_mem_req.page_size, + filter=get_mem_req.filter, + memory_type=["PreferenceMemory"], ) - format_preferences = [format_memory_item(item, save_sources=False) for item in preferences] - - # For each returned item, tackle with metadata.info project_id / - # operation / manager_user_id - for item in format_preferences: - if not isinstance(item, dict): - continue - metadata = item.get("metadata") - if not isinstance(metadata, dict): - continue - info = metadata.get("info") - if not isinstance(info, dict): - continue - - for key in ("project_id", "operation", "manager_user_id"): - if key not in info: - continue - value = info.pop(key) - if key not in metadata: - metadata[key] = value - - results = post_process_pref_mem( - results, format_preferences, get_mem_req.mem_cube_id, get_mem_req.include_preference - ) - if total_preference_nodes > 0 and results.get("pref_mem", []): - results["pref_mem"][0]["total_nodes"] = total_preference_nodes + pref_memories, total_pref_nodes = ( + pref_memories_info["nodes"], + pref_memories_info["total_nodes"], + ) + + results["pref_mem"] = [ + { + "cube_id": get_mem_req.mem_cube_id, + "memories": pref_memories, + "total_nodes": total_pref_nodes, + } + ] - # Filter to only keep text_mem, pref_mem, tool_mem + # Filter to only keep text_mem, pref_mem, tool_mem, skill_mem filtered_results = { "text_mem": results.get("text_mem", []), "pref_mem": results.get("pref_mem", []), @@ -415,6 +315,10 @@ def handle_get_memories( def handle_delete_memories(delete_mem_req: DeleteMemoryRequest, naive_mem_cube: NaiveMemCube): + """ + Handler for deleting memories. + Now unified to delete from text_mem only (includes preferences). + """ logger.info( f"[Delete memory request] writable_cube_ids: {delete_mem_req.writable_cube_ids}, memory_ids: {delete_mem_req.memory_ids}" ) @@ -432,17 +336,14 @@ def handle_delete_memories(delete_mem_req: DeleteMemoryRequest, naive_mem_cube: try: if delete_mem_req.memory_ids is not None: + # Unified deletion from text_mem (includes preferences) naive_mem_cube.text_mem.delete_by_memory_ids(delete_mem_req.memory_ids) - if naive_mem_cube.pref_mem is not None: - naive_mem_cube.pref_mem.delete(delete_mem_req.memory_ids) elif delete_mem_req.file_ids is not None: naive_mem_cube.text_mem.delete_by_filter( writable_cube_ids=delete_mem_req.writable_cube_ids, file_ids=delete_mem_req.file_ids ) elif delete_mem_req.filter is not None: naive_mem_cube.text_mem.delete_by_filter(filter=delete_mem_req.filter) - if naive_mem_cube.pref_mem is not None: - naive_mem_cube.pref_mem.delete_by_filter(filter=delete_mem_req.filter) except Exception as e: logger.error(f"Failed to delete memories: {e}", exc_info=True) return DeleteMemoryResponse( @@ -572,49 +473,29 @@ def handle_get_memories_dashboard( for cube_id, memories in skill_mem_by_cube.items() ] - preferences: list[TextualMemoryItem] = [] - - format_preferences = [] - if get_mem_req.include_preference and naive_mem_cube.pref_mem is not None: - filter_params: dict[str, Any] = {} - if get_mem_req.user_id is not None: - filter_params["user_id"] = get_mem_req.user_id - if get_mem_req.mem_cube_id is not None: - filter_params["mem_cube_id"] = get_mem_req.mem_cube_id - if get_mem_req.filter is not None: - # Check and remove user_id/mem_cube_id from filter if present - filter_copy = get_mem_req.filter.copy() - removed_fields = [] - - if "user_id" in filter_copy: - filter_copy.pop("user_id") - removed_fields.append("user_id") - if "mem_cube_id" in filter_copy: - filter_copy.pop("mem_cube_id") - removed_fields.append("mem_cube_id") - - if removed_fields: - logger.warning( - f"Fields {removed_fields} found in filter will be ignored. " - f"Use request-level user_id/mem_cube_id parameters instead." - ) - - filter_params.update(filter_copy) - - preferences, total_preference_nodes = naive_mem_cube.pref_mem.get_memory_by_filter( - filter_params, page=get_mem_req.page, page_size=get_mem_req.page_size + if get_mem_req.include_preference: + pref_memories_info = naive_mem_cube.text_mem.get_all( + user_name=get_mem_req.mem_cube_id, + user_id=get_mem_req.user_id, + page=get_mem_req.page, + page_size=get_mem_req.page_size, + filter=get_mem_req.filter, + memory_type=["PreferenceMemory"], + ) + pref_memories, total_preference_nodes = ( + pref_memories_info["nodes"], + pref_memories_info["total_nodes"], ) - format_preferences = [format_memory_item(item, save_sources=False) for item in preferences] - # Group preferences by cube_id from metadata.mem_cube_id + # Group preference memories by cube_id from metadata.user_name pref_mem_by_cube: dict[str, list] = {} - for pref in format_preferences: - cube_id = pref.get("metadata", {}).get("mem_cube_id", get_mem_req.mem_cube_id) + for memory in pref_memories: + cube_id = memory.get("metadata", {}).get("user_name", get_mem_req.mem_cube_id) if cube_id not in pref_mem_by_cube: pref_mem_by_cube[cube_id] = [] - pref_mem_by_cube[cube_id].append(pref) + pref_mem_by_cube[cube_id].append(memory) - # If no preferences found, create a default entry with the requested cube_id + # If no memories found, create a default entry with the requested cube_id if not pref_mem_by_cube and get_mem_req.mem_cube_id: pref_mem_by_cube[get_mem_req.mem_cube_id] = [] diff --git a/src/memos/api/handlers/search_handler.py b/src/memos/api/handlers/search_handler.py index 8e7785ad5..58121776e 100644 --- a/src/memos/api/handlers/search_handler.py +++ b/src/memos/api/handlers/search_handler.py @@ -49,7 +49,7 @@ def handle_search_memories(self, search_req: APISearchRequest) -> SearchResponse Main handler for search memories endpoint. Orchestrates the search process based on the requested search mode, - supporting both text and preference memory searches. + supporting text memory searches. Args: search_req: Search request containing query and parameters diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index 5bf27e985..6f112b9a7 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -434,7 +434,7 @@ class APISearchRequest(BaseRequest): # Internal field for search memory type search_memory_type: str = Field( "All", - description="Type of memory to search: All, WorkingMemory, LongTermMemory, UserMemory, OuterMemory, ToolSchemaMemory, ToolTrajectoryMemory, RawFileMemory, AllSummaryMemory, SkillMemory", + description="Type of memory to search: All, WorkingMemory, LongTermMemory, UserMemory, OuterMemory, ToolSchemaMemory, ToolTrajectoryMemory, RawFileMemory, AllSummaryMemory, SkillMemory, PreferenceMemory", ) # ==== Context ==== diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index af6ae4fe5..fa8a0b396 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -94,7 +94,6 @@ redis_client = components["redis_client"] status_tracker = TaskStatusTracker(redis_client=redis_client) graph_db = components["graph_db"] -vector_db = components["vector_db"] # ============================================================================= @@ -369,15 +368,9 @@ def feedback_memories(feedback_req: APIFeedbackRequest): response_model=GetUserNamesByMemoryIdsResponse, ) def get_user_names_by_memory_ids(request: GetUserNamesByMemoryIdsRequest): - """Get user names by memory ids.""" + """Get user names by memory ids. Now unified to query from graph_db only.""" result = graph_db.get_user_names_by_memory_ids(memory_ids=request.memory_ids) - if vector_db: - prefs = [] - for collection_name in ["explicit_preference", "implicit_preference"]: - prefs.extend( - vector_db.get_by_ids(collection_name=collection_name, ids=request.memory_ids) - ) - result.update({pref.id: pref.payload.get("mem_cube_id", None) for pref in prefs}) + return GetUserNamesByMemoryIdsResponse( code=200, message="Successfully", diff --git a/src/memos/mem_cube/navie.py b/src/memos/mem_cube/navie.py index 3afa78bab..b9395ea0d 100644 --- a/src/memos/mem_cube/navie.py +++ b/src/memos/mem_cube/navie.py @@ -20,7 +20,6 @@ class NaiveMemCube(BaseMemCube): def __init__( self, text_mem: BaseTextMemory | None = None, - pref_mem: BaseTextMemory | None = None, act_mem: BaseActMemory | None = None, para_mem: BaseParaMemory | None = None, ): @@ -28,19 +27,20 @@ def __init__( self._text_mem: BaseTextMemory = text_mem self._act_mem: BaseActMemory | None = act_mem self._para_mem: BaseParaMemory | None = para_mem - self._pref_mem: BaseTextMemory | None = pref_mem + # pref_mem removed - now handled by text_mem def load( self, dir: str, - memory_types: list[Literal["text_mem", "act_mem", "para_mem", "pref_mem"]] | None = None, + memory_types: list[Literal["text_mem", "act_mem", "para_mem"]] | None = None, ) -> None: """Load memories. Args: dir (str): The directory containing the memory files. memory_types (list[str], optional): List of memory types to load. If None, loads all available memory types. - Options: ["text_mem", "act_mem", "para_mem", "pref_mem"] + Options: ["text_mem", "act_mem", "para_mem"] + Note: pref_mem is now integrated into text_mem """ loaded_schema = get_json_file_model_schema(os.path.join(dir, self.config.config_filename)) if loaded_schema != self.config.model_schema: @@ -51,7 +51,7 @@ def load( # If no specific memory types specified, load all if memory_types is None: - memory_types = ["text_mem", "act_mem", "para_mem", "pref_mem"] + memory_types = ["text_mem", "act_mem", "para_mem"] # Load specified memory types if "text_mem" in memory_types and self.text_mem: @@ -66,23 +66,20 @@ def load( self.para_mem.load(dir) logger.info(f"Loaded para_mem from {dir}") - if "pref_mem" in memory_types and self.pref_mem: - self.pref_mem.load(dir) - logger.info(f"Loaded pref_mem from {dir}") - logger.info(f"MemCube loaded successfully from {dir} (types: {memory_types})") def dump( self, dir: str, - memory_types: list[Literal["text_mem", "act_mem", "para_mem", "pref_mem"]] | None = None, + memory_types: list[Literal["text_mem", "act_mem", "para_mem"]] | None = None, ) -> None: """Dump memories. Args: dir (str): The directory where the memory files will be saved. memory_types (list[str], optional): List of memory types to dump. If None, dumps all available memory types. - Options: ["text_mem", "act_mem", "para_mem", "pref_mem"] + Options: ["text_mem", "act_mem", "para_mem"] + Note: pref_mem is now integrated into text_mem """ if os.path.exists(dir) and os.listdir(dir): raise MemCubeError( @@ -94,7 +91,7 @@ def dump( # If no specific memory types specified, dump all if memory_types is None: - memory_types = ["text_mem", "act_mem", "para_mem", "pref_mem"] + memory_types = ["text_mem", "act_mem", "para_mem"] # Dump specified memory types if "text_mem" in memory_types and self.text_mem: @@ -109,10 +106,6 @@ def dump( self.para_mem.dump(dir) logger.info(f"Dumped para_mem to {dir}") - if "pref_mem" in memory_types and self.pref_mem: - self.pref_mem.dump(dir) - logger.info(f"Dumped pref_mem to {dir}") - logger.info(f"MemCube dumped successfully to {dir} (types: {memory_types})") @property @@ -157,16 +150,4 @@ def para_mem(self, value: BaseParaMemory) -> None: raise TypeError(f"Expected BaseParaMemory, got {type(value).__name__}") self._para_mem = value - @property - def pref_mem(self) -> "BaseTextMemory | None": - """Get the preference memory.""" - if self._pref_mem is None: - logger.warning("Preference memory is not initialized. Returning None.") - return self._pref_mem - - @pref_mem.setter - def pref_mem(self, value: BaseTextMemory) -> None: - """Set the preference memory.""" - if not isinstance(value, BaseTextMemory): - raise TypeError(f"Expected BaseTextMemory, got {type(value).__name__}") - self._pref_mem = value + # pref_mem property removed - preferences now handled by text_mem diff --git a/src/memos/mem_feedback/feedback.py b/src/memos/mem_feedback/feedback.py index 6c6d1821f..18045af2c 100644 --- a/src/memos/mem_feedback/feedback.py +++ b/src/memos/mem_feedback/feedback.py @@ -2,7 +2,6 @@ import difflib import json import re -import uuid from datetime import datetime from typing import TYPE_CHECKING, Any, Literal @@ -36,7 +35,6 @@ if TYPE_CHECKING: - from memos.memories.textual.simple_preference import SimplePreferenceTextMemory from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher from memos.templates.mem_feedback_prompts import ( FEEDBACK_ANSWER_PROMPT, @@ -95,7 +93,6 @@ def __init__(self, config: MemFeedbackConfig): self.stopword_manager = StopwordManager self.searcher: Searcher = None self.reranker = None - self.pref_mem: SimplePreferenceTextMemory = None self.pref_feedback: bool = False self.DB_IDX_READY = False @@ -239,6 +236,9 @@ def _single_add_operation( else: to_add_memory = new_memory_item.model_copy(deep=True) + if to_add_memory.metadata.memory_type == "PreferenceMemory": + to_add_memory.metadata.preference = new_memory_item.memory + to_add_memory.metadata.created_at = to_add_memory.metadata.updated_at = ( datetime.now().isoformat() ) @@ -274,13 +274,6 @@ def _single_update_operation( """ Individual update operations """ - if "preference" in old_memory_item.metadata.__dict__: - logger.info( - f"[0107 Feedback Core: _single_update_operation] pref_memory: {old_memory_item.id}" - ) - return self._single_update_pref( - old_memory_item, new_memory_item, user_id, user_name, operation - ) memory_type = old_memory_item.metadata.memory_type source_doc_id = ( @@ -329,68 +322,6 @@ def _single_update_operation( "origin_memory": old_memory_item.memory, } - def _single_update_pref( - self, - old_memory_item: TextualMemoryItem, - new_memory_item: TextualMemoryItem, - user_id: str, - user_name: str, - operation: dict, - ): - """update preference memory""" - - feedback_context = new_memory_item.memory - if operation and "text" in operation and operation["text"]: - new_memory_item.memory = operation["text"] - new_memory_item.metadata.embedding = self._batch_embed([operation["text"]])[0] - - to_add_memory = old_memory_item.model_copy(deep=True) - to_add_memory.metadata.key = new_memory_item.metadata.key - to_add_memory.metadata.tags = new_memory_item.metadata.tags - to_add_memory.memory = new_memory_item.memory - to_add_memory.metadata.preference = new_memory_item.memory - to_add_memory.metadata.embedding = new_memory_item.metadata.embedding - - to_add_memory.metadata.user_id = new_memory_item.metadata.user_id - to_add_memory.metadata.original_text = old_memory_item.memory - to_add_memory.metadata.covered_history = old_memory_item.id - - to_add_memory.metadata.created_at = to_add_memory.metadata.updated_at = ( - datetime.now().isoformat() - ) - to_add_memory.metadata.context_summary = ( - old_memory_item.metadata.context_summary + " \n" + feedback_context - ) - - # add new memory - to_add_memory.id = str(uuid.uuid4()) - added_ids = self._retry_db_operation(lambda: self.pref_mem.add([to_add_memory])) - # delete - deleted_id = old_memory_item.id - collection_name = old_memory_item.metadata.preference_type - self._retry_db_operation( - lambda: self.pref_mem.delete_with_collection_name(collection_name, [deleted_id]) - ) - # add archived - old_memory_item.metadata.status = "archived" - old_memory_item.metadata.original_text = "archived" - old_memory_item.metadata.embedding = [0.0] * 1024 - - archived_ids = self._retry_db_operation(lambda: self.pref_mem.add([old_memory_item])) - - logger.info( - f"[Memory Feedback UPDATE Pref] New Add:{added_ids!s} | Set archived:{archived_ids!s}" - ) - - return { - "id": to_add_memory.id, - "text": new_memory_item.memory, - "source_doc_id": "", - "archived_id": old_memory_item.id, - "origin_memory": old_memory_item.memory, - "type": "preference", - } - def _del_working_binding(self, user_name, mem_items: list[TextualMemoryItem]) -> set[str]: """Delete working memory bindings""" bindings_to_delete = extract_working_binding_ids(mem_items) @@ -460,7 +391,7 @@ def semantics_feedback( for chunk in memory_chunks: chunk_list = [] for item in chunk: - if "preference" in item.metadata.__dict__: + if item.metadata.memory_type == "PreferenceMemory": chunk_list.append(f"{item.id}: {item.metadata.preference}") else: chunk_list.append(f"{item.id}: {item.memory}") @@ -638,6 +569,19 @@ def check_has_edges(mem_item: TextualMemoryItem) -> tuple[TextualMemoryItem, boo ) text_mems = [item[0] for item in text_mems if float(item[1]) > 0.01] + if self.pref_feedback: + pref_mems = self.searcher.search( + query, + info=info, + memory_type="PreferenceMemory", + user_name=user_name, + top_k=top_k, + include_preference_memory=True, + full_recall=True, + ) + pref_mems = [item[0] for item in pref_mems if float(item[1]) > 0.01] + text_mems.extend(pref_mems) + # Memory with edges is not modified by feedback retrieved_mems = [] with ContextThreadPoolExecutor(max_workers=10) as executor: @@ -656,14 +600,7 @@ def check_has_edges(mem_item: TextualMemoryItem) -> tuple[TextualMemoryItem, boo f"text memories are not modified by feedback due to edges." ) - if self.pref_feedback: - pref_info = {} - if "user_id" in info: - pref_info = {"user_id": info["user_id"]} - retrieved_prefs = self.pref_mem.search(query, top_k, pref_info) - return retrieved_mems + retrieved_prefs - else: - return retrieved_mems + return retrieved_mems def _vec_query(self, new_memories_embedding: list[float], user_name=None): """Vector retrieval query""" diff --git a/src/memos/mem_feedback/simple_feedback.py b/src/memos/mem_feedback/simple_feedback.py index 2ac0a0a39..dfc9b9fdf 100644 --- a/src/memos/mem_feedback/simple_feedback.py +++ b/src/memos/mem_feedback/simple_feedback.py @@ -4,7 +4,6 @@ from memos.llms.factory import AzureLLM, OllamaLLM, OpenAILLM from memos.mem_feedback.feedback import MemFeedback from memos.mem_reader.simple_struct import SimpleStructMemReader -from memos.memories.textual.simple_preference import SimplePreferenceTextMemory from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager from memos.memories.textual.tree_text_memory.retrieve.retrieve_utils import StopwordManager from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher @@ -24,7 +23,6 @@ def __init__( mem_reader: SimpleStructMemReader, searcher: Searcher, reranker: BaseReranker, - pref_mem: SimplePreferenceTextMemory, pref_feedback: bool = False, ): self.llm = llm @@ -34,7 +32,6 @@ def __init__( self.mem_reader = mem_reader self.searcher = searcher self.stopword_manager = StopwordManager - self.pref_mem = pref_mem self.reranker = reranker self.DB_IDX_READY = False self.pref_feedback = pref_feedback diff --git a/src/memos/mem_reader/multi_modal_struct.py b/src/memos/mem_reader/multi_modal_struct.py index e3d2bece9..62e8f2d75 100644 --- a/src/memos/mem_reader/multi_modal_struct.py +++ b/src/memos/mem_reader/multi_modal_struct.py @@ -10,6 +10,7 @@ from memos.context.context import ContextThreadPoolExecutor from memos.mem_reader.read_multi_modal import MultiModalParser, detect_lang from memos.mem_reader.read_multi_modal.base import _derive_key +from memos.mem_reader.read_pref_memory.process_preference_memory import process_preference_fine from memos.mem_reader.read_skill_memory.process_skill_memory import process_skill_memory_fine from memos.mem_reader.simple_struct import PROMPT_DICT, SimpleStructMemReader from memos.mem_reader.utils import parse_json_result @@ -993,7 +994,7 @@ def _process_multi_modal_data( # Part A: call llm in parallel using thread pool fine_memory_items = [] - with ContextThreadPoolExecutor(max_workers=3) as executor: + with ContextThreadPoolExecutor(max_workers=4) as executor: future_string = executor.submit( self._process_string_fine, fast_memory_items, info, custom_tags, **kwargs ) @@ -1012,15 +1013,25 @@ def _process_multi_modal_data( skills_dir_config=self.skills_dir_config, **kwargs, ) + future_pref = executor.submit( + process_preference_fine, + fast_memory_items, + info, + self.llm, + self.embedder, + **kwargs, + ) # Collect results fine_memory_items_string_parser = future_string.result() fine_memory_items_tool_trajectory_parser = future_tool.result() fine_memory_items_skill_memory_parser = future_skill.result() + fine_memory_items_pref_parser = future_pref.result() fine_memory_items.extend(fine_memory_items_string_parser) fine_memory_items.extend(fine_memory_items_tool_trajectory_parser) fine_memory_items.extend(fine_memory_items_skill_memory_parser) + fine_memory_items.extend(fine_memory_items_pref_parser) # Part B: get fine multimodal items for fast_item in fast_memory_items: @@ -1060,7 +1071,7 @@ def _process_transfer_multi_modal_data( fine_memory_items = [] # Part A: call llm in parallel using thread pool - with ContextThreadPoolExecutor(max_workers=2) as executor: + with ContextThreadPoolExecutor(max_workers=4) as executor: future_string = executor.submit( self._process_string_fine, raw_nodes, info, custom_tags, **kwargs ) @@ -1079,14 +1090,21 @@ def _process_transfer_multi_modal_data( skills_dir_config=self.skills_dir_config, **kwargs, ) + # Add preference memory extraction + future_pref = executor.submit( + process_preference_fine, raw_nodes, info, self.llm, self.embedder, **kwargs + ) # Collect results fine_memory_items_string_parser = future_string.result() fine_memory_items_tool_trajectory_parser = future_tool.result() fine_memory_items_skill_memory_parser = future_skill.result() + fine_memory_items_pref_parser = future_pref.result() + fine_memory_items.extend(fine_memory_items_string_parser) fine_memory_items.extend(fine_memory_items_tool_trajectory_parser) fine_memory_items.extend(fine_memory_items_skill_memory_parser) + fine_memory_items.extend(fine_memory_items_pref_parser) # Part B: get fine multimodal items for raw_node in raw_nodes: diff --git a/src/memos/mem_reader/read_pref_memory/process_preference_memory.py b/src/memos/mem_reader/read_pref_memory/process_preference_memory.py new file mode 100644 index 000000000..1ff1fba52 --- /dev/null +++ b/src/memos/mem_reader/read_pref_memory/process_preference_memory.py @@ -0,0 +1,296 @@ +"""Preference memory extractor.""" + +import json +import os +import uuid + +from concurrent.futures import as_completed +from typing import TYPE_CHECKING, Any + +from memos.context.context import ContextThreadPoolExecutor +from memos.log import get_logger +from memos.mem_reader.read_multi_modal import detect_lang +from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata +from memos.templates.prefer_complete_prompt import ( + NAIVE_EXPLICIT_PREFERENCE_EXTRACT_PROMPT, + NAIVE_EXPLICIT_PREFERENCE_EXTRACT_PROMPT_ZH, + NAIVE_IMPLICIT_PREFERENCE_EXTRACT_PROMPT, + NAIVE_IMPLICIT_PREFERENCE_EXTRACT_PROMPT_ZH, +) + + +if TYPE_CHECKING: + from memos.types.general_types import UserContext + + +logger = get_logger(__name__) + + +def _extract_explicit_preference(qa_pair_str: str, llm) -> list[dict[str, Any]] | None: + """Extract explicit preference from a QA pair string.""" + lang = detect_lang(qa_pair_str) + _map = { + "zh": NAIVE_EXPLICIT_PREFERENCE_EXTRACT_PROMPT_ZH, + "en": NAIVE_EXPLICIT_PREFERENCE_EXTRACT_PROMPT, + } + prompt = _map[lang].replace("{qa_pair}", qa_pair_str) + + try: + response = llm.generate([{"role": "user", "content": prompt}]) + if not response: + logger.info( + f"[prefer_extractor]: (Error) LLM response content is {response} when extracting explicit preference" + ) + return None + response = response.strip().replace("```json", "").replace("```", "").strip() + result = json.loads(response) + for d in result: + d["preference"] = d.pop("explicit_preference") + return result + except Exception as e: + logger.info(f"Error extracting explicit preference: {e}, return None") + return None + + +def _extract_implicit_preference(qa_pair_str: str, llm) -> list[dict[str, Any]] | None: + """Extract implicit preferences from a QA pair string.""" + if not qa_pair_str: + return None + + lang = detect_lang(qa_pair_str) + _map = { + "zh": NAIVE_IMPLICIT_PREFERENCE_EXTRACT_PROMPT_ZH, + "en": NAIVE_IMPLICIT_PREFERENCE_EXTRACT_PROMPT, + } + prompt = _map[lang].replace("{qa_pair}", qa_pair_str) + + try: + response = llm.generate([{"role": "user", "content": prompt}]) + if not response: + logger.info( + f"[prefer_extractor]: (Error) LLM response content is {response} when extracting implicit preference" + ) + return None + response = response.strip().replace("```json", "").replace("```", "").strip() + result = json.loads(response) + for d in result: + d["preference"] = d.pop("implicit_preference") + return result + except Exception as e: + logger.info(f"Error extracting implicit preferences: {e}, return None") + return None + + +def _create_preference_memory_item( + preference_data: dict[str, Any], + preference_type: str, + fast_item: TextualMemoryItem | None, + info: dict[str, Any], + embedder, + **kwargs, +) -> TextualMemoryItem: + """ + Create a preference memory item with proper metadata. + + Args: + preference_data: Dictionary containing preference, context_summary, reasoning, topic + preference_type: "explicit_preference" or "implicit_preference" + fast_item: Original fast memory item (for extracting sources and other metadata) + info: Dictionary containing user_id, session_id, etc. + embedder: Embedder instance + kwargs: Additional parameters including user_context + + Returns: + TextualMemoryItem with TreeNodeTextualMemoryMetadata + """ + # Make a copy of info to avoid modifying the original + info_ = info.copy() + + # Extract fields that should be at metadata level + user_id = info_.pop("user_id", "") + session_id = info_.pop("session_id", "") + + # Extract manager_user_id, project_id, and operation from user_context + user_context: UserContext | None = kwargs.get("user_context") + manager_user_id = user_context.manager_user_id if user_context else None + project_id = user_context.project_id if user_context else None + + # Generate embedding for context_summary + context_summary = preference_data.get("context_summary", "") + embedding = embedder.embed([context_summary])[0] if embedder and context_summary else None + + # Extract sources from fast_item + sources = getattr(fast_item.metadata, "sources", []) if fast_item else [] + + # Create metadata + metadata = TreeNodeTextualMemoryMetadata( + memory_type="PreferenceMemory", + embedding=embedding, + user_id=user_id, + session_id=session_id, + status="activated", + tags=[], + type="chat", + info=info_, + sources=sources, + usage=[], + background="", + # Preference-specific fields + preference_type=preference_type, + preference=preference_data.get("preference", ""), + reasoning=preference_data.get("reasoning", ""), + topic=preference_data.get("topic", ""), + # User-specific fields + manager_user_id=manager_user_id, + project_id=project_id, + ) + + # Create and return memory item + return TextualMemoryItem(id=str(uuid.uuid4()), memory=context_summary, metadata=metadata) + + +def _process_single_chunk_explicit( + original_text: str, + fast_item: TextualMemoryItem | None, + info: dict[str, Any], + llm, + embedder, + **kwargs, +) -> list[TextualMemoryItem]: + """Process a single chunk for explicit preferences.""" + if not original_text.strip(): + return [] + + explicit_pref = _extract_explicit_preference(original_text, llm) + if not explicit_pref: + return [] + + memories = [] + for pref in explicit_pref: + memory = _create_preference_memory_item( + preference_data=pref, + preference_type="explicit_preference", + fast_item=fast_item, + info=info, + embedder=embedder, + **kwargs, + ) + memories.append(memory) + + return memories + + +def _process_single_chunk_implicit( + original_text: str, + fast_item: TextualMemoryItem | None, + info: dict[str, Any], + llm, + embedder, + **kwargs, +) -> list[TextualMemoryItem]: + """Process a single chunk for implicit preferences.""" + if not original_text.strip(): + return [] + + implicit_pref = _extract_implicit_preference(original_text, llm) + if not implicit_pref: + return [] + + memories = [] + for pref in implicit_pref: + memory = _create_preference_memory_item( + preference_data=pref, + preference_type="implicit_preference", + fast_item=fast_item, + info=info, + embedder=embedder, + **kwargs, + ) + memories.append(memory) + + return memories + + +def process_preference_fine( + fast_memory_items: list[TextualMemoryItem], + info: dict[str, Any], + llm=None, + embedder=None, + **kwargs, +) -> list[TextualMemoryItem]: + """ + Extract preference memories from fast_memory_items (for fine mode processing). + + Args: + fast_memory_items: List of TextualMemoryItem from fast parsing + info: Dictionary containing user_id and session_id + llm: LLM instance + embedder: Embedder instance + kwargs: Additional parameters (including user_context) + + Returns: + List of preference memory items + """ + + if os.getenv("ENABLE_PREFERENCE_MEMORY", "false").lower() != "true": + return [] + + if not fast_memory_items or not llm: + return [] + + try: + # Convert fast_memory_items to messages format + chunks = [] + for fast_item in fast_memory_items: + mem_str = fast_item.memory or "" + if not mem_str.strip(): + continue + chunks.append((mem_str, fast_item)) + + if not chunks: + return [] + + # Process chunks in parallel + memories = [] + with ContextThreadPoolExecutor(max_workers=min(10, len(chunks))) as executor: + futures = {} + + # Submit explicit extraction tasks + for chunk, fast_item in chunks: + future = executor.submit( + _process_single_chunk_explicit, chunk, fast_item, info, llm, embedder, **kwargs + ) + futures[future] = ("explicit_preference", chunk) + + # Submit implicit extraction tasks + for chunk, fast_item in chunks: + future = executor.submit( + _process_single_chunk_implicit, chunk, fast_item, info, llm, embedder, **kwargs + ) + futures[future] = ("implicit_preference", chunk) + + # Collect results + for future in as_completed(futures): + try: + memory = future.result() + if memory: + if isinstance(memory, list): + memories.extend(memory) + else: + memories.append(memory) + except Exception as e: + task_type, chunk = futures[future] + logger.warning( + f"[process_preference_fine] Error processing {task_type} chunk, original text: {chunk}: {e}" + ) + continue + + if memories: + logger.info(f"[process_preference_fine] Extracted {len(memories)} preference memories") + + return memories + except Exception as e: + logger.warning( + f"[process_preference_fine] Failed to extract preferences: {e}", exc_info=True + ) + return [] diff --git a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py index b103acf3a..8777b9f2e 100644 --- a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py +++ b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py @@ -18,17 +18,6 @@ from memos.mem_cube.navie import NaiveMemCube from memos.mem_feedback.simple_feedback import SimpleMemFeedback from memos.mem_reader.factory import MemReaderFactory -from memos.memories.textual.prefer_text_memory.config import ( - AdderConfigFactory, - ExtractorConfigFactory, - RetrieverConfigFactory, -) -from memos.memories.textual.prefer_text_memory.factory import ( - AdderFactory, - ExtractorFactory, - RetrieverFactory, -) -from memos.memories.textual.simple_preference import SimplePreferenceTextMemory from memos.memories.textual.simple_tree import SimpleTreeTextMemory from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager from memos.memories.textual.tree_text_memory.retrieve.internet_retriever_factory import ( @@ -40,7 +29,6 @@ if TYPE_CHECKING: from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher from memos.reranker.factory import RerankerFactory -from memos.vec_dbs.factory import VecDBFactory logger = get_logger(__name__) @@ -182,36 +170,6 @@ def build_internet_retriever_config() -> dict[str, Any]: return InternetRetrieverConfigFactory.model_validate(APIConfig.get_internet_config()) -def build_pref_extractor_config() -> dict[str, Any]: - """ - Build preference memory extractor configuration. - - Returns: - Validated extractor configuration dictionary - """ - return ExtractorConfigFactory.model_validate({"backend": "naive", "config": {}}) - - -def build_pref_adder_config() -> dict[str, Any]: - """ - Build preference memory adder configuration. - - Returns: - Validated adder configuration dictionary - """ - return AdderConfigFactory.model_validate({"backend": "naive", "config": {}}) - - -def build_pref_retriever_config() -> dict[str, Any]: - """ - Build preference memory retriever configuration. - - Returns: - Validated retriever configuration dictionary - """ - return RetrieverConfigFactory.model_validate({"backend": "naive", "config": {}}) - - def _get_default_memory_size(cube_config: Any) -> dict[str, int]: """ Get default memory size configuration. @@ -291,20 +249,11 @@ def init_components() -> dict[str, Any]: reranker_config = build_reranker_config() feedback_reranker_config = build_feedback_reranker_config() internet_retriever_config = build_internet_retriever_config() - vector_db_config = build_vec_db_config() - pref_extractor_config = build_pref_extractor_config() - pref_adder_config = build_pref_adder_config() - pref_retriever_config = build_pref_retriever_config() logger.debug("Component configurations built successfully") # Create component instances graph_db = GraphStoreFactory.from_config(graph_db_config) - vector_db = ( - VecDBFactory.from_config(vector_db_config) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) llm = LLMFactory.from_config(llm_config) embedder = EmbedderFactory.from_config(embedder_config) # Pass graph_db to mem_reader for recall operations (deduplication, conflict detection) @@ -345,63 +294,9 @@ def init_components() -> dict[str, Any]: logger.debug("Text memory initialized") - # Initialize preference memory components - pref_extractor = ( - ExtractorFactory.from_config( - config_factory=pref_extractor_config, - llm_provider=llm, - embedder=embedder, - vector_db=vector_db, - ) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) - - pref_adder = ( - AdderFactory.from_config( - config_factory=pref_adder_config, - llm_provider=llm, - embedder=embedder, - vector_db=vector_db, - text_mem=text_mem, - ) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) - - pref_retriever = ( - RetrieverFactory.from_config( - config_factory=pref_retriever_config, - llm_provider=llm, - embedder=embedder, - reranker=feedback_reranker, - vector_db=vector_db, - ) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) - - logger.debug("Preference memory components initialized") - - # Initialize preference memory - pref_mem = ( - SimplePreferenceTextMemory( - extractor_llm=llm, - vector_db=vector_db, - embedder=embedder, - reranker=feedback_reranker, - extractor=pref_extractor, - adder=pref_adder, - retriever=pref_retriever, - ) - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true" - else None - ) - # Create MemCube with pre-initialized memory instances naive_mem_cube = NaiveMemCube( text_mem=text_mem, - pref_mem=pref_mem, act_mem=None, para_mem=None, ) @@ -421,7 +316,7 @@ def init_components() -> dict[str, Any]: mem_reader=mem_reader, searcher=searcher, reranker=feedback_reranker, - pref_mem=pref_mem, + pref_feedback=True, ) # Return all components as a dictionary for easy access and extension return {"naive_mem_cube": naive_mem_cube, "feedback_server": feedback_server} diff --git a/src/memos/memories/textual/item.py b/src/memos/memories/textual/item.py index 7e40f1d50..60af67830 100644 --- a/src/memos/memories/textual/item.py +++ b/src/memos/memories/textual/item.py @@ -171,6 +171,7 @@ class TreeNodeTextualMemoryMetadata(TextualMemoryMetadata): "ToolTrajectoryMemory", "RawFileMemory", "SkillMemory", + "PreferenceMemory", ] = Field(default="WorkingMemory", description="Memory lifecycle type.") sources: list[SourceMessage] | None = Field( default=None, description="Multiple origins of the memory (e.g., URLs, notes)." @@ -337,8 +338,6 @@ def _coerce_metadata(cls, v: Any): if v.get("relativity") is not None: return SearchedTreeNodeTextualMemoryMetadata(**v) - if v.get("preference_type") is not None: - return PreferenceTextualMemoryMetadata(**v) if any(k in v for k in ("sources", "memory_type", "embedding", "background", "usage")): return TreeNodeTextualMemoryMetadata(**v) return TextualMemoryMetadata(**v) diff --git a/src/memos/memories/textual/preference.py b/src/memos/memories/textual/preference.py index dba321f55..0cc6d1930 100644 --- a/src/memos/memories/textual/preference.py +++ b/src/memos/memories/textual/preference.py @@ -74,6 +74,7 @@ def get_memory( messages (list[MessageList]): The messages to get memory from. type (str): The type of memory to get. info (dict[str, Any]): The info to get memory. + **kwargs: Additional keyword arguments to pass to the extractor. """ return self.extractor.extract(messages, type, info, **kwargs) @@ -91,7 +92,6 @@ def search( if not isinstance(search_filter, dict): search_filter = {} search_filter.update({"status": "activated"}) - logger.info(f"search_filter for preference memory: {search_filter}") return self.retriever.retrieve(query, top_k, info, search_filter) def load(self, dir: str) -> None: diff --git a/src/memos/memories/textual/simple_preference.py b/src/memos/memories/textual/simple_preference.py index db7101744..51523d364 100644 --- a/src/memos/memories/textual/simple_preference.py +++ b/src/memos/memories/textual/simple_preference.py @@ -1,5 +1,3 @@ -from typing import Any - from memos.embedders.factory import ( ArkEmbedder, OllamaEmbedder, @@ -8,9 +6,7 @@ ) from memos.llms.factory import AzureLLM, OllamaLLM, OpenAILLM from memos.log import get_logger -from memos.memories.textual.item import PreferenceTextualMemoryMetadata, TextualMemoryItem from memos.memories.textual.preference import PreferenceTextMemory -from memos.types import MessageList from memos.vec_dbs.factory import MilvusVecDB, QdrantVecDB @@ -38,125 +34,3 @@ def __init__( self.extractor = extractor self.adder = adder self.retriever = retriever - - def get_memory( - self, messages: list[MessageList], type: str, info: dict[str, Any], **kwargs - ) -> list[TextualMemoryItem]: - """Get memory based on the messages. - Args: - messages (MessageList): The messages to get memory from. - type (str): The type of memory to get. - info (dict[str, Any]): The info to get memory. - **kwargs: Additional keyword arguments to pass to the extractor. - """ - return self.extractor.extract(messages, type, info, **kwargs) - - def search( - self, query: str, top_k: int, info=None, search_filter=None, **kwargs - ) -> list[TextualMemoryItem]: - """Search for memories based on a query. - Args: - query (str): The query to search for. - top_k (int): The number of top results to return. - info (dict): Leave a record of memory consumption. - Returns: - list[TextualMemoryItem]: List of matching memories. - """ - if not isinstance(search_filter, dict): - search_filter = {} - search_filter.update({"status": "activated"}) - return self.retriever.retrieve(query, top_k, info, search_filter) - - def add(self, memories: list[TextualMemoryItem | dict[str, Any]]) -> list[str]: - """Add memories. - - Args: - memories: List of TextualMemoryItem objects or dictionaries to add. - """ - return self.adder.add(memories) - - def get_with_collection_name( - self, collection_name: str, memory_id: str - ) -> TextualMemoryItem | None: - """Get a memory by its ID and collection name. - Args: - memory_id (str): The ID of the memory to retrieve. - collection_name (str): The name of the collection to retrieve the memory from. - Returns: - TextualMemoryItem: The memory with the given ID and collection name. - """ - try: - res = self.vector_db.get_by_id(collection_name, memory_id) - if res is None: - return None - return TextualMemoryItem( - id=res.id, - memory=res.memory, - metadata=PreferenceTextualMemoryMetadata(**res.payload), - ) - except Exception as e: - # Convert any other exception to ValueError for consistent error handling - raise ValueError( - f"Memory with ID {memory_id} not found in collection {collection_name}: {e}" - ) from e - - def get_by_ids_with_collection_name( - self, collection_name: str, memory_ids: list[str] - ) -> list[TextualMemoryItem]: - """Get memories by their IDs and collection name. - Args: - collection_name (str): The name of the collection to retrieve the memory from. - memory_ids (list[str]): List of memory IDs to retrieve. - Returns: - list[TextualMemoryItem]: List of memories with the specified IDs and collection name. - """ - try: - res = self.vector_db.get_by_ids(collection_name, memory_ids) - if not res: - return [] - return [ - TextualMemoryItem( - id=memo.id, - memory=memo.memory, - metadata=PreferenceTextualMemoryMetadata(**memo.payload), - ) - for memo in res - ] - except Exception as e: - # Convert any other exception to ValueError for consistent error handling - raise ValueError( - f"Memory with IDs {memory_ids} not found in collection {collection_name}: {e}" - ) from e - - def get_all(self) -> list[TextualMemoryItem]: - """Get all memories. - Returns: - list[TextualMemoryItem]: List of all memories. - """ - all_collections = ["explicit_preference", "implicit_preference"] - all_memories = {} - for collection_name in all_collections: - items = self.vector_db.get_all(collection_name) - all_memories[collection_name] = [ - TextualMemoryItem( - id=memo.id, - memory=memo.memory, - metadata=PreferenceTextualMemoryMetadata(**memo.payload), - ) - for memo in items - ] - return all_memories - - def delete_with_collection_name(self, collection_name: str, memory_ids: list[str]) -> None: - """Delete memories by their IDs and collection name. - Args: - collection_name (str): The name of the collection to delete the memory from. - memory_ids (list[str]): List of memory IDs to delete. - """ - self.vector_db.delete(collection_name, memory_ids) - - def delete_all(self) -> None: - """Delete all memories.""" - for collection_name in self.vector_db.config.collection_name: - self.vector_db.delete_collection(collection_name) - self.vector_db.create_collection() diff --git a/src/memos/memories/textual/tree.py b/src/memos/memories/textual/tree.py index 5b210ba61..8c896f538 100644 --- a/src/memos/memories/textual/tree.py +++ b/src/memos/memories/textual/tree.py @@ -169,6 +169,8 @@ def search( tool_mem_top_k: int = 6, include_skill_memory: bool = False, skill_mem_top_k: int = 3, + include_preference_memory: bool = False, + pref_mem_top_k: int = 6, dedup: str | None = None, include_embedding: bool | None = None, **kwargs, @@ -222,6 +224,8 @@ def search( tool_mem_top_k=tool_mem_top_k, include_skill_memory=include_skill_memory, skill_mem_top_k=skill_mem_top_k, + include_preference_memory=include_preference_memory, + pref_mem_top_k=pref_mem_top_k, dedup=dedup, **kwargs, ) diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py index 4ca30c7b8..df419f0c1 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/manager.py +++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py @@ -185,6 +185,7 @@ def _add_memories_batch( "ToolTrajectoryMemory", "RawFileMemory", "SkillMemory", + "PreferenceMemory", ): graph_node_id = ( memory.id if hasattr(memory, "id") else memory.id or str(uuid.uuid4()) @@ -341,6 +342,7 @@ def _process_memory(self, memory: TextualMemoryItem, user_name: str | None = Non "ToolTrajectoryMemory", "RawFileMemory", "SkillMemory", + "PreferenceMemory", ): f_graph = ex.submit( self._add_to_graph_memory, diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/recall.py b/src/memos/memories/textual/tree_text_memory/retrieve/recall.py index e5e96dd58..dd90b8932 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/recall.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/recall.py @@ -69,6 +69,7 @@ def retrieve( "ToolTrajectoryMemory", "RawFileMemory", "SkillMemory", + "PreferenceMemory", ]: raise ValueError(f"Unsupported memory scope: {memory_scope}") diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py index cc269e8c4..b4994671f 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py @@ -87,6 +87,8 @@ def retrieve( tool_mem_top_k: int = 6, include_skill_memory: bool = False, skill_mem_top_k: int = 3, + include_preference_memory: bool = False, + pref_mem_top_k: int = 6, **kwargs, ) -> list[tuple[TextualMemoryItem, float]]: logger.info( @@ -116,6 +118,8 @@ def retrieve( tool_mem_top_k, include_skill_memory, skill_mem_top_k, + include_preference_memory, + pref_mem_top_k, ) return results @@ -129,6 +133,8 @@ def post_retrieve( tool_mem_top_k: int = 6, include_skill_memory: bool = False, skill_mem_top_k: int = 3, + include_preference_memory: bool = False, + pref_mem_top_k: int = 6, dedup: str | None = None, plugin=False, ): @@ -144,6 +150,8 @@ def post_retrieve( tool_mem_top_k, include_skill_memory, skill_mem_top_k, + include_preference_memory, + pref_mem_top_k, ) self._update_usage_history(final_results, info, user_name) return final_results @@ -163,6 +171,8 @@ def search( tool_mem_top_k: int = 6, include_skill_memory: bool = False, skill_mem_top_k: int = 3, + include_preference_memory: bool = False, + pref_mem_top_k: int = 6, dedup: str | None = None, **kwargs, ) -> list[TextualMemoryItem]: @@ -212,6 +222,8 @@ def search( tool_mem_top_k=tool_mem_top_k, include_skill_memory=include_skill_memory, skill_mem_top_k=skill_mem_top_k, + include_preference_memory=include_preference_memory, + pref_mem_top_k=pref_mem_top_k, **kwargs, ) @@ -229,6 +241,8 @@ def search( tool_mem_top_k=tool_mem_top_k, include_skill_memory=include_skill_memory, skill_mem_top_k=skill_mem_top_k, + include_preference_memory=include_preference_memory, + pref_mem_top_k=pref_mem_top_k, dedup=dedup, ) @@ -329,8 +343,10 @@ def _retrieve_paths( tool_mem_top_k: int = 6, include_skill_memory: bool = False, skill_mem_top_k: int = 3, + include_preference_memory: bool = False, + pref_mem_top_k: int = 6, ): - """Run A/B/C/D/E retrieval paths in parallel""" + """Run A/B/C/D/E/F retrieval paths in parallel""" tasks = [] id_filter = { "user_id": info.get("user_id", None), @@ -428,6 +444,22 @@ def _retrieve_paths( mode=mode, ) ) + if include_preference_memory: + tasks.append( + executor.submit( + self._retrieve_from_preference_memory, + query, + parsed_goal, + query_embedding, + pref_mem_top_k, + memory_type, + search_filter, + search_priority, + user_name, + id_filter, + mode=mode, + ) + ) results = [] for t in tasks: results.extend(t.result()) @@ -863,6 +895,57 @@ def _retrieve_from_skill_memory( search_filter=search_filter, ) + @timed + def _retrieve_from_preference_memory( + self, + query, + parsed_goal, + query_embedding, + top_k, + memory_type, + search_filter: dict | None = None, + search_priority: dict | None = None, + user_name: str | None = None, + id_filter: dict | None = None, + mode: str = "fast", + ): + """Retrieve and rerank from PreferenceMemory""" + if memory_type not in ["All", "PreferenceMemory"]: + logger.info(f"[PATH-F] '{query}' Skipped (memory_type does not match)") + return [] + + # chain of thinking + cot_embeddings = [] + if self.vec_cot: + queries = self._cot_query(query, mode=mode, context=parsed_goal.context) + if len(queries) > 1: + cot_embeddings = self.embedder.embed(queries) + cot_embeddings.extend(query_embedding) + else: + cot_embeddings = query_embedding + + items = self.graph_retriever.retrieve( + query=query, + parsed_goal=parsed_goal, + query_embedding=cot_embeddings, + top_k=top_k * 2, + memory_scope="PreferenceMemory", + search_filter=search_filter, + search_priority=search_priority, + user_name=user_name, + id_filter=id_filter, + use_fast_graph=self.use_fast_graph, + ) + + return self.reranker.rerank( + query=query, + query_embedding=query_embedding[0], + graph_results=items, + top_k=top_k, + parsed_goal=parsed_goal, + search_filter=search_filter, + ) + @timed def _retrieve_simple( self, @@ -933,6 +1016,8 @@ def _sort_and_trim( tool_mem_top_k=6, include_skill_memory=False, skill_mem_top_k=3, + include_preference_memory=False, + pref_mem_top_k=6, ): """Sort results by score and trim to top_k""" final_items = [] @@ -1000,6 +1085,28 @@ def _sort_and_trim( ) ) + if include_preference_memory: + pref_results = [ + (item, score) + for item, score in results + if item.metadata.memory_type == "PreferenceMemory" + ] + sorted_pref_results = sorted(pref_results, key=lambda pair: pair[1], reverse=True)[ + :pref_mem_top_k + ] + for item, score in sorted_pref_results: + if plugin and round(score, 2) == 0.00: + continue + meta_data = item.metadata.model_dump() + meta_data["relativity"] = score + final_items.append( + TextualMemoryItem( + id=item.id, + memory=item.memory, + metadata=SearchedTreeNodeTextualMemoryMetadata(**meta_data), + ) + ) + # separate textual results results = [ (item, score) diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index d890c77bf..6df410c19 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -1,7 +1,6 @@ from __future__ import annotations import json -import os import time import traceback @@ -11,10 +10,8 @@ from memos.api.handlers.formatters_handler import ( format_memory_item, - post_process_pref_mem, post_process_textual_mem, ) -from memos.context.context import ContextThreadPoolExecutor from memos.log import get_logger from memos.mem_reader.utils import parse_keep_filter_response from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem @@ -22,7 +19,6 @@ ADD_TASK_LABEL, MEM_FEEDBACK_TASK_LABEL, MEM_READ_TASK_LABEL, - PREF_ADD_TASK_LABEL, ) from memos.memories.textual.item import TextualMemoryItem from memos.multi_mem_cube.views import MemCubeView @@ -78,38 +74,23 @@ def add_memories(self, add_req: APIADDRequest) -> list[dict[str, Any]]: ) target_session_id = add_req.session_id or "default_session" - sync_mode = add_req.async_mode or self._get_sync_mode() - self.logger.info( f"[SingleCubeView] cube={self.cube_id} " f"Processing add with mode={sync_mode}, session={target_session_id}" ) - with ContextThreadPoolExecutor(max_workers=2) as executor: - text_future = executor.submit(self._process_text_mem, add_req, user_context, sync_mode) - pref_future = executor.submit(self._process_pref_mem, add_req, user_context, sync_mode) - - text_results = text_future.result() - pref_results = pref_future.result() - - self.logger.info( - f"[SingleCubeView] cube={self.cube_id} text_results={len(text_results)}, " - f"pref_results={len(pref_results)}" - ) - - for item in text_results: - item["cube_id"] = self.cube_id - for item in pref_results: - item["cube_id"] = self.cube_id - - all_memories = text_results + pref_results + all_memories = self._process_text_mem(add_req, user_context, sync_mode) - # TODO: search existing memories and compare + self.logger.info(f"[SingleCubeView] cube={self.cube_id} total_results={len(all_memories)}") return all_memories @timed def search_memories(self, search_req: APISearchRequest) -> dict[str, Any]: + """ + Unified memory search handling (text + preference memories). + Preference memories are now searched through the same _search_text flow. + """ # Create UserContext object user_context = UserContext( user_id=search_req.user_id, @@ -131,28 +112,16 @@ def search_memories(self, search_req: APISearchRequest) -> dict[str, Any]: # Determine search mode search_mode = self._get_search_mode(search_req.mode) - # Execute search in parallel for text and preference memories - with ContextThreadPoolExecutor(max_workers=2) as executor: - text_future = executor.submit(self._search_text, search_req, user_context, search_mode) - pref_future = executor.submit(self._search_pref, search_req, user_context) - - text_formatted_memories = text_future.result() - pref_formatted_memories = pref_future.result() + # Unified search through _search_text (includes all memory types) + all_formatted_memories = self._search_text(search_req, user_context, search_mode) - # Build result + # Build result with unified processing memories_result = post_process_textual_mem( memories_result, - text_formatted_memories, + all_formatted_memories, self.cube_id, ) - memories_result = post_process_pref_mem( - memories_result, - pref_formatted_memories, - self.cube_id, - search_req.include_preference, - ) - self.logger.info(f"Search memories result: {memories_result}") self.logger.info(f"Search {len(memories_result)} memories.") return memories_result @@ -407,71 +376,6 @@ def _dedup_by_content(memories: list) -> list: return formatted_memories - @timed - def _search_pref( - self, - search_req: APISearchRequest, - user_context: UserContext, - ) -> list[dict[str, Any]]: - """ - Search preference memories. - - Args: - search_req: Search request - user_context: User context - - Returns: - List of formatted preference memory items - TODO: ADD CUBE ID IN PREFERENCE MEMORY - """ - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false").lower() != "true": - return [] - if not search_req.include_preference: - return [] - - logger.info(f"search_req.filter for preference memory: {search_req.filter}") - logger.info(f"type of pref_mem: {type(self.naive_mem_cube.pref_mem)}") - try: - results = self.naive_mem_cube.pref_mem.search( - query=search_req.query, - top_k=search_req.pref_top_k, - info={ - "user_id": search_req.user_id, - "mem_cube_id": user_context.mem_cube_id, - "session_id": search_req.session_id, - "chat_history": search_req.chat_history, - }, - search_filter=search_req.filter, - ) - include_embedding = os.getenv("INCLUDE_EMBEDDING", "false") == "true" - formatted_results = self._postformat_memories( - results, user_context.mem_cube_id, include_embedding=include_embedding - ) - - # For each returned item, tackle with metadata.info project_id / - # operation / manager_user_id - for item in formatted_results: - if not isinstance(item, dict): - continue - metadata = item.get("metadata") - if not isinstance(metadata, dict): - continue - info = metadata.get("info") - if not isinstance(info, dict): - continue - - for key in ("project_id", "operation", "manager_user_id"): - if key not in info: - continue - value = info.pop(key) - if key not in metadata: - metadata[key] = value - - return formatted_results - except Exception as e: - self.logger.error("Error in _search_pref: %s; traceback: %s", e, traceback.format_exc()) - return [] - def _fast_search( self, search_req: APISearchRequest, @@ -645,89 +549,6 @@ def _schedule_memory_tasks( ) self.mem_scheduler.submit_messages(messages=[message_item_add]) - @timed - def _process_pref_mem( - self, - add_req: APIADDRequest, - user_context: UserContext, - sync_mode: str, - ) -> list[dict[str, Any]]: - """ - Process and add preference memories. - - Extracts preferences from messages and adds them to the preference memory system. - Handles both sync and async modes. - - Args: - add_req: Add memory request - user_context: User context with IDs - - Returns: - List of formatted preference responses - """ - if os.getenv("ENABLE_PREFERENCE_MEMORY", "false").lower() != "true": - return [] - - if add_req.messages is None or isinstance(add_req.messages, str): - return [] - - for message in add_req.messages: - if isinstance(message, dict) and message.get("role", None) is None: - return [] - - target_session_id = add_req.session_id or "default_session" - - if sync_mode == "async": - try: - messages_list = [add_req.messages] - message_item_pref = ScheduleMessageItem( - user_id=add_req.user_id, - session_id=target_session_id, - mem_cube_id=user_context.mem_cube_id, - mem_cube=self.naive_mem_cube, - label=PREF_ADD_TASK_LABEL, - content=json.dumps(messages_list), - timestamp=datetime.utcnow(), - info=add_req.info, - user_name=self.cube_id, - task_id=add_req.task_id, - user_context=user_context, - ) - self.mem_scheduler.submit_messages(messages=[message_item_pref]) - self.logger.info(f"[SingleCubeView] cube={self.cube_id} Submitted PREF_ADD async") - except Exception as e: - self.logger.error( - f"[SingleCubeView] cube={self.cube_id} Failed to submit PREF_ADD: {e}", - exc_info=True, - ) - return [] - else: - pref_memories_local = self.naive_mem_cube.pref_mem.get_memory( - [add_req.messages], - type="chat", - info={ - **(add_req.info or {}), - "user_id": add_req.user_id, - "session_id": target_session_id, - "mem_cube_id": user_context.mem_cube_id, - }, - user_context=user_context, - ) - pref_ids_local: list[str] = self.naive_mem_cube.pref_mem.add(pref_memories_local) - self.logger.info( - f"[SingleCubeView] cube={self.cube_id} " - f"added {len(pref_ids_local)} preferences for user {add_req.user_id}: {pref_ids_local}" - ) - - return [ - { - "memory": memory.metadata.preference, - "memory_id": memory_id, - "memory_type": memory.metadata.preference_type, - } - for memory_id, memory in zip(pref_ids_local, pref_memories_local, strict=False) - ] - def add_before_search( self, messages: list[dict], @@ -834,7 +655,7 @@ def _process_text_mem( sync_mode: str, ) -> list[dict[str, Any]]: """ - Process and add text memories. + Process and add text memories (including preference memories). Extracts memories from messages and adds them to the text memory system. Handles both sync and async modes. @@ -959,13 +780,15 @@ def _process_text_mem( "[SingleCubeView] merged_from provided but graph_db is unavailable; skip archiving." ) + # Format results uniformly text_memories = [ { "memory": memory.memory, "memory_id": memory_id, "memory_type": memory.metadata.memory_type, + "cube_id": self.cube_id, } - for memory_id, memory in zip(mem_ids_local, flattened_local, strict=False) + for memory_id, memory in zip(mem_ids_local, mem_group, strict=False) ] return text_memories diff --git a/src/memos/search/search_service.py b/src/memos/search/search_service.py index 6d57e3605..fa713a7d1 100644 --- a/src/memos/search/search_service.py +++ b/src/memos/search/search_service.py @@ -62,6 +62,8 @@ def search_text_memories( tool_mem_top_k=search_req.tool_mem_top_k, include_skill_memory=search_req.include_skill_memory, skill_mem_top_k=search_req.skill_mem_top_k, + include_preference_memory=search_req.include_preference, + pref_mem_top_k=search_req.pref_top_k, dedup=search_req.dedup, include_embedding=include_embedding, )