From 0fda336acad3a63747e807838e07caf4e37faf40 Mon Sep 17 00:00:00 2001 From: Hao <120852460@qq.com> Date: Thu, 30 Oct 2025 17:18:02 +0800 Subject: [PATCH 1/4] add breakpoint in eval scripts --- evaluation/scripts/PrefEval/pref_mem0.py | 41 +++++- evaluation/scripts/PrefEval/pref_memobase.py | 33 ++++- evaluation/scripts/PrefEval/pref_memos.py | 55 +++++--- evaluation/scripts/PrefEval/pref_memu.py | 44 +++++-- .../scripts/PrefEval/pref_supermemory.py | 47 +++++-- evaluation/scripts/PrefEval/pref_zep.py | 45 ++++--- evaluation/scripts/personamem/pm_ingestion.py | 120 +++++++++++------- evaluation/scripts/personamem/pm_search.py | 45 +++---- 8 files changed, 292 insertions(+), 138 deletions(-) diff --git a/evaluation/scripts/PrefEval/pref_mem0.py b/evaluation/scripts/PrefEval/pref_mem0.py index 4bbdb0fd8..214068567 100644 --- a/evaluation/scripts/PrefEval/pref_mem0.py +++ b/evaluation/scripts/PrefEval/pref_mem0.py @@ -29,7 +29,13 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
@@ -46,13 +52,22 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - timestamp_add = int(time.time() * 100) - mem_client.add(messages=chunk, user_id=user_id, timestamp=timestamp_add) + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + timestamp_add = int(time.time() * 100) + + if record_id not in success_records: + mem_client.add( + messages=conversation[msg_idx : msg_idx + 2], + user_id=user_id, + timestamp=timestamp_add, + ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -210,6 +225,15 @@ def main(): from utils.client import Mem0Client mem_client = Mem0Client(enable_graph="graph" in args.lib) + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") @@ -218,6 +242,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -227,6 +252,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_memobase.py b/evaluation/scripts/PrefEval/pref_memobase.py index 4f6174d3d..e99b10520 100644 --- a/evaluation/scripts/PrefEval/pref_memobase.py +++ b/evaluation/scripts/PrefEval/pref_memobase.py @@ -12,7 +12,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -28,7 +27,13 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
@@ -36,8 +41,6 @@ def add_memory_for_line( i, line = line_data user_id = f"{lib}_user_pref_eval_{i}_{version}" mem_client.delete_user(user_id) - user_id = mem_client.client.add_user({"user_id": user_id}) - print("user_id:", user_id) try: original_data = json.loads(line) conversation = original_data.get("conversation", []) @@ -63,7 +66,14 @@ def add_memory_for_line( "created_at": timestamp_add, } ) - mem_client.add(messages=messages, user_id=user_id) + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: + mem_client.add(messages=conversation[msg_idx : msg_idx + 2], user_id=user_id) + f.write(f"{record_id}\n") + f.flush() end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -222,6 +232,16 @@ def main(): mem_client = MemobaseClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") + if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") print(f"Adding {args.add_turn} irrelevant turns.") @@ -229,6 +249,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -238,6 +259,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_memos.py b/evaluation/scripts/PrefEval/pref_memos.py index fc358dc36..0ee88e868 100644 --- a/evaluation/scripts/PrefEval/pref_memos.py +++ b/evaluation/scripts/PrefEval/pref_memos.py @@ -12,7 +12,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -21,7 +20,6 @@ sys.path.insert(0, ROOT_DIR) sys.path.insert(0, EVAL_SCRIPTS_DIR) - load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") BASE_URL = os.getenv("OPENAI_BASE_URL") @@ -30,8 +28,8 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str -) -> dict: + line_data, mem_client, num_irrelevant_turns, lib, version, success_records, f +): """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
""" @@ -47,15 +45,22 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - mem_client.add(messages=chunk, user_id=user_id, conv_id=None, batch_size=2) - else: - mem_client.add(messages=conversation, user_id=user_id, conv_id=None, batch_size=2) + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: + mem_client.add( + messages=conversation[msg_idx : msg_idx + 2], + user_id=user_id, + conv_id=None, + batch_size=2, + ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -68,7 +73,7 @@ def add_memory_for_line( return None -def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> dict: +def search_memory_for_line(line_data, mem_client, top_k_value): """ Processes a single line of data, searching memory based on the question. """ @@ -120,7 +125,7 @@ def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> di return None -def generate_response_for_line(line_data: tuple, openai_client: OpenAI, lib: str) -> dict: +def generate_response_for_line(line_data, openai_client, lib): """ Generates a response for a single line of data using pre-fetched memories. 
""" @@ -195,7 +200,7 @@ def main(): parser.add_argument( "--lib", type=str, - choices=["memos-api", "memos-local"], + choices=["memos-api", "memos-api-online"], default="memos-api", help="Which MemOS library to use (used in 'add' mode).", ) @@ -218,9 +223,22 @@ def main(): print(f"Error: Input file '{args.input}' not found") return - from utils.client import MemosApiClient + from utils.client import MemosApiClient, MemosApiOnlineClient + + if args.lib == "memos-api": + mem_client = MemosApiClient() + elif args.lib == "memos-api-online": + mem_client = MemosApiOnlineClient() - mem_client = MemosApiClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") @@ -229,6 +247,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as record_f, ): futures = [ executor.submit( @@ -238,6 +257,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + record_f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_memu.py b/evaluation/scripts/PrefEval/pref_memu.py index 2b9f769a4..4c37db7b7 100644 --- a/evaluation/scripts/PrefEval/pref_memu.py +++ b/evaluation/scripts/PrefEval/pref_memu.py @@ -14,7 +14,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -30,7 +29,13 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
@@ -47,19 +52,21 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - mem_client.add( - messages=chunk, user_id=user_id, iso_date=datetime.now().isoformat() - ) - else: + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: mem_client.add( - messages=conversation, user_id=user_id, iso_date=datetime.now().isoformat() + messages=conversation[msg_idx : msg_idx + 2], + user_id=user_id, + iso_date=datetime.now().isoformat(), ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -219,6 +226,16 @@ def main(): mem_client = MemuClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") + if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") print(f"Adding {args.add_turn} irrelevant turns.") @@ -226,6 +243,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -235,6 +253,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_supermemory.py b/evaluation/scripts/PrefEval/pref_supermemory.py index 88a64038b..68963e2af 100644 --- a/evaluation/scripts/PrefEval/pref_supermemory.py +++ b/evaluation/scripts/PrefEval/pref_supermemory.py @@ -12,7 +12,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -28,7 +27,13 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
@@ -45,15 +50,20 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - mem_client.add(messages=chunk, user_id=user_id) - else: - mem_client.add(messages=conversation, user_id=user_id) + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: + mem_client.add( + messages=conversation[msg_idx : msg_idx + 2], + user_id=user_id, + ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -90,9 +100,7 @@ def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> di start_time_search = time.monotonic() relevant_memories = mem_client.search(query=question, user_id=user_id, top_k=top_k_value) search_memories_duration = time.monotonic() - start_time_search - memories_str = "\n".join( - f"- {entry.get('memory', '')}" for entry in relevant_memories["text_mem"][0]["memories"] - ) + memories_str = relevant_memories memory_tokens_used = len(tokenizer.encode(memories_str)) @@ -250,6 +258,16 @@ def search(self, query, user_id, top_k): mem_client = SupermemoryClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") + if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") print(f"Adding {args.add_turn} irrelevant turns.") @@ -257,6 +275,7 @@ def search(self, query, user_id, top_k): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -266,6 +285,8 @@ def search(self, query, user_id, top_k): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_zep.py b/evaluation/scripts/PrefEval/pref_zep.py index 91aef1492..be98c6ba9 100644 --- a/evaluation/scripts/PrefEval/pref_zep.py +++ b/evaluation/scripts/PrefEval/pref_zep.py @@ -14,7 +14,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -30,7 +29,13 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
@@ -47,25 +52,22 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - mem_client.add( - messages=chunk, - user_id=user_id, - conv_id=None, - timestamp=datetime.now().isoformat(), - ) - else: + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: mem_client.add( - messages=conversation, + messages=conversation[msg_idx : msg_idx + 2], user_id=user_id, conv_id=None, timestamp=datetime.now().isoformat(), ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -225,6 +227,16 @@ def main(): mem_client = ZepClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") + if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") print(f"Adding {args.add_turn} irrelevant turns.") @@ -232,6 +244,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -241,6 +254,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/personamem/pm_ingestion.py b/evaluation/scripts/personamem/pm_ingestion.py index cab0fbeb5..fdbf43528 100644 --- a/evaluation/scripts/personamem/pm_ingestion.py +++ b/evaluation/scripts/personamem/pm_ingestion.py @@ -10,7 +10,6 @@ from tqdm import tqdm - sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -25,28 +24,22 @@ def ingest_session(session, user_id, session_id, frame, client): f"[{frame}] ๐Ÿ“ Session [{session_id}: [{idx + 1}/{len(session)}] Ingesting message: {msg['role']} - {msg['content'][:50]}..." 
) timestamp_add = int(time.time() * 100) - client.add(messages=messages, user_id=user_id, timestamp=timestamp_add) + client.add(messages=messages, user_id=user_id, timestamp=timestamp_add, batch_size=10) print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(messages)} messages") elif frame == "memos-api": - if os.getenv("PRE_SPLIT_CHUNK") == "true": - for i in range(0, len(session), 10): - messages = session[i : i + 10] - client.add(messages=messages, user_id=user_id, conv_id=session_id, batch_size=2) - print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(messages)} messages") - else: - client.add(messages=session, user_id=user_id, conv_id=session_id, batch_size=2) - print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(session)} messages") + client.add(messages=session, user_id=user_id, conv_id=session_id, batch_size=10) + print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(session)} messages") elif frame == "memobase": for _idx, msg in enumerate(session): if msg["role"] != "system": messages.append( { "role": msg["role"], - "content": msg["content"][:8000], + "content": msg["content"], "created_at": datetime.now().isoformat(), } ) - client.add(messages, user_id) + client.add(messages, user_id, batch_size=10) print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(messages)} messages") elif frame == "supermemory": for _idx, msg in enumerate(session): @@ -62,6 +55,8 @@ def ingest_session(session, user_id, session_id, frame, client): for _idx, msg in enumerate(session): messages.append({"role": msg["role"], "content": msg["content"]}) client.add(messages, user_id, datetime.now().astimezone().isoformat()) + elif frame == "memos-api-online": + client.add(messages, user_id, session_id, batch_size=10) def build_jsonl_index(jsonl_path): @@ -125,7 +120,11 @@ def count_csv_rows(csv_path): return sum(1 for _ in f) - 1 -def ingest_conv(row_data, context, version, conv_idx, frame): +def ingest_conv(row_data, context, version, conv_idx, frame, 
success_records, f): + if str(conv_idx) in success_records: + print(f"โœ… Conversation {conv_idx} already ingested, skipping...") + return conv_idx + end_index_in_shared_context = row_data["end_index_in_shared_context"] context = context[: int(end_index_in_shared_context)] user_id = f"pm_exper_user_{conv_idx}_{version}" @@ -150,8 +149,6 @@ def ingest_conv(row_data, context, version, conv_idx, frame): print("๐Ÿ”Œ Using Mem0 client for ingestion...") client.client.delete_all(user_id=user_id) print(f"๐Ÿ—‘๏ธ Deleted existing memories for user {user_id}...") - - print(f"๐Ÿ—‘๏ธ Deleted existing memories for user {user_id}...") elif frame == "memos-api": from utils.client import MemosApiClient @@ -160,8 +157,6 @@ def ingest_conv(row_data, context, version, conv_idx, frame): from utils.client import MemobaseClient client = MemobaseClient() - print("๐Ÿ”Œ Using Memobase client for ingestion...") - client.delte_user(user_id) elif frame == "supermemory": from utils.client import SupermemoryClient @@ -170,15 +165,33 @@ def ingest_conv(row_data, context, version, conv_idx, frame): from utils.client import MemuClient client = MemuClient() + elif frame == "memos-api-online": + from utils.client import MemosApiOnlineClient + + client = MemosApiOnlineClient() + + try: + ingest_session(session=context, user_id=user_id, session_id=conv_idx, frame=frame, client=client) + print(f"โœ… Ingestion of conversation {conv_idx} completed") + print("=" * 80) + + f.write(f"{conv_idx}\n") + f.flush() + return conv_idx + except Exception as e: + print(f"โŒ Error ingesting conversation {conv_idx}: {e}") + raise - ingest_session( - session=context, user_id=user_id, session_id=conv_idx, frame=frame, client=client - ) - print(f"โœ… Ingestion of conversation {conv_idx} completed") - print("=" * 80) +def main(frame, version, num_workers=2, clear=False): + os.makedirs(f"results/pm/{frame}-{version}/", exist_ok=True) + record_file = f"results/pm/{frame}-{version}/success_records.txt" + + if clear: + if 
os.path.exists(record_file): + os.remove(record_file) + print("๐Ÿงน Cleared progress records") -def main(frame, version, num_workers=2): print("\n" + "=" * 80) print(f"๐Ÿš€ PERSONAMEM INGESTION - {frame.upper()} v{version}".center(80)) print("=" * 80) @@ -190,31 +203,48 @@ def main(frame, version, num_workers=2): print(f"๐Ÿ“š Loaded PersonaMem dataset from {question_csv_path} and {context_jsonl_path}") print("-" * 80) - start_time = datetime.now() + success_records = set() + if os.path.exists(record_file): + with open(record_file, "r") as f: + success_records = set(line.strip() for line in f) + print(f"๐Ÿ“Š Found {len(success_records)} completed conversations, {total_rows - len(success_records)} remaining") + start_time = datetime.now() all_data = list(load_rows_with_context(question_csv_path, context_jsonl_path)) - with ThreadPoolExecutor(max_workers=num_workers) as executor: - future_to_idx = { - executor.submit( + pending_data = [(idx, row_data, context) for idx, (row_data, context) in enumerate(all_data) + if str(idx) not in success_records] + + if not pending_data: + print("โœ… All conversations have been processed!") + return + + print(f"๐Ÿ”„ Processing {len(pending_data)} conversations...") + + with ThreadPoolExecutor(max_workers=num_workers) as executor, open(record_file, "a") as f: + futures = [] + for idx, row_data, context in pending_data: + future = executor.submit( ingest_conv, row_data=row_data, context=context, version=version, conv_idx=idx, frame=frame, - ): idx - for idx, (row_data, context) in enumerate(all_data) - } + success_records=success_records, + f=f + ) + futures.append(future) + completed_count = 0 for future in tqdm( - as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations" + as_completed(futures), total=len(futures), desc="Processing conversations" ): - idx = future_to_idx[future] try: - future.result() + result = future.result() + completed_count += 1 except Exception as exc: - print(f"\nโŒ Conversation 
{idx} generated an exception: {exc}") + print(f"\nโŒ Conversation generated an exception: {exc}") end_time = datetime.now() elapsed_time = end_time - start_time @@ -225,23 +255,19 @@ def main(frame, version, num_workers=2): print("=" * 80) print(f"โฑ๏ธ Total time taken to ingest {total_rows} rows: {elapsed_time_str}") print(f"๐Ÿ”„ Framework: {frame} | Version: {version} | Workers: {num_workers}") + print(f"๐Ÿ“ˆ Processed: {len(success_records) + completed_count}/{total_rows} conversations") print("=" * 80 + "\n") if __name__ == "__main__": parser = argparse.ArgumentParser(description="PersonaMem Ingestion Script") - parser.add_argument( - "--lib", - type=str, - choices=["mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory", "zep"], - default="memos-api", - ) - parser.add_argument( - "--version", type=str, default="0925-1", help="Version of the evaluation framework." - ) - parser.add_argument( - "--workers", type=int, default=3, help="Number of parallel workers for processing users." 
- ) + parser.add_argument("--lib", type=str, + choices=["memos-api-online", "mem0", "mem0_graph", "memos-api", "memobase", "memu", + "supermemory", "zep"], + default='memos-api') + parser.add_argument("--version", type=str, default="default", help="Version of the evaluation framework.") + parser.add_argument("--workers", type=int, default=3, help="Number of parallel workers for processing users.") + parser.add_argument("--clear", action="store_true", help="Clear progress and start fresh") args = parser.parse_args() - main(frame=args.lib, version=args.version, num_workers=args.workers) + main(frame=args.lib, version=args.version, num_workers=args.workers, clear=args.clear) \ No newline at end of file diff --git a/evaluation/scripts/personamem/pm_search.py b/evaluation/scripts/personamem/pm_search.py index 441474c7c..574206fde 100644 --- a/evaluation/scripts/personamem/pm_search.py +++ b/evaluation/scripts/personamem/pm_search.py @@ -3,12 +3,10 @@ import json import os import sys - from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from time import time - from tqdm import tqdm @@ -83,8 +81,8 @@ def memos_search(client, user_id, query, top_k): start = time() results = client.search(query=query, user_id=user_id, top_k=top_k) search_memories = ( - "\n".join(item["memory"] for cube in results["text_mem"] for item in cube["memories"]) - + f"\n{results['pref_string']}" + "\n".join(item["memory"] for cube in results["text_mem"] for item in cube["memories"]) + + f"\n{results['pref_string']}" ) context = MEMOS_CONTEXT_TEMPLATE.format(user_id=user_id, memories=search_memories) @@ -226,6 +224,17 @@ def process_user(row_data, conv_idx, frame, version, top_k=20): client = MemuClient() print("๐Ÿ”Œ Using memu client for search...") context, duration_ms = memu_search(client, question, user_id, top_k) + elif frame == "memobase": + from utils.client import MemobaseClient + + client = MemobaseClient() + 
print("๐Ÿ”Œ Using Memobase client for search...") + context, duration_ms = memobase_search(client, question, user_id, top_k) + elif frame == "memos-api-online": + from utils.client import MemosApiOnlineClient + client = MemosApiOnlineClient() + print("๐Ÿ”Œ Using memos-api-online client for search...") + context, duration_ms = memos_search(client, question, user_id, top_k) search_results[user_id].append( { @@ -244,7 +253,7 @@ def process_user(row_data, conv_idx, frame, version, top_k=20): os.makedirs(f"results/pm/{frame}-{version}/tmp", exist_ok=True) with open( - f"results/pm/{frame}-{version}/tmp/{frame}_pm_search_results_{conv_idx}.json", "w" + f"results/pm/{frame}-{version}/tmp/{frame}_pm_search_results_{conv_idx}.json", "w" ) as f: json.dump(search_results, f, indent=4) print(f"๐Ÿ’พ Search results for conversation {conv_idx} saved...") @@ -269,7 +278,7 @@ def main(frame, version, top_k=20, num_workers=2): print(f"๐Ÿ” PERSONAMEM SEARCH - {frame.upper()} v{version}".center(80)) print("=" * 80) - question_csv_path = "data/personamem/questions_32k.csv" + question_csv_path = "data/personamem/questions_32k copy.csv" context_jsonl_path = "data/personamem/shared_contexts_32k.jsonl" total_rows = count_csv_rows(question_csv_path) @@ -295,7 +304,7 @@ def main(frame, version, top_k=20, num_workers=2): } for future in tqdm( - as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations" + as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations" ): idx = future_to_idx[future] try: @@ -324,21 +333,13 @@ def main(frame, version, top_k=20, num_workers=2): if __name__ == "__main__": parser = argparse.ArgumentParser(description="PersonaMem Search Script") - parser.add_argument( - "--lib", - type=str, - choices=["mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory"], - default="memos-api", - ) - parser.add_argument( - "--version", type=str, default="default", help="Version of the evaluation framework." 
- ) - parser.add_argument( - "--top_k", type=int, default=20, help="Number of top results to retrieve from the search." - ) - parser.add_argument( - "--workers", type=int, default=3, help="Number of parallel workers for processing users." - ) + parser.add_argument("--lib", type=str, + choices=["memos-api-online", "mem0", "mem0_graph", "memos-api", "memobase", "memu", + "supermemory"], + default='memos-api') + parser.add_argument("--version", type=str, default="0925", help="Version of the evaluation framework.") + parser.add_argument("--top_k", type=int, default=20, help="Number of top results to retrieve from the search.") + parser.add_argument("--workers", type=int, default=3, help="Number of parallel workers for processing users.") args = parser.parse_args() From 5af351576097ec2190558f59205cbf1895f52932 Mon Sep 17 00:00:00 2001 From: Hao <120852460@qq.com> Date: Thu, 30 Oct 2025 21:06:54 +0800 Subject: [PATCH 2/4] =?UTF-8?q?feat(locomo):=20=E6=94=AF=E6=8C=81=E6=96=AD?= =?UTF-8?q?=E7=82=B9=E7=BB=AD=E4=BC=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- evaluation/scripts/locomo/locomo_ingestion.py | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/evaluation/scripts/locomo/locomo_ingestion.py b/evaluation/scripts/locomo/locomo_ingestion.py index 518d90c4c..b47facda6 100644 --- a/evaluation/scripts/locomo/locomo_ingestion.py +++ b/evaluation/scripts/locomo/locomo_ingestion.py @@ -88,7 +88,7 @@ def ingest_session(client, session, frame, version, metadata): return elapsed_time -def process_user(conv_idx, frame, locomo_df, version): +def process_user(conv_idx, frame, locomo_df, version, success_records, f): conversation = locomo_df["conversation"].iloc[conv_idx] max_session_count = 35 start_time = time.time() @@ -149,11 +149,15 @@ def process_user(conv_idx, frame, locomo_df, version): print(f"Processing {valid_sessions} sessions for user {conv_idx}") - for session, metadata in 
sessions_to_process: - session_time = ingest_session(client, session, frame, version, metadata) - total_session_time += session_time - print(f"User {conv_idx}, {metadata['session_key']} processed in {session_time} seconds") - + for session_idx, (session, metadata) in enumerate(sessions_to_process): + if f"{conv_idx}_{session_idx}" not in success_records: + session_time = ingest_session(client, session, frame, version, metadata) + total_session_time += session_time + print(f"User {conv_idx}, {metadata['session_key']} processed in {session_time} seconds") + f.write(f"{conv_idx}_{session_idx}\n") + f.flush() + else: + print(f"Session {conv_idx}_{session_idx} already ingested") end_time = time.time() elapsed_time = round(end_time - start_time, 2) print(f"User {conv_idx} processed successfully in {elapsed_time} seconds") @@ -170,9 +174,17 @@ def main(frame, version="default", num_workers=4): print( f"Starting processing for {num_users} users in serial mode, each user using {num_workers} workers for sessions..." 
) - with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: + os.makedirs(f"results/locomo/{frame}-{version}/", exist_ok=True) + success_records = [] + record_file = f"results/locomo/{frame}-{version}/success_records.txt" + if os.path.exists(record_file): + with open(record_file) as f: + for i in f.readlines(): + success_records.append(i.strip()) + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor, open(record_file, "a+") as f: futures = [ - executor.submit(process_user, user_id, frame, locomo_df, version) + executor.submit(process_user, user_id, frame, locomo_df, version, success_records, f) for user_id in range(num_users) ] for future in concurrent.futures.as_completed(futures): @@ -216,7 +228,7 @@ def main(frame, version="default", num_workers=4): help="Version identifier for saving results (e.g., 1010)", ) parser.add_argument( - "--workers", type=int, default=3, help="Number of parallel workers to process users" + "--workers", type=int, default=10, help="Number of parallel workers to process users" ) args = parser.parse_args() lib = args.lib From f1a45a55c490867e9e7058344c561f3af3f78f82 Mon Sep 17 00:00:00 2001 From: Hao <120852460@qq.com> Date: Thu, 30 Oct 2025 21:14:08 +0800 Subject: [PATCH 3/4] format code --- evaluation/scripts/locomo/locomo_ingestion.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/evaluation/scripts/locomo/locomo_ingestion.py b/evaluation/scripts/locomo/locomo_ingestion.py index b47facda6..a9e4d5f02 100644 --- a/evaluation/scripts/locomo/locomo_ingestion.py +++ b/evaluation/scripts/locomo/locomo_ingestion.py @@ -182,7 +182,10 @@ def main(frame, version="default", num_workers=4): for i in f.readlines(): success_records.append(i.strip()) - with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor, open(record_file, "a+") as f: + with ( + concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor, + open(record_file, "a+") 
as f, + ): futures = [ executor.submit(process_user, user_id, frame, locomo_df, version, success_records, f) for user_id in range(num_users) From c1f1ca14bda2dd55bbe4a41989b7df785af64c8f Mon Sep 17 00:00:00 2001 From: Hao <120852460@qq.com> Date: Tue, 4 Nov 2025 14:09:49 +0800 Subject: [PATCH 4/4] doc: Update readme eval result --- README.md | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 6873ba2b1..50621b584 100644 --- a/README.md +++ b/README.md @@ -54,22 +54,20 @@ ## ๐Ÿ“ˆ Performance Benchmark -MemOS demonstrates significant improvements over baseline memory solutions in multiple reasoning tasks. - -| Model | Avg. Score | Multi-Hop | Open Domain | Single-Hop | Temporal Reasoning | -|-------------|------------|-----------|-------------|------------|---------------------| -| **OpenAI** | 0.5275 | 0.6028 | 0.3299 | 0.6183 | 0.2825 | -| **MemOS** | **0.7331** | **0.6430** | **0.5521** | **0.7844** | **0.7321** | -| **Improvement** | **+38.98%** | **+6.67%** | **+67.35%** | **+26.86%** | **+159.15%** | - -> ๐Ÿ’ก **Temporal reasoning accuracy improved by 159% compared to the OpenAI baseline.** - -### Details of End-to-End Evaluation on LOCOMO - -> [!NOTE] -> Comparison of LLM Judge Scores across five major tasks in the LOCOMO benchmark. Each bar shows the mean evaluation score judged by LLMs for a given method-task pair, with standard deviation as error bars. MemOS-0630 consistently outperforms baseline methods (LangMem, Zep, OpenAI, Mem0) across all task types, especially in multi-hop and temporal reasoning scenarios. - -END2END SCORE +MemOS demonstrates significant improvements over baseline memory solutions in multiple memory tasks, +showcasing its capabilities in **information extraction**, **temporal and cross-session reasoning**, and **personalized preference responses**. 
+ +| Model | LOCOMO | LongMemEval | PrefEval-10 | PersonaMem | +|-----------------|-------------|-------------|-------------|-------------| +| **GPT-4o-mini** | 52.75 | 55.4 | 2.8 | 43.46 | +| **MemOS** | **75.80** | **77.80** | **71.90** | **61.17** | +| **Improvement** | **+43.70%** | **+40.43%** | **+2468%** | **+40.75%** | + +### Detailed Evaluation Results +- We use gpt-4o-mini as the processing and judging LLM and bge-m3 as the embedding model in MemOS evaluation. +- The evaluation was conducted under conditions that align various settings as closely as possible. Reproduce the results with our scripts at [`evaluation`](./evaluation). +- Check the full search and response details on Hugging Face: https://huggingface.co/datasets/MemTensor/MemOS_eval_result. +> ๐Ÿ’ก **MemOS outperforms all other methods (Mem0, Zep, Memobase, SuperMemory, etc.) across all benchmarks!** ## โœจ Key Features