From 368fcb4ff19209b14ece78a232283f99aeee9772 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Jun 2026 12:17:21 +0000 Subject: [PATCH] docs: document Vercel AI SDK, ADK Python, and Mastra self-managed agent setups The agent Setup tab now generates integration samples for three more frameworks (cloudv2 PR #27322): Vercel AI SDK, ADK Python, and Mastra. Add each as a tab in the Framework samples section of the self-managed agents page and update the supported-frameworks list, following the existing per-framework sample convention (client_credentials grant, gateway-pointed model, MCP over Streamable HTTP, and the X-Redpanda-Genai-Conversation header on every LLM and tool call). Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_016d5q483ntfuzHAgPvygmZr --- .../connect/pages/self-managed-agents.adoc | 404 +++++++++++++++++- 1 file changed, 403 insertions(+), 1 deletion(-) diff --git a/modules/connect/pages/self-managed-agents.adoc b/modules/connect/pages/self-managed-agents.adoc index bbca36f..5ccf4d2 100644 --- a/modules/connect/pages/self-managed-agents.adoc +++ b/modules/connect/pages/self-managed-agents.adoc @@ -45,7 +45,7 @@ For the declarative path, see xref:connect:create-agent.adoc[Create an agent]. * The `dataplane_adp_agent_create` permission, granted by the Writer built-in role. See xref:control:permissions-reference.adoc#agent-management-permissions[Agent management permissions]. * At least one xref:gateway:configure-provider.adoc[LLM provider configured] in ADP. The agent calls the model through this provider. * If the agent calls tools: One or more xref:connect:mcp-overview.adoc[MCP servers] registered in ADP. -* An agent built in your own framework. The *Setup* tab generates ready-to-paste samples for ai-sdk-go, LangChain, CrewAI, ADK Java, and ADK Go. +* An agent built in your own framework. The *Setup* tab generates ready-to-paste samples for ai-sdk-go, LangChain, CrewAI, ADK Java, ADK Go, ADK Python, Vercel AI SDK, and Mastra. == Register the agent @@ -1046,6 +1046,408 @@ func mcpServers() []string { ---- -- +ADK Python:: ++ +-- +[source,python] +---- +import asyncio +import os +import uuid + +import requests +from google.adk.agents import LlmAgent +from google.adk.models.lite_llm import LiteLlm +from google.adk.runners import InMemoryRunner +from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams +from google.adk.tools.mcp_tool.mcp_toolset import McpToolset +from google.genai import types + +CONVERSATION_HEADER = "X-Redpanda-Genai-Conversation" +APP_NAME = "redpanda-self-managed-agent" +USER_ID = "user-123" + + +def env(k: str) -> str: + # Required env var, or a clear failure (not an opaque KeyError). Export from the Setup tab. + v = os.environ.get(k) + if not v: + raise SystemExit(f"missing env var {k} - export it from the Setup tab") + return v + + +def get_access_token() -> str: + # OAuth2 client_credentials grant against the gateway IDP (requests sets the + # form content-type automatically when data= is a dict). + resp = requests.post( + env("REDPANDA_TOKEN_URL"), + data={ + "grant_type": "client_credentials", + "client_id": env("REDPANDA_CLIENT_ID"), + "client_secret": env("REDPANDA_CLIENT_SECRET"), + }, + timeout=30, + ) + resp.raise_for_status() + return resp.json()["access_token"] + + +def model_arg() -> str: + # ADK Python drives the gateway through LiteLlm. REDPANDA_LLM_PROVIDER_TYPE + # selects the route. Anthropic uses the native Messages wire (LiteLlm appends + # /v1/messages to api_base). OpenAI and Google both go through the + # OpenAI-compatible /chat/completions route: LiteLlm's gemini/ provider drops + # the /v1beta path on a custom api_base, so Google is routed through openai/ too. + model = env("REDPANDA_LLM_MODEL") + provider = os.environ.get("REDPANDA_LLM_PROVIDER_TYPE", "openai").lower() + if provider == "anthropic": + return f"anthropic/{model}" + return f"openai/{model}" + + +def build_model(token: str, conversation_id: str) -> LiteLlm: + # api_key is a placeholder: the gateway authenticates on the bearer, not the + # native key. The bearer + conversation id ride extra_headers, which LiteLlm + # forwards verbatim to the provider request (the LLM call). + return LiteLlm( + model=model_arg(), + api_base=env("REDPANDA_LLM_PROVIDER_URL"), + api_key="redpanda-gateway", + extra_headers={ + "Authorization": f"Bearer {token}", + CONVERSATION_HEADER: conversation_id, + }, + ) + + +def build_toolsets(token: str, conversation_id: str) -> list[McpToolset]: + # One McpToolset per server over MCP Streamable HTTP; the same bearer + + # conversation id ride every tool call, matching the LLM call. + base = env("REDPANDA_MCP_BASE_URL") + headers = {"Authorization": f"Bearer {token}", CONVERSATION_HEADER: conversation_id} + names = [n.strip() for n in os.environ.get("REDPANDA_MCP_SERVERS", "").split(",") if n.strip()] + return [ + McpToolset(connection_params=StreamableHTTPConnectionParams(url=f"{base}/{name}", headers=headers)) + for name in names + ] + + +async def main() -> None: + # Fetched once for this short-lived sample; a long-running agent should + # refresh the bearer before its TTL. + token = get_access_token() + + # conversation_id is YOUR conversation id (chat thread, request id, A2A + # contextId) - never a shared constant. ADK uses it as the session id, and it + # is stamped on the LLM call and every MCP tool call. Mint one here. This demo + # runs one conversation; a multi-conversation app rebuilds build_model + + # build_toolsets per conversation (the id is baked into both). + conversation_id = uuid.uuid4().hex + + agent = LlmAgent( + name="assistant", + model=build_model(token, conversation_id), + instruction="You are a helpful agent.", + tools=build_toolsets(token, conversation_id), + ) + runner = InMemoryRunner(agent=agent, app_name=APP_NAME) + # create_session accepts session_id=, so the caller's id IS the session id. + await runner.session_service.create_session( + app_name=APP_NAME, user_id=USER_ID, session_id=conversation_id + ) + + message = types.Content(role="user", parts=[types.Part(text="What tools can you call?")]) + async for event in runner.run_async(user_id=USER_ID, session_id=conversation_id, new_message=message): + if event.is_final_response() and event.content and event.content.parts: + print(event.content.parts[0].text) # the assistant's reply + + +if __name__ == "__main__": + asyncio.run(main()) +---- +-- + +Vercel AI SDK:: ++ +-- +[source,typescript] +---- +import { createAnthropic } from '@ai-sdk/anthropic'; +import { createGoogleGenerativeAI } from '@ai-sdk/google'; +import { createOpenAICompatible } from '@ai-sdk/openai-compatible'; +import { createMCPClient } from '@ai-sdk/mcp'; +import { generateText, stepCountIs, type LanguageModel, type ToolSet } from 'ai'; +import { randomUUID } from 'node:crypto'; + +const CONVERSATION_HEADER = 'X-Redpanda-Genai-Conversation'; + +// Required env var, or a clear failure (not an opaque 401 later). Export from the Setup tab. +function env(key: string): string { + const value = process.env[key]; + if (!value) { + throw new Error(`missing env var ${key} - export it from the Setup tab`); + } + return value; +} + +// OAuth2 client_credentials grant against the gateway IDP -> the bearer for every call. +async function getAccessToken(): Promise { + const res = await fetch(env('REDPANDA_TOKEN_URL'), { + method: 'POST', + headers: { 'content-type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams({ + grant_type: 'client_credentials', + client_id: env('REDPANDA_CLIENT_ID'), + client_secret: env('REDPANDA_CLIENT_SECRET'), + }), + }); + if (!res.ok) { + throw new Error(`token request failed: ${res.status}`); + } + const { access_token } = (await res.json()) as { access_token: string }; + return access_token; +} + +// Fetched once for this short-lived sample. A long-running agent should refresh +// the bearer before its TTL (for example, mint it inside a custom fetch, as the +// Mastra sample does). +const token = await getAccessToken(); +const mcpBase = env('REDPANDA_MCP_BASE_URL'); + +// buildModel constructs the native AI SDK model for the configured provider. +// REDPANDA_LLM_PROVIDER_TYPE selects the provider: "anthropic" and "google" use +// their native wire (the gateway forwards /v1/messages and +// /v1beta/...:generateContent to the upstream), everything else uses the +// OpenAI-compatible /chat/completions route. All three point at the same +// provider-scoped REDPANDA_LLM_PROVIDER_URL. The bearer is the real auth; apiKey +// is a placeholder for the ignored native x-api-key / x-goog-api-key. +function buildModel(): LanguageModel { + const base = env('REDPANDA_LLM_PROVIDER_URL'); + const model = env('REDPANDA_LLM_MODEL'); + switch ((process.env['REDPANDA_LLM_PROVIDER_TYPE'] ?? 'openai').toLowerCase()) { + case 'anthropic': { + // baseURL + "/v1" -> the gateway forwards /v1/messages. + const provider = createAnthropic({ + baseURL: `${base}/v1`, + apiKey: 'redpanda-gateway', + headers: { Authorization: `Bearer ${token}` }, + }); + return provider(model); + } + case 'google': + case 'gemini': { + // baseURL + "/v1beta" -> /v1beta/models/:generateContent. + const provider = createGoogleGenerativeAI({ + baseURL: `${base}/v1beta`, + apiKey: 'redpanda-gateway', + headers: { Authorization: `Bearer ${token}` }, + }); + return provider(model); + } + default: { + // OpenAI-compatible /chat/completions. apiKey becomes the Authorization: + // Bearer the gateway authenticates on, so no placeholder header is needed. + const provider = createOpenAICompatible({ + name: 'redpanda', + apiKey: token, + baseURL: base, + }); + return provider(model); + } + } +} + +const model = buildModel(); + +// MCP tool servers over Streamable HTTP. Transport headers are fixed per +// connection, so build the clients per conversation: the bearer (gateway auth) +// and the SAME conversation id ride every MCP tool call, matching the LLM call. +async function connectTools(conversationId: string): Promise<{ tools: ToolSet; close: () => Promise }> { + const headers = { Authorization: `Bearer ${token}`, [CONVERSATION_HEADER]: conversationId }; + const urls = (process.env['REDPANDA_MCP_SERVERS'] ?? '') + .split(',') + .map((s) => s.trim()) + .filter(Boolean) + .map((name) => `${mcpBase}/${name}`); + const clients = await Promise.all( + urls.map((url) => createMCPClient({ transport: { type: 'http', url, headers } })) + ); + const toolSets = await Promise.all(clients.map((client) => client.tools())); + const tools: ToolSet = Object.assign({}, ...toolSets); + const close = async (): Promise => { + await Promise.allSettled(clients.map((client) => client.close())); + }; + return { tools, close }; +} + +// conversationId is YOUR conversation id, passed in by the caller (chat thread, +// request id, A2A contextId) - never a shared constant. One chat() == one +// conversation; the same id is stamped on the LLM call and every MCP tool call. +async function chat(conversationId: string, prompt: string): Promise { + const { tools, close } = await connectTools(conversationId); + try { + const { text } = await generateText({ + model, + tools, + prompt, + stopWhen: stepCountIs(10), + // Stamp the conversation id so the gateway groups this LLM call with the + // MCP tool calls into one transcript. + headers: { [CONVERSATION_HEADER]: conversationId }, + }); + return text; + } finally { + await close(); + } +} + +// In your app the conversation id is the inbound thread/request id; mint one here. +const answer = await chat(randomUUID(), 'What tools can you call?'); +console.log(answer); +---- +-- + +Mastra:: ++ +-- +[source,typescript] +---- +import { createAnthropic } from '@ai-sdk/anthropic'; +import { createGoogleGenerativeAI } from '@ai-sdk/google'; +import { createOpenAICompatible } from '@ai-sdk/openai-compatible'; +import { Agent } from '@mastra/core/agent'; +import { MCPClient } from '@mastra/mcp'; +import { AsyncLocalStorage } from 'node:async_hooks'; +import { randomUUID } from 'node:crypto'; +import type { LanguageModel } from 'ai'; + +const CONVERSATION_HEADER = 'X-Redpanda-Genai-Conversation'; + +// Required env var, or a clear failure (not an opaque 401 later). Export from the Setup tab. +function env(key: string): string { + const value = process.env[key]; + if (!value) { + throw new Error(`missing env var ${key} - export it from the Setup tab`); + } + return value; +} + +// Holds the CURRENT conversation id for the in-flight turn. gatewayFetch reads it +// so the LLM call and every MCP tool call carry the SAME id. In your app this is +// your own id (chat thread, request id, A2A contextId) - never a shared constant. +const conversation = new AsyncLocalStorage(); + +// OAuth2 client_credentials bearer, cached and refreshed before it expires. +let token: { value: string; expiresAt: number } | undefined; +async function bearer(): Promise { + if (token && Date.now() < token.expiresAt - 30_000) { + return token.value; + } + const res = await fetch(env('REDPANDA_TOKEN_URL'), { + method: 'POST', + headers: { 'content-type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams({ + grant_type: 'client_credentials', + client_id: env('REDPANDA_CLIENT_ID'), + client_secret: env('REDPANDA_CLIENT_SECRET'), + }), + }); + if (!res.ok) { + throw new Error(`token request failed: ${res.status}`); + } + const json = (await res.json()) as { access_token: string; expires_in?: number }; + token = { value: json.access_token, expiresAt: Date.now() + (json.expires_in ?? 3600) * 1000 }; + return token.value; +} + +// ONE fetch shared by the model provider and every MCP server: it sets the bearer +// (gateway auth) and stamps the conversation id (read live from AsyncLocalStorage) +// on every request. Sharing it keeps the LLM call and the MCP tool calls in lockstep. +const gatewayFetch: typeof fetch = async (input, init) => { + const headers = new Headers(init?.headers); + headers.set('Authorization', `Bearer ${await bearer()}`); + const id = conversation.getStore(); + if (id) { + headers.set(CONVERSATION_HEADER, id); + } + return fetch(input, { ...init, headers }); +}; + +const mcpBase = env('REDPANDA_MCP_BASE_URL'); + +// buildModel builds the AI SDK model Mastra runs on, pointed at the gateway. +// REDPANDA_LLM_PROVIDER_TYPE selects the provider: "anthropic" and "google" use +// their native wire with the version segment on the base URL (/v1, /v1beta), +// everything else uses the OpenAI-compatible /chat/completions route. gatewayFetch +// overwrites the Authorization header with the real bearer on every request, so +// apiKey is just a placeholder that satisfies each constructor. +function buildModel(): LanguageModel { + const base = env('REDPANDA_LLM_PROVIDER_URL'); + const model = env('REDPANDA_LLM_MODEL'); + switch ((process.env['REDPANDA_LLM_PROVIDER_TYPE'] ?? 'openai').toLowerCase()) { + case 'anthropic': { + const provider = createAnthropic({ + baseURL: `${base}/v1`, + apiKey: 'redpanda-gateway', + fetch: gatewayFetch, + }); + return provider(model); + } + case 'google': + case 'gemini': { + const provider = createGoogleGenerativeAI({ + baseURL: `${base}/v1beta`, + apiKey: 'redpanda-gateway', + fetch: gatewayFetch, + }); + return provider(model); + } + default: { + const provider = createOpenAICompatible({ + name: 'redpanda', + apiKey: 'redpanda-gateway', + baseURL: base, + fetch: gatewayFetch, + }); + return provider(model); + } + } +} + +// Each MCP server is ${mcpBase}/ over Streamable HTTP; gatewayFetch carries +// the bearer + conversation id on every tool call. Built once, reused per turn. +const serverNames = (process.env['REDPANDA_MCP_SERVERS'] ?? '') + .split(',') + .map((s) => s.trim()) + .filter(Boolean); +const mcp = new MCPClient({ + servers: Object.fromEntries( + serverNames.map((name) => [name, { url: new URL(`${mcpBase}/${name}`), fetch: gatewayFetch }]) + ), +}); + +const agent = new Agent({ + name: 'assistant', + instructions: 'You are a helpful agent.', + model: buildModel(), + tools: await mcp.listTools(), +}); + +// One conversation == one id, scoped with conversation.run so gatewayFetch stamps +// it on the model call and every MCP tool call. In your app, pass your own id here. +async function chat(conversationId: string, prompt: string): Promise { + return conversation.run(conversationId, async () => { + const result = await agent.generate(prompt, { maxSteps: 10 }); + return result.text; + }); +} + +const answer = await chat(randomUUID(), 'What tools can you call?'); +console.log(answer); +await mcp.disconnect(); +---- +-- + ====== NOTE: ADK Go ships only Gemini-shaped models, so the ADK Go sample works against a Google provider only. For an OpenAI or Anthropic provider, use one of the other frameworks.