Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added examples/api/__init__.py
Empty file.
144 changes: 144 additions & 0 deletions examples/api/product_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""
Simulate full MemOS Product API workflow:
1. Register user
2. Add memory
3. Search memory
4. Chat (stream)
"""

import json

import requests


# Root of the MemOS Product API used by every request in this script.
# The loopback address is used instead of 0.0.0.0: the latter is a wildcard
# *bind* address and is not a valid connection destination on every platform
# (e.g. connecting to it fails on Windows).
BASE_URL = "http://127.0.0.1:8001/product"
# All request bodies in this script are sent as JSON.
HEADERS = {"Content-Type": "application/json"}

# Suffix shared by all identifiers below; edit it to run the workflow with a
# different set of user / cube / session ids.
index = "24"
USER_ID = f"memos_user_id_{index}"
USER_NAME = f"memos_user_alice_{index}"
MEM_CUBE_ID = f"memos_cube_id_{index}"
SESSION_ID = f"memos_session_id_{index}"
# Second session id for the same user, used by the final chat step.
SESSION_ID2 = f"memos_session_id_{index}_s2"


def register_user():
    """Register the demo user through ``POST {BASE_URL}/users/register``.

    Prints the HTTP status code and raw response text, then returns the
    response body parsed as JSON.
    """
    body = json.dumps(
        {
            "user_id": USER_ID,
            "user_name": USER_NAME,
            "interests": "memory,retrieval,test",
            "mem_cube_id": MEM_CUBE_ID,
        }
    )
    endpoint = f"{BASE_URL}/users/register"

    print(f"[*] Registering user {USER_ID} ...")
    response = requests.post(endpoint, headers=HEADERS, data=body, timeout=30)
    print(response.status_code, response.text)
    return response.json()


def add_memory():
    """Store one sample memory for the demo user through ``POST {BASE_URL}/add``.

    Prints the HTTP status code and raw response text, then returns the
    response body parsed as JSON.
    """
    sample_message = {"role": "user", "content": "我今天在做系统测试"}
    body = json.dumps(
        {
            "user_id": USER_ID,
            "memory_content": "今天我在测试 MemOS 的记忆添加与检索流程。",
            "messages": [sample_message],
            "doc_path": None,
            "mem_cube_id": MEM_CUBE_ID,
            "source": "test_script",
            "user_profile": False,
            "session_id": SESSION_ID,
        }
    )
    endpoint = f"{BASE_URL}/add"

    print("[*] Adding memory ...")
    response = requests.post(endpoint, headers=HEADERS, data=body, timeout=30)
    print(response.status_code, response.text)
    return response.json()


def search_memory(query="系统测试"):
    """Search the demo user's memories through ``POST {BASE_URL}/search``.

    Args:
        query: Text to search for; the request asks for the top 5 results.

    Prints the HTTP status code and raw response text, then returns the
    response body parsed as JSON.
    """
    body = json.dumps(
        {
            "user_id": USER_ID,
            "query": query,
            "mem_cube_id": MEM_CUBE_ID,
            "top_k": 5,
            "session_id": SESSION_ID,
        }
    )
    endpoint = f"{BASE_URL}/search"

    print("[*] Searching memory ...")
    response = requests.post(endpoint, headers=HEADERS, data=body, timeout=30)
    print(response.status_code, response.text)
    return response.json()


def _print_raw_chunk(payload: str) -> None:
    """Print a stream chunk that could not be handled as a JSON message object.

    First tries to undo a latin-1/UTF-8 mix-up in the text; if the text cannot
    be re-encoded/decoded that way, it is printed unchanged.
    """
    try:
        print(payload.encode("latin-1").decode("utf-8"), end="")
    except UnicodeError:
        print(payload)


def chat_stream(query: str, session_id: str, history: list | None = None):
    """Send a chat request to ``POST {BASE_URL}/chat`` and print the streamed reply.

    Each non-empty line of the response is stripped of an optional ``"data: "``
    prefix. The literal ``[DONE]`` ends the stream; every other line is parsed
    as JSON and printed according to its ``type`` field.

    Args:
        query: The user message to send.
        session_id: Session identifier forwarded in the request body.
        history: Prior messages forwarded as ``history``; ``None`` is sent as
            JSON ``null``.
    """
    url = f"{BASE_URL}/chat"
    data = {
        "user_id": USER_ID,
        "query": query,
        "mem_cube_id": MEM_CUBE_ID,
        "history": history,
        "internet_search": False,
        "moscube": False,
        "session_id": session_id,
    }

    print("[*] Starting streaming chat ...")

    # Without a timeout a stalled server would block this script forever.
    # (connect, read): the read value bounds the wait for the *next* chunk,
    # not the whole stream, so it is kept generous for slow generations.
    with requests.post(
        url,
        headers=HEADERS,
        data=json.dumps(data),
        stream=True,
        timeout=(30, 300),
    ) as resp:
        for raw_line in resp.iter_lines():
            if not raw_line:
                continue
            line = raw_line.decode("utf-8", errors="ignore")

            payload = line.removeprefix("data: ").strip()
            if payload == "[DONE]":
                print("[done]")
                break

            # Only the JSON parse is guarded, so unrelated errors are not
            # silently swallowed.
            try:
                msg = json.loads(payload)
            except ValueError:
                msg = None

            # Non-JSON lines and JSON values that are not objects have no
            # "type"/"data" fields; show them as raw text instead.
            if not isinstance(msg, dict):
                _print_raw_chunk(payload)
                continue

            msg_type = msg.get("type")
            msg_data = msg.get("data") or msg.get("content")

            if msg_type == "text":
                print(msg_data, end="", flush=True)
            elif msg_type == "reference":
                print(f"\n[参考记忆] {msg_data}")
            elif msg_type == "status":
                # Status messages are intentionally not shown.
                pass
            elif msg_type == "suggestion":
                print(f"\n[建议] {msg_data}")
            elif msg_type == "end":
                print("\n[✅ Chat End]")
            else:
                print(f"\n[{msg_type}] {msg_data}")


if __name__ == "__main__":
    # Run the whole workflow in order against the server at BASE_URL.
    print("===== STEP 1: Register User =====")
    register_user()

    print("\n===== STEP 2: Add Memory =====")
    add_memory()

    print("\n===== STEP 3: Search Memory =====")
    search_memory()

    print("\n===== STEP 4: Stream Chat =====")
    # First turn with an explicitly empty history, then a follow-up in the
    # same session that passes the previous turn as history.
    chat_stream("我很开心,我今天吃了好吃的拉面", SESSION_ID, history=[])
    chat_stream(
        "我刚和你说什么",
        SESSION_ID,
        history=[
            {"role": "user", "content": "我很开心,我今天吃了好吃的拉面"},
            {"role": "assistant", "content": "🉑"},
        ],
    )

    # Distinct step: the same user asks from a second session with no history.
    print("\n===== STEP 5: Stream Chat (new session) =====")
    chat_stream("我刚和你说什么了呢", SESSION_ID2, history=[])
58 changes: 48 additions & 10 deletions src/memos/mem_os/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,34 @@ def _extract_references_from_response(self, response: str) -> tuple[str, list[di
logger.error(f"Error extracting references from response: {e}", exc_info=True)
return response, []

def _extract_struct_data_from_history(self, chat_data: list[dict]) -> dict:
"""
get struct message from chat-history
# TODO: @xcy make this more general
"""
system_content = ""
memory_content = ""
chat_history = []

for item in chat_data:
role = item.get("role")
content = item.get("content", "")
if role == "system":
parts = content.split("# Memories", 1)
system_content = parts[0].strip()
if len(parts) > 1:
memory_content = "# Memories" + parts[1].strip()
elif role in ("user", "assistant"):
chat_history.append({"role": role, "content": content})

if chat_history and chat_history[-1]["role"] == "assistant":
if len(chat_history) >= 2 and chat_history[-2]["role"] == "user":
chat_history = chat_history[:-2]
else:
chat_history = chat_history[:-1]

return {"system": system_content, "memory": memory_content, "chat_history": chat_history}

def _chunk_response_with_tiktoken(
self, response: str, chunk_size: int = 5
) -> Generator[str, None, None]:
Expand Down Expand Up @@ -640,23 +668,26 @@ async def _post_chat_processing(
clean_response, extracted_references = self._extract_references_from_response(
full_response
)
struct_message = self._extract_struct_data_from_history(current_messages)
logger.info(f"Extracted {len(extracted_references)} references from response")

# Send chat report notifications asynchronously
if self.online_bot:
logger.info("Online Bot Open!")
try:
from memos.memos_tools.notification_utils import (
send_online_bot_notification_async,
)

# Prepare notification data
chat_data = {
"query": query,
"user_id": user_id,
"cube_id": cube_id,
"system_prompt": system_prompt,
"full_response": full_response,
}
chat_data = {"query": query, "user_id": user_id, "cube_id": cube_id}
chat_data.update(
{
"memory": struct_message["memory"],
"chat_history": struct_message["chat_history"],
"full_response": full_response,
}
)

system_data = {
"references": extracted_references,
Expand Down Expand Up @@ -720,6 +751,7 @@ def _start_post_chat_processing(
"""
Asynchronous processing of logs, notifications and memory additions, handle synchronous and asynchronous environments
"""
logger.info("Start post_chat_processing...")

def run_async_in_thread():
"""Running asynchronous tasks in a new thread"""
Expand Down Expand Up @@ -1046,14 +1078,20 @@ def chat(
memories_list = new_memories_list

system_prompt = super()._build_system_prompt(memories_list, base_prompt)
history_info = []
if history:
if history is not None:
# Use the provided history (even if it's empty)
history_info = history[-20:]
else:
# Fall back to internal chat_history
if user_id not in self.chat_history_manager:
self._register_chat_history(user_id, session_id)
history_info = self.chat_history_manager[user_id].chat_history[-20:]
current_messages = [
{"role": "system", "content": system_prompt},
*history_info,
{"role": "user", "content": query},
]
logger.info("Start to get final answer...")
response = self.chat_llm.generate(current_messages)
time_end = time.time()
self._start_post_chat_processing(
Expand Down Expand Up @@ -1129,7 +1167,7 @@ def chat_with_references(
self._register_chat_history(user_id, session_id)

chat_history = self.chat_history_manager[user_id]
if history:
if history is not None:
chat_history.chat_history = history[-20:]
current_messages = [
{"role": "system", "content": system_prompt},
Expand Down
Loading
Loading