feat: integrate Mem0 with ADK memory service#94
Conversation
- Add Mem0MemoryService implementing ADK BaseMemoryService interface - Register mem0:// URI scheme for memory service factory - Add PreloadMemoryTool for automatic memory injection on each turn - Remove delete_all_memories tool to prevent accidental data loss - Enhance health endpoint with memory service status - Initialize Mem0 client at server startup (fixes #67)
There was a problem hiding this comment.
Code Review
This pull request integrates the Mem0 memory service into the application by introducing Mem0MemoryService, registering it within the service registry, and updating the health check and agent configuration to support it. Additionally, the delete_all_memories tool and its associated tests and prompt instructions have been removed. Feedback was provided regarding the search_memory implementation, specifically concerning blocking I/O within an asynchronous method and the need for more robust parsing of the search results to prevent potential runtime errors.
| try: | ||
| result = self._client.search(query=query, user_id=mem0_user_id, limit=limit) | ||
| except Exception: | ||
| logger.exception("Failed to search memories for user %s", mem0_user_id) | ||
| return SearchMemoryResponse(memories=[]) | ||
|
|
||
| memories: list[MemoryEntry] = [] | ||
| for m in result.get("results", []): | ||
| memory_text = m.get("memory", "") | ||
| if not memory_text: | ||
| continue | ||
|
|
||
| memories.append( | ||
| MemoryEntry( | ||
| content=types.Content( | ||
| role="user", | ||
| parts=[types.Part(text=memory_text)], | ||
| ), | ||
| id=m.get("id"), | ||
| ) | ||
| ) | ||
|
|
||
| logger.debug( | ||
| "Found %d memories for query '%s' (user: %s)", | ||
| len(memories), | ||
| query[:30], | ||
| mem0_user_id, | ||
| ) | ||
|
|
||
| return SearchMemoryResponse(memories=memories) |
There was a problem hiding this comment.
The search_memory implementation has two significant issues:
- Blocking I/O in Async Method:
self._client.searchis a synchronous call that likely performs network or database operations (e.g., querying Qdrant). Calling it directly in anasyncmethod blocks the FastAPI event loop, which can severely impact the application's throughput and responsiveness. It should be offloaded to a worker thread usingasyncio.to_thread. - Fragile Result Parsing: The current implementation assumes
resultis a dictionary and has a.get()method. However, depending on the Mem0 version or backend configuration,searchmight return a list directly or evenNoneon certain failures. Accessingresult.get("results", [])outside thetry-exceptblock (line 75) will cause anAttributeErrorifresultis not a dictionary. The parsing logic should be more robust and contained within the error-handling block.
try:
import asyncio
# Offload blocking Mem0 search to a thread to avoid blocking the event loop
result = await asyncio.to_thread(
self._client.search, query=query, user_id=mem0_user_id, limit=limit
)
# Handle various return types (dict with 'results' or direct list) and None gracefully
raw_results = (result.get("results", []) if isinstance(result, dict) else result) or []
memories: list[MemoryEntry] = []
for m in raw_results:
if not isinstance(m, dict):
continue
memory_text = m.get("memory", "")
if not memory_text:
continue
memories.append(
MemoryEntry(
content=types.Content(
role="user",
parts=[types.Part(text=memory_text)],
),
id=m.get("id"),
)
)
logger.debug(
"Found %d memories for query '%s' (user: %s)",
len(memories),
query[:30],
mem0_user_id,
)
return SearchMemoryResponse(memories=memories)
except Exception:
logger.exception("Failed to search memories for user %s", mem0_user_id)
return SearchMemoryResponse(memories=[])The user_id should match what explicit memory tools use, not be prefixed with app_name. This ensures PreloadMemoryTool finds memories saved via save_memory tool.
Telegram bot was not receiving preloaded memories because AdkRuntime created Runner without memory_service. PreloadMemoryTool's call to search_memory() would fail silently, preventing automatic memory injection.
- Use asyncio.to_thread to avoid blocking event loop in search_memory - Handle various result types (dict, list, None) from Mem0 search - Fix reminder tests to pass user_id=None for no-user-id tests - Add tests for edge cases in Mem0MemoryService
899fdbc to
8dafdee
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request integrates the Mem0 memory service into the ADK runtime and server, introducing Mem0MemoryService and registering it via the mem0:// URI scheme. It also removes the delete_all_memories tool and updates existing memory tools to utilize the user_id from the tool context. Feedback highlights a potential ValueError if the memory service is created outside the server context and suggests refining the health check logic to prevent masking configuration issues when the memory service is unavailable.
| memory_service = get_service_registry().create_memory_service( | ||
| "mem0://", agents_dir=str(Path(env.agent_dir).resolve()) | ||
| ) |
There was a problem hiding this comment.
The create_memory_service call for the mem0:// scheme relies on a registration that currently happens in src/blacki/server.py. If create_adk_runtime is called from a context where server.py has not been imported (e.g., a standalone script or certain test configurations), this will raise a ValueError because the scheme won't be registered in the global service_registry. Consider moving the registration to a more central location or ensuring it happens lazily within this function if not already present.
| all_ok = all(v in ("healthy", "unavailable") for v in checks.values()) | ||
| status = "ok" if all_ok else "degraded" |
There was a problem hiding this comment.
The health check logic treats an unavailable memory service as acceptable for an ok status. While the system falls back to InMemoryMemoryService when the Mem0 client is missing, this might mask configuration issues where persistent memory was expected but failed to initialize. Consider if a missing Mem0 client should result in a degraded status instead, especially if persistence is a requirement for the production environment.
- Move mem0 service registration to adk_runtime.py for test safety - Update health check to return degraded when configured but unavailable
Use specific dates instead of relative dates like 'yesterday' to avoid timezone-dependent test failures when CI runs near IST midnight.
What
Integrate Mem0 with ADK's
BaseMemoryServiceinterface to enable automatic memory preloading and address Issue #67 (lazy initialization causing 25+ second delays).Why
save_memorycall took 25+ seconds due to lazy Mem0 initialization (4s Qdrant setup + 21s LLM memory extraction)search_memorydelete_all_memoriestool posed accidental data loss riskHow
Mem0MemoryServiceclass implementing ADK'sBaseMemoryServiceinterfacemem0://URI scheme via ADK service registry for proper integrationPreloadMemoryToolto agent tools for automatic memory injection on each LLM turndelete_all_memoriestool from exports, registry, and tests/healthendpoint to include memory service statusget_fast_api_app())Tests
Mem0MemoryService(search, error handling, noop methods)ruff check)ruff format)mypy)Related Issues
Fixes #67