Skip to content

feat: integrate Mem0 with ADK memory service#94

Merged
QueryPlanner merged 7 commits into
mainfrom
feat/adk-memory-service-integration
May 16, 2026
Merged

feat: integrate Mem0 with ADK memory service#94
QueryPlanner merged 7 commits into
mainfrom
feat/adk-memory-service-integration

Conversation

@QueryPlanner
Copy link
Copy Markdown
Owner

What

Integrate Mem0 with ADK's BaseMemoryService interface to enable automatic memory preloading and address Issue #67 (lazy initialization causing 25+ second delays).

Why

  • Issue Initialize Mem0 client at startup and improve error handling #67: First save_memory call took 25+ seconds due to lazy Mem0 initialization (4s Qdrant setup + 21s LLM memory extraction)
  • Memory availability: Relevant memories were not automatically available to the model - it had to explicitly call search_memory
  • Safety: delete_all_memories tool posed accidental data loss risk

How

  • Add Mem0MemoryService class implementing ADK's BaseMemoryService interface
  • Register mem0:// URI scheme via ADK service registry for proper integration
  • Add PreloadMemoryTool to agent tools for automatic memory injection on each LLM turn
  • Remove delete_all_memories tool from exports, registry, and tests
  • Enhance /health endpoint to include memory service status
  • Initialize Mem0 client eagerly during server startup (via factory called during get_fast_api_app())

Tests

  • Unit tests for Mem0MemoryService (search, error handling, noop methods)
  • All 52 memory tests pass
  • All 25 registry tests pass
  • Linting passes (ruff check)
  • Formatting passes (ruff format)
  • Type checking passes (mypy)
  • Pre-commit hooks pass

Related Issues

Fixes #67

- Add Mem0MemoryService implementing ADK BaseMemoryService interface
- Register mem0:// URI scheme for memory service factory
- Add PreloadMemoryTool for automatic memory injection on each turn
- Remove delete_all_memories tool to prevent accidental data loss
- Enhance health endpoint with memory service status
- Initialize Mem0 client at server startup (fixes #67)
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request integrates the Mem0 memory service into the application by introducing Mem0MemoryService, registering it within the service registry, and updating the health check and agent configuration to support it. Additionally, the delete_all_memories tool and its associated tests and prompt instructions have been removed. Feedback was provided regarding the search_memory implementation, specifically concerning blocking I/O within an asynchronous method and the need for more robust parsing of the search results to prevent potential runtime errors.

Comment on lines +68 to +97
try:
result = self._client.search(query=query, user_id=mem0_user_id, limit=limit)
except Exception:
logger.exception("Failed to search memories for user %s", mem0_user_id)
return SearchMemoryResponse(memories=[])

memories: list[MemoryEntry] = []
for m in result.get("results", []):
memory_text = m.get("memory", "")
if not memory_text:
continue

memories.append(
MemoryEntry(
content=types.Content(
role="user",
parts=[types.Part(text=memory_text)],
),
id=m.get("id"),
)
)

logger.debug(
"Found %d memories for query '%s' (user: %s)",
len(memories),
query[:30],
mem0_user_id,
)

return SearchMemoryResponse(memories=memories)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The search_memory implementation has two significant issues:

  1. Blocking I/O in Async Method: self._client.search is a synchronous call that likely performs network or database operations (e.g., querying Qdrant). Calling it directly in an async method blocks the FastAPI event loop, which can severely impact the application's throughput and responsiveness. It should be offloaded to a worker thread using asyncio.to_thread.
  2. Fragile Result Parsing: The current implementation assumes result is a dictionary and has a .get() method. However, depending on the Mem0 version or backend configuration, search might return a list directly or even None on certain failures. Accessing result.get("results", []) outside the try-except block (line 75) will cause an AttributeError if result is not a dictionary. The parsing logic should be more robust and contained within the error-handling block.
        try:
            import asyncio
            # Offload blocking Mem0 search to a thread to avoid blocking the event loop
            result = await asyncio.to_thread(
                self._client.search, query=query, user_id=mem0_user_id, limit=limit
            )

            # Handle various return types (dict with 'results' or direct list) and None gracefully
            raw_results = (result.get("results", []) if isinstance(result, dict) else result) or []

            memories: list[MemoryEntry] = []
            for m in raw_results:
                if not isinstance(m, dict):
                    continue
                memory_text = m.get("memory", "")
                if not memory_text:
                    continue

                memories.append(
                    MemoryEntry(
                        content=types.Content(
                            role="user",
                            parts=[types.Part(text=memory_text)],
                        ),
                        id=m.get("id"),
                    )
                )

            logger.debug(
                "Found %d memories for query '%s' (user: %s)",
                len(memories),
                query[:30],
                mem0_user_id,
            )
            return SearchMemoryResponse(memories=memories)

        except Exception:
            logger.exception("Failed to search memories for user %s", mem0_user_id)
            return SearchMemoryResponse(memories=[])

The user_id should match what explicit memory tools use, not be prefixed
with app_name. This ensures PreloadMemoryTool finds memories saved via
save_memory tool.
Telegram bot was not receiving preloaded memories because AdkRuntime
created Runner without memory_service. PreloadMemoryTool's call to
search_memory() would fail silently, preventing automatic memory
injection.
- Use asyncio.to_thread to avoid blocking event loop in search_memory
- Handle various result types (dict, list, None) from Mem0 search
- Fix reminder tests to pass user_id=None for no-user-id tests
- Add tests for edge cases in Mem0MemoryService
@QueryPlanner QueryPlanner force-pushed the feat/adk-memory-service-integration branch from 899fdbc to 8dafdee Compare May 16, 2026 17:45
@QueryPlanner
Copy link
Copy Markdown
Owner Author

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request integrates the Mem0 memory service into the ADK runtime and server, introducing Mem0MemoryService and registering it via the mem0:// URI scheme. It also removes the delete_all_memories tool and updates existing memory tools to utilize the user_id from the tool context. Feedback highlights a potential ValueError if the memory service is created outside the server context and suggests refining the health check logic to prevent masking configuration issues when the memory service is unavailable.

Comment thread src/blacki/adk_runtime.py
Comment on lines +388 to +390
memory_service = get_service_registry().create_memory_service(
"mem0://", agents_dir=str(Path(env.agent_dir).resolve())
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The create_memory_service call for the mem0:// scheme relies on a registration that currently happens in src/blacki/server.py. If create_adk_runtime is called from a context where server.py has not been imported (e.g., a standalone script or certain test configurations), this will raise a ValueError because the scheme won't be registered in the global service_registry. Consider moving the registration to a more central location or ensuring it happens lazily within this function if not already present.

Comment thread src/blacki/server.py Outdated
Comment on lines +244 to +245
all_ok = all(v in ("healthy", "unavailable") for v in checks.values())
status = "ok" if all_ok else "degraded"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The health check logic treats an unavailable memory service as acceptable for an ok status. While the system falls back to InMemoryMemoryService when the Mem0 client is missing, this might mask configuration issues where persistent memory was expected but failed to initialize. Consider if a missing Mem0 client should result in a degraded status instead, especially if persistence is a requirement for the production environment.

- Move mem0 service registration to adk_runtime.py for test safety
- Update health check to return degraded when configured but unavailable
Use specific dates instead of relative dates like 'yesterday' to avoid
timezone-dependent test failures when CI runs near IST midnight.
@QueryPlanner QueryPlanner merged commit 770c1fc into main May 16, 2026
1 check passed
@QueryPlanner QueryPlanner deleted the feat/adk-memory-service-integration branch May 16, 2026 19:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Initialize Mem0 client at startup and improve error handling

1 participant