diff --git a/evaluation/README.md b/evaluation/README.md index 47cfeedc0..ba8c7a0cc 100644 --- a/evaluation/README.md +++ b/evaluation/README.md @@ -1,6 +1,6 @@ # Evaluation Memory Framework -This repository provides tools and scripts for evaluating the LoCoMo dataset using various models and APIs. +This repository provides tools and scripts for evaluating the `LoCoMo`, `LongMemEval`, `PrefEval`, `personaMem` datasets using various models and APIs. ## Installation @@ -68,7 +68,8 @@ First prepare the dataset `longmemeval_s` from https://huggingface.co/datasets/x ``` ### PrefEval Evaluation -To evaluate the **Prefeval** dataset using one of the supported memory frameworks โ€” run the following [script](./scripts/run_prefeval_eval.sh): +Download benchmark_dataset/filtered_inter_turns.json from https://github.com/amazon-science/PrefEval/blob/main/benchmark_dataset/filtered_inter_turns.json and save it as `./data/prefeval/filtered_inter_turns.json`. +To evaluate the **PrefEval** dataset โ€” run the following [script](./scripts/run_prefeval_eval.sh): ```bash # Edit the configuration in ./scripts/run_prefeval_eval.sh @@ -83,4 +84,4 @@ get `questions_32k.csv` and `shared_contexts_32k.jsonl` from https://huggingface # Specify the model and memory backend you want to use (e.g., mem0, zep, etc.) 
# If you want to use MIRIX, edit the the configuration in ./scripts/personamem/config.yaml ./scripts/run_pm_eval.sh -``` +``` \ No newline at end of file diff --git a/evaluation/scripts/PrefEval/pref_eval.py b/evaluation/scripts/PrefEval/pref_eval.py index f1966b847..ec079614d 100644 --- a/evaluation/scripts/PrefEval/pref_eval.py +++ b/evaluation/scripts/PrefEval/pref_eval.py @@ -392,9 +392,7 @@ async def main(concurrency_limit: int, input_file: str, output_file: str, output if __name__ == "__main__": parser = argparse.ArgumentParser(description="Evaluate assistant responses from a JSONL file.") - parser.add_argument( - "--input", type=str, required=True, help="Path to the input JSONL file from pref_memos.py." - ) + parser.add_argument("--input", type=str, required=True, help="Path to the input JSONL file.") parser.add_argument( "--concurrency-limit", @@ -402,13 +400,31 @@ async def main(concurrency_limit: int, input_file: str, output_file: str, output default=10, help="The maximum number of concurrent API calls.", ) + + parser.add_argument( + "--lib", + type=str, + choices=[ + "memos-api-online", + "mem0", + "mem0_graph", + "memos-api", + "memobase", + "memu", + "supermemory", + "zep", + ], + default="memos-api", + help="Which library to use (used in 'add' mode).", + ) + args = parser.parse_args() input_path = args.input output_dir = os.path.dirname(input_path) - output_jsonl_path = os.path.join(output_dir, "eval_pref_memos.jsonl") - output_excel_path = os.path.join(output_dir, "eval_pref_memos_summary.xlsx") + output_jsonl_path = os.path.join(output_dir, f"eval_pref_{args.lib}.jsonl") + output_excel_path = os.path.join(output_dir, f"eval_pref_{args.lib}_summary.xlsx") asyncio.run( main( diff --git a/evaluation/scripts/PrefEval/pref_mem0.py b/evaluation/scripts/PrefEval/pref_mem0.py index 4bbdb0fd8..214068567 100644 --- a/evaluation/scripts/PrefEval/pref_mem0.py +++ b/evaluation/scripts/PrefEval/pref_mem0.py @@ -29,7 +29,13 @@ def add_memory_for_line( - line_data: 
tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. @@ -46,13 +52,22 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - timestamp_add = int(time.time() * 100) - mem_client.add(messages=chunk, user_id=user_id, timestamp=timestamp_add) + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + timestamp_add = int(time.time() * 100) + + if record_id not in success_records: + mem_client.add( + messages=conversation[msg_idx : msg_idx + 2], + user_id=user_id, + timestamp=timestamp_add, + ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -210,6 +225,15 @@ def main(): from utils.client import Mem0Client mem_client = Mem0Client(enable_graph="graph" in args.lib) + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") @@ -218,6 +242,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -227,6 +252,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_memobase.py b/evaluation/scripts/PrefEval/pref_memobase.py index 4f6174d3d..e99b10520 100644 --- a/evaluation/scripts/PrefEval/pref_memobase.py +++ b/evaluation/scripts/PrefEval/pref_memobase.py @@ -12,7 +12,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -28,7 +27,13 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
@@ -36,8 +41,6 @@ def add_memory_for_line( i, line = line_data user_id = f"{lib}_user_pref_eval_{i}_{version}" mem_client.delete_user(user_id) - user_id = mem_client.client.add_user({"user_id": user_id}) - print("user_id:", user_id) try: original_data = json.loads(line) conversation = original_data.get("conversation", []) @@ -63,7 +66,14 @@ def add_memory_for_line( "created_at": timestamp_add, } ) - mem_client.add(messages=messages, user_id=user_id) + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: + mem_client.add(messages=conversation[msg_idx : msg_idx + 2], user_id=user_id) + f.write(f"{record_id}\n") + f.flush() end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -222,6 +232,16 @@ def main(): mem_client = MemobaseClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") + if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") print(f"Adding {args.add_turn} irrelevant turns.") @@ -229,6 +249,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -238,6 +259,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_memos.py b/evaluation/scripts/PrefEval/pref_memos.py index fc358dc36..0ee88e868 100644 --- a/evaluation/scripts/PrefEval/pref_memos.py +++ b/evaluation/scripts/PrefEval/pref_memos.py @@ -12,7 +12,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -21,7 +20,6 @@ sys.path.insert(0, ROOT_DIR) sys.path.insert(0, EVAL_SCRIPTS_DIR) - load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") BASE_URL = os.getenv("OPENAI_BASE_URL") @@ -30,8 +28,8 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str -) -> dict: + line_data, mem_client, num_irrelevant_turns, lib, version, success_records, f +): """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
""" @@ -47,15 +45,22 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - mem_client.add(messages=chunk, user_id=user_id, conv_id=None, batch_size=2) - else: - mem_client.add(messages=conversation, user_id=user_id, conv_id=None, batch_size=2) + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: + mem_client.add( + messages=conversation[msg_idx : msg_idx + 2], + user_id=user_id, + conv_id=None, + batch_size=2, + ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -68,7 +73,7 @@ def add_memory_for_line( return None -def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> dict: +def search_memory_for_line(line_data, mem_client, top_k_value): """ Processes a single line of data, searching memory based on the question. """ @@ -120,7 +125,7 @@ def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> di return None -def generate_response_for_line(line_data: tuple, openai_client: OpenAI, lib: str) -> dict: +def generate_response_for_line(line_data, openai_client, lib): """ Generates a response for a single line of data using pre-fetched memories. 
""" @@ -195,7 +200,7 @@ def main(): parser.add_argument( "--lib", type=str, - choices=["memos-api", "memos-local"], + choices=["memos-api", "memos-api-online"], default="memos-api", help="Which MemOS library to use (used in 'add' mode).", ) @@ -218,9 +223,22 @@ def main(): print(f"Error: Input file '{args.input}' not found") return - from utils.client import MemosApiClient + from utils.client import MemosApiClient, MemosApiOnlineClient + + if args.lib == "memos-api": + mem_client = MemosApiClient() + elif args.lib == "memos-api-online": + mem_client = MemosApiOnlineClient() - mem_client = MemosApiClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") @@ -229,6 +247,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as record_f, ): futures = [ executor.submit( @@ -238,6 +257,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + record_f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_memu.py b/evaluation/scripts/PrefEval/pref_memu.py index 2b9f769a4..4c37db7b7 100644 --- a/evaluation/scripts/PrefEval/pref_memu.py +++ b/evaluation/scripts/PrefEval/pref_memu.py @@ -14,7 +14,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -30,7 +29,13 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
@@ -47,19 +52,21 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - mem_client.add( - messages=chunk, user_id=user_id, iso_date=datetime.now().isoformat() - ) - else: + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: mem_client.add( - messages=conversation, user_id=user_id, iso_date=datetime.now().isoformat() + messages=conversation[msg_idx : msg_idx + 2], + user_id=user_id, + iso_date=datetime.now().isoformat(), ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -219,6 +226,16 @@ def main(): mem_client = MemuClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") + if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") print(f"Adding {args.add_turn} irrelevant turns.") @@ -226,6 +243,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -235,6 +253,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_supermemory.py b/evaluation/scripts/PrefEval/pref_supermemory.py index 88a64038b..68963e2af 100644 --- a/evaluation/scripts/PrefEval/pref_supermemory.py +++ b/evaluation/scripts/PrefEval/pref_supermemory.py @@ -12,7 +12,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -28,7 +27,13 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
@@ -45,15 +50,20 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - mem_client.add(messages=chunk, user_id=user_id) - else: - mem_client.add(messages=conversation, user_id=user_id) + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: + mem_client.add( + messages=conversation[msg_idx : msg_idx + 2], + user_id=user_id, + ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -90,9 +100,7 @@ def search_memory_for_line(line_data: tuple, mem_client, top_k_value: int) -> di start_time_search = time.monotonic() relevant_memories = mem_client.search(query=question, user_id=user_id, top_k=top_k_value) search_memories_duration = time.monotonic() - start_time_search - memories_str = "\n".join( - f"- {entry.get('memory', '')}" for entry in relevant_memories["text_mem"][0]["memories"] - ) + memories_str = relevant_memories memory_tokens_used = len(tokenizer.encode(memories_str)) @@ -250,6 +258,16 @@ def search(self, query, user_id, top_k): mem_client = SupermemoryClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") + if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") print(f"Adding {args.add_turn} irrelevant turns.") @@ -257,6 +275,7 @@ def search(self, query, user_id, top_k): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -266,6 +285,8 @@ def search(self, query, user_id, top_k): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/pref_zep.py b/evaluation/scripts/PrefEval/pref_zep.py index 91aef1492..be98c6ba9 100644 --- a/evaluation/scripts/PrefEval/pref_zep.py +++ b/evaluation/scripts/PrefEval/pref_zep.py @@ -14,7 +14,6 @@ from openai import OpenAI from tqdm import tqdm - ROOT_DIR = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) @@ -30,7 +29,13 @@ def add_memory_for_line( - line_data: tuple, mem_client, num_irrelevant_turns: int, lib: str, version: str + line_data: tuple, + mem_client, + num_irrelevant_turns: int, + lib: str, + version: str, + success_records, + f, ) -> dict: """ Adds conversation memory for a single line of data to MemOS and returns the data with a persistent user_id. 
@@ -47,25 +52,22 @@ def add_memory_for_line( elif num_irrelevant_turns == 300: conversation = conversation + irre_300 - turns_add = 5 start_time_add = time.monotonic() - if conversation: - if os.getenv("PRE_SPLIT_CHUNK", "false").lower() == "true": - for chunk_start in range(0, len(conversation), turns_add * 2): - chunk = conversation[chunk_start : chunk_start + turns_add * 2] - mem_client.add( - messages=chunk, - user_id=user_id, - conv_id=None, - timestamp=datetime.now().isoformat(), - ) - else: + + for idx, _ in enumerate(conversation[::2]): + msg_idx = idx * 2 + record_id = f"{lib}_user_pref_eval_{i}_{version}_{str(msg_idx)}" + + if record_id not in success_records: mem_client.add( - messages=conversation, + messages=conversation[msg_idx : msg_idx + 2], user_id=user_id, conv_id=None, timestamp=datetime.now().isoformat(), ) + f.write(f"{record_id}\n") + f.flush() + end_time_add = time.monotonic() add_duration = end_time_add - start_time_add @@ -225,6 +227,16 @@ def main(): mem_client = ZepClient() + os.makedirs(f"results/prefeval/{args.lib}_{args.version}", exist_ok=True) + success_records = set() + record_file = f"results/prefeval/{args.lib}_{args.version}/success_records.txt" + if os.path.exists(record_file): + print(f"Loading existing success records from {record_file}...") + with open(record_file, encoding="utf-8") as f: + for i in f.readlines(): + success_records.add(i.strip()) + print(f"Loaded {len(success_records)} records.") + if args.mode == "add": print(f"Running in 'add' mode. 
Ingesting memories from '{args.input}'...") print(f"Adding {args.add_turn} irrelevant turns.") @@ -232,6 +244,7 @@ def main(): with ( open(args.output, "w", encoding="utf-8") as outfile, concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor, + open(record_file, "a+", encoding="utf-8") as f, ): futures = [ executor.submit( @@ -241,6 +254,8 @@ def main(): args.add_turn, args.lib, args.version, + success_records, + f, ) for i, line in enumerate(lines) ] diff --git a/evaluation/scripts/PrefEval/prefeval_preprocess.py b/evaluation/scripts/PrefEval/prefeval_preprocess.py index 9ace9dec9..b8ccf3f34 100644 --- a/evaluation/scripts/PrefEval/prefeval_preprocess.py +++ b/evaluation/scripts/PrefEval/prefeval_preprocess.py @@ -94,6 +94,7 @@ def process_jsonl_file(input_filepath, output_filepath): def main(): huggingface_dataset_name = "siyanzhao/prefeval_implicit_persona" output_directory = "./data/prefeval" + os.makedirs(output_directory, exist_ok=True) input_file_path = os.path.join(output_directory, "train.jsonl") processed_file_path = os.path.join(output_directory, "pref_processed.jsonl") diff --git a/evaluation/scripts/personamem/pm_ingestion.py b/evaluation/scripts/personamem/pm_ingestion.py index cab0fbeb5..fdbf43528 100644 --- a/evaluation/scripts/personamem/pm_ingestion.py +++ b/evaluation/scripts/personamem/pm_ingestion.py @@ -10,7 +10,6 @@ from tqdm import tqdm - sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -25,28 +24,22 @@ def ingest_session(session, user_id, session_id, frame, client): f"[{frame}] ๐Ÿ“ Session [{session_id}: [{idx + 1}/{len(session)}] Ingesting message: {msg['role']} - {msg['content'][:50]}..." 
) timestamp_add = int(time.time() * 100) - client.add(messages=messages, user_id=user_id, timestamp=timestamp_add) + client.add(messages=messages, user_id=user_id, timestamp=timestamp_add, batch_size=10) print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(messages)} messages") elif frame == "memos-api": - if os.getenv("PRE_SPLIT_CHUNK") == "true": - for i in range(0, len(session), 10): - messages = session[i : i + 10] - client.add(messages=messages, user_id=user_id, conv_id=session_id, batch_size=2) - print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(messages)} messages") - else: - client.add(messages=session, user_id=user_id, conv_id=session_id, batch_size=2) - print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(session)} messages") + client.add(messages=session, user_id=user_id, conv_id=session_id, batch_size=10) + print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(session)} messages") elif frame == "memobase": for _idx, msg in enumerate(session): if msg["role"] != "system": messages.append( { "role": msg["role"], - "content": msg["content"][:8000], + "content": msg["content"], "created_at": datetime.now().isoformat(), } ) - client.add(messages, user_id) + client.add(messages, user_id, batch_size=10) print(f"[{frame}] โœ… Session [{session_id}]: Ingested {len(messages)} messages") elif frame == "supermemory": for _idx, msg in enumerate(session): @@ -62,6 +55,8 @@ def ingest_session(session, user_id, session_id, frame, client): for _idx, msg in enumerate(session): messages.append({"role": msg["role"], "content": msg["content"]}) client.add(messages, user_id, datetime.now().astimezone().isoformat()) + elif frame == "memos-api-online": + client.add(messages, user_id, session_id, batch_size=10) def build_jsonl_index(jsonl_path): @@ -125,7 +120,11 @@ def count_csv_rows(csv_path): return sum(1 for _ in f) - 1 -def ingest_conv(row_data, context, version, conv_idx, frame): +def ingest_conv(row_data, context, version, conv_idx, frame, 
success_records, f): + if str(conv_idx) in success_records: + print(f"โœ… Conversation {conv_idx} already ingested, skipping...") + return conv_idx + end_index_in_shared_context = row_data["end_index_in_shared_context"] context = context[: int(end_index_in_shared_context)] user_id = f"pm_exper_user_{conv_idx}_{version}" @@ -150,8 +149,6 @@ def ingest_conv(row_data, context, version, conv_idx, frame): print("๐Ÿ”Œ Using Mem0 client for ingestion...") client.client.delete_all(user_id=user_id) print(f"๐Ÿ—‘๏ธ Deleted existing memories for user {user_id}...") - - print(f"๐Ÿ—‘๏ธ Deleted existing memories for user {user_id}...") elif frame == "memos-api": from utils.client import MemosApiClient @@ -160,8 +157,6 @@ def ingest_conv(row_data, context, version, conv_idx, frame): from utils.client import MemobaseClient client = MemobaseClient() - print("๐Ÿ”Œ Using Memobase client for ingestion...") - client.delte_user(user_id) elif frame == "supermemory": from utils.client import SupermemoryClient @@ -170,15 +165,33 @@ def ingest_conv(row_data, context, version, conv_idx, frame): from utils.client import MemuClient client = MemuClient() + elif frame == "memos-api-online": + from utils.client import MemosApiOnlineClient + + client = MemosApiOnlineClient() + + try: + ingest_session(session=context, user_id=user_id, session_id=conv_idx, frame=frame, client=client) + print(f"โœ… Ingestion of conversation {conv_idx} completed") + print("=" * 80) + + f.write(f"{conv_idx}\n") + f.flush() + return conv_idx + except Exception as e: + print(f"โŒ Error ingesting conversation {conv_idx}: {e}") + raise - ingest_session( - session=context, user_id=user_id, session_id=conv_idx, frame=frame, client=client - ) - print(f"โœ… Ingestion of conversation {conv_idx} completed") - print("=" * 80) +def main(frame, version, num_workers=2, clear=False): + os.makedirs(f"results/pm/{frame}-{version}/", exist_ok=True) + record_file = f"results/pm/{frame}-{version}/success_records.txt" + + if clear: + if 
os.path.exists(record_file): + os.remove(record_file) + print("๐Ÿงน Cleared progress records") -def main(frame, version, num_workers=2): print("\n" + "=" * 80) print(f"๐Ÿš€ PERSONAMEM INGESTION - {frame.upper()} v{version}".center(80)) print("=" * 80) @@ -190,31 +203,48 @@ def main(frame, version, num_workers=2): print(f"๐Ÿ“š Loaded PersonaMem dataset from {question_csv_path} and {context_jsonl_path}") print("-" * 80) - start_time = datetime.now() + success_records = set() + if os.path.exists(record_file): + with open(record_file, "r") as f: + success_records = set(line.strip() for line in f) + print(f"๐Ÿ“Š Found {len(success_records)} completed conversations, {total_rows - len(success_records)} remaining") + start_time = datetime.now() all_data = list(load_rows_with_context(question_csv_path, context_jsonl_path)) - with ThreadPoolExecutor(max_workers=num_workers) as executor: - future_to_idx = { - executor.submit( + pending_data = [(idx, row_data, context) for idx, (row_data, context) in enumerate(all_data) + if str(idx) not in success_records] + + if not pending_data: + print("โœ… All conversations have been processed!") + return + + print(f"๐Ÿ”„ Processing {len(pending_data)} conversations...") + + with ThreadPoolExecutor(max_workers=num_workers) as executor, open(record_file, "a") as f: + futures = [] + for idx, row_data, context in pending_data: + future = executor.submit( ingest_conv, row_data=row_data, context=context, version=version, conv_idx=idx, frame=frame, - ): idx - for idx, (row_data, context) in enumerate(all_data) - } + success_records=success_records, + f=f + ) + futures.append(future) + completed_count = 0 for future in tqdm( - as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations" + as_completed(futures), total=len(futures), desc="Processing conversations" ): - idx = future_to_idx[future] try: - future.result() + result = future.result() + completed_count += 1 except Exception as exc: - print(f"\nโŒ Conversation 
{idx} generated an exception: {exc}") + print(f"\nโŒ Conversation generated an exception: {exc}") end_time = datetime.now() elapsed_time = end_time - start_time @@ -225,23 +255,19 @@ def main(frame, version, num_workers=2): print("=" * 80) print(f"โฑ๏ธ Total time taken to ingest {total_rows} rows: {elapsed_time_str}") print(f"๐Ÿ”„ Framework: {frame} | Version: {version} | Workers: {num_workers}") + print(f"๐Ÿ“ˆ Processed: {len(success_records) + completed_count}/{total_rows} conversations") print("=" * 80 + "\n") if __name__ == "__main__": parser = argparse.ArgumentParser(description="PersonaMem Ingestion Script") - parser.add_argument( - "--lib", - type=str, - choices=["mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory", "zep"], - default="memos-api", - ) - parser.add_argument( - "--version", type=str, default="0925-1", help="Version of the evaluation framework." - ) - parser.add_argument( - "--workers", type=int, default=3, help="Number of parallel workers for processing users." 
- ) + parser.add_argument("--lib", type=str, + choices=["memos-api-online", "mem0", "mem0_graph", "memos-api", "memobase", "memu", + "supermemory", "zep"], + default='memos-api') + parser.add_argument("--version", type=str, default="default", help="Version of the evaluation framework.") + parser.add_argument("--workers", type=int, default=3, help="Number of parallel workers for processing users.") + parser.add_argument("--clear", action="store_true", help="Clear progress and start fresh") args = parser.parse_args() - main(frame=args.lib, version=args.version, num_workers=args.workers) + main(frame=args.lib, version=args.version, num_workers=args.workers, clear=args.clear) \ No newline at end of file diff --git a/evaluation/scripts/personamem/pm_metric.py b/evaluation/scripts/personamem/pm_metric.py index e88c538d4..b9d10a576 100644 --- a/evaluation/scripts/personamem/pm_metric.py +++ b/evaluation/scripts/personamem/pm_metric.py @@ -353,12 +353,12 @@ def print_summary(results): parser.add_argument( "--lib", type=str, - choices=["zep", "mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory"], + choices=["zep", "mem0", "mem0_graph", "memos-api", "memos-api-online", "memobase", "memu", "supermemory"], required=True, help="Memory library to evaluate", default="memos-api", ) - parser.add_argument("--version", type=str, default="0925", help="Evaluation framework version") + parser.add_argument("--version", type=str, default="default", help="Evaluation framework version") args = parser.parse_args() lib, version = args.lib, args.version diff --git a/evaluation/scripts/personamem/pm_responses.py b/evaluation/scripts/personamem/pm_responses.py index ff561f8d8..2e41b4140 100644 --- a/evaluation/scripts/personamem/pm_responses.py +++ b/evaluation/scripts/personamem/pm_responses.py @@ -10,7 +10,6 @@ from openai import OpenAI from tqdm import tqdm - sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import re @@ -154,9 +153,9 @@ def 
main(frame, version, num_runs=3, num_workers=4): future_to_user_id[future] = user_id for future in tqdm( - as_completed(future_to_user_id), - total=len(future_to_user_id), - desc="๐Ÿ“ Generating responses", + as_completed(future_to_user_id), + total=len(future_to_user_id), + desc="๐Ÿ“ Generating responses", ): user_id = future_to_user_id[future] try: @@ -185,21 +184,12 @@ def main(frame, version, num_runs=3, num_workers=4): if __name__ == "__main__": parser = argparse.ArgumentParser(description="PersonaMem Response Generation Script") - parser.add_argument( - "--lib", - type=str, - choices=["zep", "mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory"], - default="memos-api", - ) - parser.add_argument( - "--version", type=str, default="0925", help="Version of the evaluation framework." - ) - parser.add_argument( - "--num_runs", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation." - ) - parser.add_argument( - "--workers", type=int, default=3, help="Number of worker threads to use for processing." 
- ) + parser.add_argument("--lib", type=str, + choices=["memos-api-online", "zep", "mem0", "mem0_graph", "memos-api", "memobase", "memu", + "supermemory"], default='memos-api') + parser.add_argument("--version", type=str, default="default", help="Version of the evaluation framework.") + parser.add_argument("--num_runs", type=int, default=3, help="Number of runs for LLM-as-a-Judge evaluation.") + parser.add_argument("--workers", type=int, default=10, help="Number of worker threads to use for processing.") args = parser.parse_args() main(frame=args.lib, version=args.version, num_runs=args.num_runs, num_workers=args.workers) diff --git a/evaluation/scripts/personamem/pm_search.py b/evaluation/scripts/personamem/pm_search.py index 441474c7c..edec6b008 100644 --- a/evaluation/scripts/personamem/pm_search.py +++ b/evaluation/scripts/personamem/pm_search.py @@ -3,12 +3,10 @@ import json import os import sys - from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from time import time - from tqdm import tqdm @@ -83,8 +81,8 @@ def memos_search(client, user_id, query, top_k): start = time() results = client.search(query=query, user_id=user_id, top_k=top_k) search_memories = ( - "\n".join(item["memory"] for cube in results["text_mem"] for item in cube["memories"]) - + f"\n{results['pref_string']}" + "\n".join(item["memory"] for cube in results["text_mem"] for item in cube["memories"]) + + f"\n{results['pref_string']}" ) context = MEMOS_CONTEXT_TEMPLATE.format(user_id=user_id, memories=search_memories) @@ -226,6 +224,17 @@ def process_user(row_data, conv_idx, frame, version, top_k=20): client = MemuClient() print("๐Ÿ”Œ Using memu client for search...") context, duration_ms = memu_search(client, question, user_id, top_k) + elif frame == "memobase": + from utils.client import MemobaseClient + + client = MemobaseClient() + print("๐Ÿ”Œ Using Memobase client for search...") + context, duration_ms = 
memobase_search(client, question, user_id, top_k) + elif frame == "memos-api-online": + from utils.client import MemosApiOnlineClient + client = MemosApiOnlineClient() + print("🔌 Using memos-api-online client for search...") + context, duration_ms = memos_search(client, user_id, question, top_k) search_results[user_id].append( { @@ -244,7 +253,7 @@ def process_user(row_data, conv_idx, frame, version, top_k=20): os.makedirs(f"results/pm/{frame}-{version}/tmp", exist_ok=True) with open( - f"results/pm/{frame}-{version}/tmp/{frame}_pm_search_results_{conv_idx}.json", "w" + f"results/pm/{frame}-{version}/tmp/{frame}_pm_search_results_{conv_idx}.json", "w" ) as f: json.dump(search_results, f, indent=4) print(f"💾 Search results for conversation {conv_idx} saved...") @@ -295,7 +304,7 @@ def main(frame, version, top_k=20, num_workers=2): } for future in tqdm( - as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations" + as_completed(future_to_idx), total=len(future_to_idx), desc="Processing conversations" ): idx = future_to_idx[future] try: @@ -324,21 +333,13 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="PersonaMem Search Script") - parser.add_argument( - "--lib", - type=str, - choices=["mem0", "mem0_graph", "memos-api", "memobase", "memu", "supermemory"], - default="memos-api", - ) - parser.add_argument( - "--version", type=str, default="default", help="Version of the evaluation framework." - ) - parser.add_argument( - "--top_k", type=int, default=20, help="Number of top results to retrieve from the search." - ) - parser.add_argument( - "--workers", type=int, default=3, help="Number of parallel workers for processing users." 
- ) + parser.add_argument("--lib", type=str, + choices=["memos-api-online", "mem0", "mem0_graph", "memos-api", "memobase", "memu", + "supermemory"], + default='memos-api') + parser.add_argument("--version", type=str, default="default", help="Version of the evaluation framework.") + parser.add_argument("--top_k", type=int, default=20, help="Number of top results to retrieve from the search.") + parser.add_argument("--workers", type=int, default=3, help="Number of parallel workers for processing users.") args = parser.parse_args() diff --git a/evaluation/scripts/run_lme_eval.sh b/evaluation/scripts/run_lme_eval.sh index 08e431312..8fa8d6c7e 100755 --- a/evaluation/scripts/run_lme_eval.sh +++ b/evaluation/scripts/run_lme_eval.sh @@ -2,7 +2,7 @@ # Common parameters for all scripts LIB="memos-api" -VERSION="1020" +VERSION="default" WORKERS=10 TOPK=20 diff --git a/evaluation/scripts/run_locomo_eval.sh b/evaluation/scripts/run_locomo_eval.sh index d9c13a1ac..37569956f 100755 --- a/evaluation/scripts/run_locomo_eval.sh +++ b/evaluation/scripts/run_locomo_eval.sh @@ -2,7 +2,7 @@ # Common parameters for all scripts LIB="memos-api" -VERSION="072001" +VERSION="default" WORKERS=10 TOPK=20 diff --git a/evaluation/scripts/run_openai_eval.sh b/evaluation/scripts/run_openai_eval.sh index 27bb712af..e07f113e5 100755 --- a/evaluation/scripts/run_openai_eval.sh +++ b/evaluation/scripts/run_openai_eval.sh @@ -2,7 +2,7 @@ # Common parameters for all scripts LIB="openai" -VERSION="063001" +VERSION="default" WORKERS=10 NUM_RUNS=3 diff --git a/evaluation/scripts/run_pm_eval.sh b/evaluation/scripts/run_pm_eval.sh index a46440bfc..39d9e72ca 100755 --- a/evaluation/scripts/run_pm_eval.sh +++ b/evaluation/scripts/run_pm_eval.sh @@ -2,21 +2,11 @@ # Common parameters for all scripts LIB="memos-api" -VERSION="072202" +VERSION="default" WORKERS=10 TOPK=20 -if [ "$LIB" = "mirix" ]; then - echo "Running pm_mirix.py 100 times..." 
- for i in {1..100}; do - echo "Iteration $i/100" - CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_mirix.py --version $VERSION --workers 1 - if [ $? -ne 0 ]; then - echo "Error running xx.py on iteration $i" - exit 1 - fi - done -elif ["$LIB" = "zep"]; then +if [ "$LIB" = "zep" ]; then CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_ingestion_zep.py --version $VERSION --workers $WORKERS CUDA_VISIBLE_DEVICES=0 python scripts/personamem/pm_search_zep.py --version $VERSION --top_k $TOPK --workers $WORKERS echo "Running pm_responses.py..." diff --git a/evaluation/scripts/run_prefeval_eval.sh b/evaluation/scripts/run_prefeval_eval.sh index a79cefcc2..129382ebf 100755 --- a/evaluation/scripts/run_prefeval_eval.sh +++ b/evaluation/scripts/run_prefeval_eval.sh @@ -6,13 +6,13 @@ # Number of workers for parallel processing. # This variable controls both pref_memos.py (--max-workers) # and pref_eval.py (--concurrency-limit). -WORKERS=10 +WORKERS=20 # Parameters for pref_memos.py -TOP_K=6 -ADD_TURN=0 # Options: 0, 10, or 300 -LIB="memos-api" -VERSION="1022-0" +TOP_K=10 +ADD_TURN=10 # Options: 0, 10, or 300 +LIB="memos-api" # Options: memos-api, memos-api-online, mem0, mem0-graph, memobase, supermemory, memu, zep +VERSION="default" # --- File Paths --- # You may need to adjust these paths based on your project structure. @@ -133,7 +133,8 @@ echo "" echo "Running pref_eval.py..." python scripts/PrefEval/pref_eval.py \ --input $RESPONSE_FILE \ - --concurrency-limit $WORKERS + --concurrency-limit $WORKERS \ + --lib $LIB if [ $? -ne 0 ]; then echo "Error: Evaluation script failed." @@ -142,4 +143,4 @@ fi echo "" echo "--- PrefEval Pipeline completed successfully! 
---" -echo "Final results are in $RESPONSE_FILE" +echo "Final results are in $RESPONSE_FILE" \ No newline at end of file diff --git a/evaluation/scripts/utils/client.py b/evaluation/scripts/utils/client.py index 4e7cfdbca..3c34c49d0 100644 --- a/evaluation/scripts/utils/client.py +++ b/evaluation/scripts/utils/client.py @@ -245,7 +245,7 @@ def search(self, query, user_id, top_k): res = json.loads(response.text)["data"]["memory_detail_list"] for i in res: i.update({"memory": i.pop("memory_value")}) - return {"text_mem": [{"memories": res}], "pref_mem": ""} + return {"text_mem": [{"memories": res}], "pref_str": ""} except Exception as e: if attempt < max_retries - 1: time.sleep(2**attempt)