Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# MemOS: Memory Operating System for AI Agents

MemOS is an open-source **Agent Memory framework** that empowers AI agents with **long-term memory, personality consistency, and contextual recall**. It enables agents to **remember past interactions**, **learn over time**, and **build evolving identities** across sessions.

Designed for **AI companions, role-playing NPCs, and multi-agent systems**, MemOS provides a unified API for **memory representation, retrieval, and update** — making it the foundation for next-generation **memory-augmented AI agents**.
<div align="center">
<a href="https://memos.openmem.net/">
<img src="https://statics.memtensor.com.cn/memos/memos-banner.gif" alt="MemOS Banner">
Expand Down
1 change: 0 additions & 1 deletion evaluation/.env-example
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,3 @@ MEMU_API_KEY="mu_xxx"
SUPERMEMORY_API_KEY="sm_xxx"
MEMOBASE_API_KEY="xxx"
MEMOBASE_PROJECT_URL="http://***.***.***.***:8019"

Empty file added examples/api/__init__.py
Empty file.
144 changes: 144 additions & 0 deletions examples/api/product_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""
Simulate full MemOS Product API workflow:
1. Register user
2. Add memory
3. Search memory
4. Chat (stream)
"""

import json

import requests


BASE_URL = "http://0.0.0.0:8001/product"
HEADERS = {"Content-Type": "application/json"}

index = "24"
USER_ID = f"memos_user_id_{index}"
USER_NAME = f"memos_user_alice_{index}"
MEM_CUBE_ID = f"memos_cube_id_{index}"
SESSION_ID = f"memos_session_id_{index}"
SESSION_ID2 = f"memos_session_id_{index}_s2"


def register_user():
url = f"{BASE_URL}/users/register"
data = {
"user_id": USER_ID,
"user_name": USER_NAME,
"interests": "memory,retrieval,test",
"mem_cube_id": MEM_CUBE_ID,
}
print(f"[*] Registering user {USER_ID} ...")
resp = requests.post(url, headers=HEADERS, data=json.dumps(data), timeout=30)
print(resp.status_code, resp.text)
return resp.json()


def add_memory():
url = f"{BASE_URL}/add"
data = {
"user_id": USER_ID,
"memory_content": "今天我在测试 MemOS 的记忆添加与检索流程。",
"messages": [{"role": "user", "content": "我今天在做系统测试"}],
"doc_path": None,
"mem_cube_id": MEM_CUBE_ID,
"source": "test_script",
"user_profile": False,
"session_id": SESSION_ID,
}
print("[*] Adding memory ...")
resp = requests.post(url, headers=HEADERS, data=json.dumps(data), timeout=30)
print(resp.status_code, resp.text)
return resp.json()


def search_memory(query="系统测试"):
url = f"{BASE_URL}/search"
data = {
"user_id": USER_ID,
"query": query,
"mem_cube_id": MEM_CUBE_ID,
"top_k": 5,
"session_id": SESSION_ID,
}
print("[*] Searching memory ...")
resp = requests.post(url, headers=HEADERS, data=json.dumps(data), timeout=30)
print(resp.status_code, resp.text)
return resp.json()


def chat_stream(query: str, session_id: str, history: list | None = None):
url = f"{BASE_URL}/chat"
data = {
"user_id": USER_ID,
"query": query,
"mem_cube_id": MEM_CUBE_ID,
"history": history,
"internet_search": False,
"moscube": False,
"session_id": session_id,
}

print("[*] Starting streaming chat ...")

with requests.post(url, headers=HEADERS, data=json.dumps(data), stream=True) as resp:
for raw_line in resp.iter_lines():
if not raw_line:
continue
line = raw_line.decode("utf-8", errors="ignore")

payload = line.removeprefix("data: ").strip()
if payload == "[DONE]":
print("[done]")
break

try:
msg = json.loads(payload)
msg_type = msg.get("type")
msg_data = msg.get("data") or msg.get("content")

if msg_type == "text":
print(msg_data, end="", flush=True)
elif msg_type == "reference":
print(f"\n[参考记忆] {msg_data}")
elif msg_type == "status":
pass
elif msg_type == "suggestion":
print(f"\n[建议] {msg_data}")
elif msg_type == "end":
print("\n[✅ Chat End]")
else:
print(f"\n[{msg_type}] {msg_data}")
except Exception:
try:
print(payload.encode("latin-1").decode("utf-8"), end="")
except Exception:
print(payload)


if __name__ == "__main__":
print("===== STEP 1: Register User =====")
register_user()

print("\n===== STEP 2: Add Memory =====")
add_memory()

print("\n===== STEP 3: Search Memory =====")
search_memory()

print("\n===== STEP 4: Stream Chat =====")
chat_stream("我很开心,我今天吃了好吃的拉面", SESSION_ID, history=[])
chat_stream(
"我刚和你说什么",
SESSION_ID,
history=[
{"role": "user", "content": "我很开心,我今天吃了好吃的拉面"},
{"role": "assistant", "content": "🉑"},
],
)

print("\n===== STEP 4: Stream Chat =====")
chat_stream("我刚和你说什么了呢", SESSION_ID2, history=[])
62 changes: 62 additions & 0 deletions examples/mem_scheduler/api_w_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from memos.api.routers.server_router import mem_scheduler
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem


# Debug: Print scheduler configuration
print("=== Scheduler Configuration Debug ===")
print(f"Scheduler type: {type(mem_scheduler).__name__}")
print(f"Config: {mem_scheduler.config}")
print(f"use_redis_queue: {mem_scheduler.use_redis_queue}")
print(f"Queue type: {type(mem_scheduler.memos_message_queue).__name__}")
print(f"Queue maxsize: {getattr(mem_scheduler.memos_message_queue, 'maxsize', 'N/A')}")

# Check if Redis queue is connected
if hasattr(mem_scheduler.memos_message_queue, "_is_connected"):
print(f"Redis connected: {mem_scheduler.memos_message_queue._is_connected}")
if hasattr(mem_scheduler.memos_message_queue, "_redis_conn"):
print(f"Redis connection: {mem_scheduler.memos_message_queue._redis_conn}")
print("=====================================\n")

queue = mem_scheduler.memos_message_queue
queue.clear()


# 1. Define a handler function
def my_test_handler(messages: list[ScheduleMessageItem]):
print(f"My test handler received {len(messages)} messages:")
for msg in messages:
print(f" my_test_handler - {msg.item_id}: {msg.content}")
print(
f"{queue._redis_conn.xinfo_groups(queue.stream_name)} qsize: {queue.qsize()} messages:{messages}"
)


# 2. Register the handler
TEST_HANDLER_LABEL = "test_handler"
mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})

# 3. Create messages
messages_to_send = [
ScheduleMessageItem(
item_id=f"test_item_{i}",
user_id="test_user",
mem_cube_id="test_mem_cube",
label=TEST_HANDLER_LABEL,
content=f"This is test message {i}",
)
for i in range(5)
]

# 5. Submit messages
for mes in messages_to_send:
print(f"Submitting message {mes.item_id} to the scheduler...")
mem_scheduler.submit_messages([mes])

# 6. Wait for messages to be processed (limited to 100 checks)
print("Waiting for messages to be consumed (max 100 checks)...")
mem_scheduler.mem_scheduler_wait()


# 7. Stop the scheduler
print("Stopping the scheduler...")
mem_scheduler.stop()
85 changes: 0 additions & 85 deletions examples/mem_scheduler/memos_w_optimized_scheduler.py

This file was deleted.

87 changes: 0 additions & 87 deletions examples/mem_scheduler/memos_w_optimized_scheduler_for_test.py

This file was deleted.

Loading
Loading