From f4690f5d7883085e950eb8701b1b233919873196 Mon Sep 17 00:00:00 2001 From: Aidan Daly Date: Thu, 25 Jun 2026 06:09:28 +0000 Subject: [PATCH 1/3] fix(templates): key per-session agent state by context.session_id across all HTTP starter base templates (#808, #809) All five HTTP starter base templates (strands, openaiagents, googleadk, langchain_langgraph, autogen) plus the regenerated assets snapshot. Only base/main.py template content changed; the strands hasMemory branch and all other framework files are untouched. Refs aws/agentcore-cli#808 Refs aws/agentcore-cli#809 --- .../assets.snapshot.test.ts.snap | 213 +++++++++++------- src/assets/python/http/autogen/base/main.py | 36 ++- src/assets/python/http/googleadk/base/main.py | 34 ++- .../http/langchain_langgraph/base/main.py | 33 ++- .../python/http/openaiagents/base/main.py | 27 ++- src/assets/python/http/strands/base/main.py | 83 ++++--- 6 files changed, 268 insertions(+), 158 deletions(-) diff --git a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap index 6cfcc62d2..11befaa2d 100644 --- a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap +++ b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap @@ -3478,23 +3478,37 @@ You have access to the following mounted filesystems. Use file_read, file_write, {{/each}}{{/if}} """ +# Reuses one AssistantAgent per session_id so each session keeps its own +# in-process conversation history (best-effort; resets on cold start). Caches up +# to 128 active sessions; the oldest is evicted and its history reset. +_agents = {} + + +async def get_or_create_agent(session_id): + if session_id not in _agents: + if len(_agents) >= 128: + _agents.pop(next(iter(_agents))) + # Get MCP Tools + mcp_tools = await get_streamable_http_mcp_tools() + _agents[session_id] = AssistantAgent( + name="{{ name }}", + model_client=load_model(), + tools=tools + mcp_tools, + system_message=SYSTEM_MESSAGE, + ) + return _agents[session_id] + + @app.entrypoint async def invoke(payload, context): log.info("Invoking Agent.....") - # Get MCP Tools - mcp_tools = await get_streamable_http_mcp_tools() - - # Define an AssistantAgent with the model and tools - agent = AssistantAgent( - name="{{ name }}", - model_client=load_model(), - tools=tools + mcp_tools, - system_message=SYSTEM_MESSAGE, - ) - # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + + # Reuse the per-session agent (preserves conversation history) + agent = await get_or_create_agent(session_id) # Run the agent result = await agent.run(task=prompt) @@ -3940,21 +3954,39 @@ agent = Agent( ) -# Session and Runner -async def setup_session_and_runner(user_id, session_id): - ensure_credentials_loaded() - session_service = InMemorySessionService() - session = await session_service.create_session( +# Module-level session service and runner (preserves history across invocations) +_session_service = InMemorySessionService() +_runner = None + + +def get_or_create_runner(): + global _runner + if _runner is None: + ensure_credentials_loaded() + _runner = Runner( + agent=agent, + app_name=APP_NAME, + session_service=_session_service, + ) + return _runner + + +async def get_or_create_session(user_id, session_id): + session = await _session_service.get_session( app_name=APP_NAME, user_id=user_id, session_id=session_id ) - runner = Runner(agent=agent, app_name=APP_NAME, session_service=session_service) - return session, runner + if session is None: + session = await _session_service.create_session( + app_name=APP_NAME, user_id=user_id, session_id=session_id + ) + return session # Agent Interaction async def call_agent_async(query, user_id, session_id): content = types.Content(role="user", parts=[types.Part(text=query)]) - session, runner = await setup_session_and_runner(user_id, session_id) + runner = get_or_create_runner() + session = await get_or_create_session(user_id, session_id) events = runner.run_async( user_id=user_id, session_id=session.id, new_message=content ) @@ -4244,6 +4276,7 @@ import os from typing import Any from langchain_core.messages import HumanMessage{{#if hasConfigBundle}}, SystemMessage{{/if}} +from langgraph.checkpoint.memory import InMemorySaver from langgraph.prebuilt import create_react_agent from langchain.tools import tool {{#if hasConfigBundle}} @@ -4294,6 +4327,9 @@ def add_numbers(a: int, b: int) -> int: # Define a collection of tools used by the model tools = [add_numbers] +# Module-level checkpointer preserves conversation history across invocations +_checkpointer = InMemorySaver() + {{#if needsOs}} _MOUNT_PATHS = [ {{#if sessionStorageMountPath}}"{{sessionStorageMountPath}}",{{/if}} @@ -4389,29 +4425,44 @@ async def invoke(payload, context): if mcp_client: mcp_tools = await mcp_client.get_tools() - # Define the agent using create_react_agent + # Define the agent using create_react_agent (checkpointer is shared across invocations) {{#if hasConfigBundle}} - graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT) + graph = create_react_agent( + get_or_create_model(), + tools=mcp_tools + tools, + prompt=DEFAULT_SYSTEM_PROMPT, + checkpointer=_checkpointer, + ) callback = ConfigBundleCallback() # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") log.info(f"Agent input: {prompt}") - # Run the agent with config bundle callback + # Run the agent with config bundle callback (checkpointer auto-loads/saves history per session) result = await graph.ainvoke( {"messages": [HumanMessage(content=prompt)]}, - config={"callbacks": [callback]}, + config={"callbacks": [callback], "configurable": {"thread_id": session_id}}, ) {{else}} - graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT) + graph = create_react_agent( + get_or_create_model(), + tools=mcp_tools + tools, + prompt=DEFAULT_SYSTEM_PROMPT, + checkpointer=_checkpointer, + ) # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") log.info(f"Agent input: {prompt}") - # Run the agent - result = await graph.ainvoke({"messages": [HumanMessage(content=prompt)]}) + # Run the agent (checkpointer auto-loads/saves history per session) + result = await graph.ainvoke( + {"messages": [HumanMessage(content=prompt)]}, + config={"configurable": {"thread_id": session_id}}, + ) {{/if}} # Return result @@ -4782,7 +4833,8 @@ import os {{#if hasGateway}} from contextlib import AsyncExitStack {{/if}} -from agents import Agent, Runner, function_tool +from functools import lru_cache +from agents import Agent, Runner, SQLiteSession, function_tool from bedrock_agentcore.runtime import BedrockAgentCoreApp from model.load import load_model {{#if hasGateway}} @@ -4886,8 +4938,16 @@ You have access to the following mounted filesystems. Use file_read, file_write, {{/each}}{{/if}} """ +# Caches up to 128 active sessions; LRU eviction silently resets history for +# the oldest session. For production use, replace with a durable session store +# (e.g. SQLiteSession with a file path). +@lru_cache(maxsize=128) +def get_session(session_id): + return SQLiteSession(session_id) + + # Define the agent execution -async def main(query): +async def main(query, session): ensure_credentials_loaded() try: {{#if hasGateway}} @@ -4906,7 +4966,7 @@ async def main(query): tools=tools, mcp_config={"include_server_in_tool_names": True}, ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result else: agent = Agent( @@ -4916,7 +4976,7 @@ async def main(query): mcp_servers=[], tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result {{else}} if mcp_servers: @@ -4929,7 +4989,7 @@ async def main(query): mcp_servers=active_servers, tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result else: agent = Agent( @@ -4939,7 +4999,7 @@ async def main(query): mcp_servers=[], tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result {{/if}} except Exception as e: @@ -4953,9 +5013,11 @@ async def invoke(payload, context): # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + session = get_session(session_id) - # Run the agent - result = await main(prompt) + # Run the agent (session automatically loads/saves conversation history) + result = await main(prompt, session) # Return result return {"result": result.final_output} @@ -5634,51 +5696,47 @@ def agent_factory(): get_or_create_agent = agent_factory() {{/unless}} {{else}} -{{#if hasConfigBundle}} -def create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): - return Agent( - model=load_model(), - system_prompt=DEFAULT_SYSTEM_PROMPT, - tools=tools, - conversation_manager=_make_conversation_manager(), - {{#if hasSkillsFetcher}} - plugins=skill_plugins or None, - {{/if}} - hooks=[ConfigBundleHook()], - ) -{{else}} {{#unless hasPayment}} -_agent = None - -def get_or_create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): - global _agent - if _agent is None: - _agent = Agent( - model=load_model(), - system_prompt=DEFAULT_SYSTEM_PROMPT, - tools=tools, - conversation_manager=_make_conversation_manager(), - {{#if hasSkillsFetcher}} - plugins=skill_plugins or None, - {{/if}} - {{#if hasExecutionLimits}} - tool_executor=SequentialToolExecutor(), - callback_handler=None, - {{/if}} - hooks=[ +# Reuses one Agent per session_id so each session keeps its own in-process +# conversation history (best-effort; resets on cold start). The cache is bounded +# so a single process serving many sessions cannot leak history between them or +# grow without limit. For durable history, attach a session manager. +def agent_factory(): + cache = {} + def get_or_create_agent(session_id{{#if hasSkillsFetcher}}, skill_plugins=None{{/if}}): + if session_id not in cache: + if len(cache) >= 128: + cache.pop(next(iter(cache))) + cache[session_id] = Agent( + model=load_model(), + system_prompt=DEFAULT_SYSTEM_PROMPT, + tools=tools, + conversation_manager=_make_conversation_manager(), + {{#if hasSkillsFetcher}} + plugins=skill_plugins or None, + {{/if}} {{#if hasExecutionLimits}} - ExecutionLimitsHook( - {{#if maxIterations}}max_iterations={{maxIterations}},{{/if}} - {{#if maxTokens}}max_tokens={{maxTokens}},{{/if}} - {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} - ), + tool_executor=SequentialToolExecutor(), + callback_handler=None, {{/if}} - ], - ) - return _agent + hooks=[ + {{#if hasExecutionLimits}} + ExecutionLimitsHook( + {{#if maxIterations}}max_iterations={{maxIterations}},{{/if}} + {{#if maxTokens}}max_tokens={{maxTokens}},{{/if}} + {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} + ), + {{/if}} + {{#if hasConfigBundle}} + ConfigBundleHook(), + {{/if}} + ], + ) + return cache[session_id] + return get_or_create_agent +get_or_create_agent = agent_factory() {{/unless}} {{/if}} -{{/if}} def _extract_prompt(payload: dict): @@ -5785,11 +5843,8 @@ async def invoke(payload, context): hooks=[ConfigBundleHook()],{{/if}} ) {{else}} -{{#if hasConfigBundle}} - agent = create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}}) -{{else}} - agent = get_or_create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}}) -{{/if}} + session_id = getattr(context, 'session_id', 'default-session') + agent = get_or_create_agent(session_id{{#if hasSkillsFetcher}}, _skill_plugins{{/if}}) {{/if}} {{/if}} diff --git a/src/assets/python/http/autogen/base/main.py b/src/assets/python/http/autogen/base/main.py index a01633215..98bd2b77d 100644 --- a/src/assets/python/http/autogen/base/main.py +++ b/src/assets/python/http/autogen/base/main.py @@ -91,23 +91,37 @@ def list_files(path: str) -> str: {{/each}}{{/if}} """ +# Reuses one AssistantAgent per session_id so each session keeps its own +# in-process conversation history (best-effort; resets on cold start). Caches up +# to 128 active sessions; the oldest is evicted and its history reset. +_agents = {} + + +async def get_or_create_agent(session_id): + if session_id not in _agents: + if len(_agents) >= 128: + _agents.pop(next(iter(_agents))) + # Get MCP Tools + mcp_tools = await get_streamable_http_mcp_tools() + _agents[session_id] = AssistantAgent( + name="{{ name }}", + model_client=load_model(), + tools=tools + mcp_tools, + system_message=SYSTEM_MESSAGE, + ) + return _agents[session_id] + + @app.entrypoint async def invoke(payload, context): log.info("Invoking Agent.....") - # Get MCP Tools - mcp_tools = await get_streamable_http_mcp_tools() - - # Define an AssistantAgent with the model and tools - agent = AssistantAgent( - name="{{ name }}", - model_client=load_model(), - tools=tools + mcp_tools, - system_message=SYSTEM_MESSAGE, - ) - # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + + # Reuse the per-session agent (preserves conversation history) + agent = await get_or_create_agent(session_id) # Run the agent result = await agent.run(task=prompt) diff --git a/src/assets/python/http/googleadk/base/main.py b/src/assets/python/http/googleadk/base/main.py index fe5eb7d5d..620e2dba5 100644 --- a/src/assets/python/http/googleadk/base/main.py +++ b/src/assets/python/http/googleadk/base/main.py @@ -121,21 +121,39 @@ def ensure_credentials_loaded(): ) -# Session and Runner -async def setup_session_and_runner(user_id, session_id): - ensure_credentials_loaded() - session_service = InMemorySessionService() - session = await session_service.create_session( +# Module-level session service and runner (preserves history across invocations) +_session_service = InMemorySessionService() +_runner = None + + +def get_or_create_runner(): + global _runner + if _runner is None: + ensure_credentials_loaded() + _runner = Runner( + agent=agent, + app_name=APP_NAME, + session_service=_session_service, + ) + return _runner + + +async def get_or_create_session(user_id, session_id): + session = await _session_service.get_session( app_name=APP_NAME, user_id=user_id, session_id=session_id ) - runner = Runner(agent=agent, app_name=APP_NAME, session_service=session_service) - return session, runner + if session is None: + session = await _session_service.create_session( + app_name=APP_NAME, user_id=user_id, session_id=session_id + ) + return session # Agent Interaction async def call_agent_async(query, user_id, session_id): content = types.Content(role="user", parts=[types.Part(text=query)]) - session, runner = await setup_session_and_runner(user_id, session_id) + runner = get_or_create_runner() + session = await get_or_create_session(user_id, session_id) events = runner.run_async( user_id=user_id, session_id=session.id, new_message=content ) diff --git a/src/assets/python/http/langchain_langgraph/base/main.py b/src/assets/python/http/langchain_langgraph/base/main.py index 27a12c039..d8ceb80e4 100644 --- a/src/assets/python/http/langchain_langgraph/base/main.py +++ b/src/assets/python/http/langchain_langgraph/base/main.py @@ -4,6 +4,7 @@ from typing import Any from langchain_core.messages import HumanMessage{{#if hasConfigBundle}}, SystemMessage{{/if}} +from langgraph.checkpoint.memory import InMemorySaver from langgraph.prebuilt import create_react_agent from langchain.tools import tool {{#if hasConfigBundle}} @@ -54,6 +55,9 @@ def add_numbers(a: int, b: int) -> int: # Define a collection of tools used by the model tools = [add_numbers] +# Module-level checkpointer preserves conversation history across invocations +_checkpointer = InMemorySaver() + {{#if needsOs}} _MOUNT_PATHS = [ {{#if sessionStorageMountPath}}"{{sessionStorageMountPath}}",{{/if}} @@ -149,29 +153,44 @@ async def invoke(payload, context): if mcp_client: mcp_tools = await mcp_client.get_tools() - # Define the agent using create_react_agent + # Define the agent using create_react_agent (checkpointer is shared across invocations) {{#if hasConfigBundle}} - graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT) + graph = create_react_agent( + get_or_create_model(), + tools=mcp_tools + tools, + prompt=DEFAULT_SYSTEM_PROMPT, + checkpointer=_checkpointer, + ) callback = ConfigBundleCallback() # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") log.info(f"Agent input: {prompt}") - # Run the agent with config bundle callback + # Run the agent with config bundle callback (checkpointer auto-loads/saves history per session) result = await graph.ainvoke( {"messages": [HumanMessage(content=prompt)]}, - config={"callbacks": [callback]}, + config={"callbacks": [callback], "configurable": {"thread_id": session_id}}, ) {{else}} - graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT) + graph = create_react_agent( + get_or_create_model(), + tools=mcp_tools + tools, + prompt=DEFAULT_SYSTEM_PROMPT, + checkpointer=_checkpointer, + ) # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") log.info(f"Agent input: {prompt}") - # Run the agent - result = await graph.ainvoke({"messages": [HumanMessage(content=prompt)]}) + # Run the agent (checkpointer auto-loads/saves history per session) + result = await graph.ainvoke( + {"messages": [HumanMessage(content=prompt)]}, + config={"configurable": {"thread_id": session_id}}, + ) {{/if}} # Return result diff --git a/src/assets/python/http/openaiagents/base/main.py b/src/assets/python/http/openaiagents/base/main.py index 772f50d42..db1a43d00 100644 --- a/src/assets/python/http/openaiagents/base/main.py +++ b/src/assets/python/http/openaiagents/base/main.py @@ -4,7 +4,8 @@ {{#if hasGateway}} from contextlib import AsyncExitStack {{/if}} -from agents import Agent, Runner, function_tool +from functools import lru_cache +from agents import Agent, Runner, SQLiteSession, function_tool from bedrock_agentcore.runtime import BedrockAgentCoreApp from model.load import load_model {{#if hasGateway}} @@ -108,8 +109,16 @@ def list_files(path: str) -> str: {{/each}}{{/if}} """ +# Caches up to 128 active sessions; LRU eviction silently resets history for +# the oldest session. For production use, replace with a durable session store +# (e.g. SQLiteSession with a file path). +@lru_cache(maxsize=128) +def get_session(session_id): + return SQLiteSession(session_id) + + # Define the agent execution -async def main(query): +async def main(query, session): ensure_credentials_loaded() try: {{#if hasGateway}} @@ -128,7 +137,7 @@ async def main(query): tools=tools, mcp_config={"include_server_in_tool_names": True}, ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result else: agent = Agent( @@ -138,7 +147,7 @@ async def main(query): mcp_servers=[], tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result {{else}} if mcp_servers: @@ -151,7 +160,7 @@ async def main(query): mcp_servers=active_servers, tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result else: agent = Agent( @@ -161,7 +170,7 @@ async def main(query): mcp_servers=[], tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result {{/if}} except Exception as e: @@ -175,9 +184,11 @@ async def invoke(payload, context): # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + session = get_session(session_id) - # Run the agent - result = await main(prompt) + # Run the agent (session automatically loads/saves conversation history) + result = await main(prompt, session) # Return result return {"result": result.final_output} diff --git a/src/assets/python/http/strands/base/main.py b/src/assets/python/http/strands/base/main.py index 87a216c32..0a7f9a5d9 100644 --- a/src/assets/python/http/strands/base/main.py +++ b/src/assets/python/http/strands/base/main.py @@ -424,51 +424,47 @@ def get_or_create_agent(session_id, user_id{{#if hasSkillsFetcher}}, skill_plugi get_or_create_agent = agent_factory() {{/unless}} {{else}} -{{#if hasConfigBundle}} -def create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): - return Agent( - model=load_model(), - system_prompt=DEFAULT_SYSTEM_PROMPT, - tools=tools, - conversation_manager=_make_conversation_manager(), - {{#if hasSkillsFetcher}} - plugins=skill_plugins or None, - {{/if}} - hooks=[ConfigBundleHook()], - ) -{{else}} {{#unless hasPayment}} -_agent = None - -def get_or_create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): - global _agent - if _agent is None: - _agent = Agent( - model=load_model(), - system_prompt=DEFAULT_SYSTEM_PROMPT, - tools=tools, - conversation_manager=_make_conversation_manager(), - {{#if hasSkillsFetcher}} - plugins=skill_plugins or None, - {{/if}} - {{#if hasExecutionLimits}} - tool_executor=SequentialToolExecutor(), - callback_handler=None, - {{/if}} - hooks=[ +# Reuses one Agent per session_id so each session keeps its own in-process +# conversation history (best-effort; resets on cold start). The cache is bounded +# so a single process serving many sessions cannot leak history between them or +# grow without limit. For durable history, attach a session manager. +def agent_factory(): + cache = {} + def get_or_create_agent(session_id{{#if hasSkillsFetcher}}, skill_plugins=None{{/if}}): + if session_id not in cache: + if len(cache) >= 128: + cache.pop(next(iter(cache))) + cache[session_id] = Agent( + model=load_model(), + system_prompt=DEFAULT_SYSTEM_PROMPT, + tools=tools, + conversation_manager=_make_conversation_manager(), + {{#if hasSkillsFetcher}} + plugins=skill_plugins or None, + {{/if}} {{#if hasExecutionLimits}} - ExecutionLimitsHook( - {{#if maxIterations}}max_iterations={{maxIterations}},{{/if}} - {{#if maxTokens}}max_tokens={{maxTokens}},{{/if}} - {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} - ), + tool_executor=SequentialToolExecutor(), + callback_handler=None, {{/if}} - ], - ) - return _agent + hooks=[ + {{#if hasExecutionLimits}} + ExecutionLimitsHook( + {{#if maxIterations}}max_iterations={{maxIterations}},{{/if}} + {{#if maxTokens}}max_tokens={{maxTokens}},{{/if}} + {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} + ), + {{/if}} + {{#if hasConfigBundle}} + ConfigBundleHook(), + {{/if}} + ], + ) + return cache[session_id] + return get_or_create_agent +get_or_create_agent = agent_factory() {{/unless}} {{/if}} -{{/if}} def _extract_prompt(payload: dict): @@ -575,11 +571,8 @@ async def invoke(payload, context): hooks=[ConfigBundleHook()],{{/if}} ) {{else}} -{{#if hasConfigBundle}} - agent = create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}}) -{{else}} - agent = get_or_create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}}) -{{/if}} + session_id = getattr(context, 'session_id', 'default-session') + agent = get_or_create_agent(session_id{{#if hasSkillsFetcher}}, _skill_plugins{{/if}}) {{/if}} {{/if}} From 08bfb25044f839365d318ab3ccc4b3420fee8700 Mon Sep 17 00:00:00 2001 From: Aidan Daly Date: Thu, 25 Jun 2026 15:21:50 +0000 Subject: [PATCH 2/3] fix(templates): bound googleadk/langchain session stores and make all caches true LRU (#808, #809) The googleadk (InMemorySessionService) and langchain_langgraph (InMemorySaver) templates kept per-session state at module scope with no eviction, trading the cross-session leak for an unbounded memory leak in long-running processes. Bound both to 128 active sessions with LRU eviction (delete_session / delete_thread on the least-recently-used key), matching the other three templates. The autogen and strands no-memory caches were plain dicts evicting by insertion order (FIFO) despite docstrings claiming LRU. Switch them to OrderedDict with move_to_end on hit so the eviction policy is genuinely LRU and the docstrings are accurate. openaiagents already uses functools.lru_cache (true LRU) and is unchanged. Regenerated the assets snapshot. Refs aws/agentcore-cli#808 Refs aws/agentcore-cli#809 --- .../assets.snapshot.test.ts.snap | 135 ++++++++++++------ src/assets/python/http/autogen/base/main.py | 30 ++-- src/assets/python/http/googleadk/base/main.py | 21 ++- .../http/langchain_langgraph/base/main.py | 22 ++- src/assets/python/http/strands/base/main.py | 62 ++++---- 5 files changed, 182 insertions(+), 88 deletions(-) diff --git a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap index 11befaa2d..f277f1965 100644 --- a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap +++ b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap @@ -3388,6 +3388,7 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht "{{#if needsOs}} import os {{/if}} +from collections import OrderedDict from autogen_agentchat.agents import AssistantAgent from autogen_core.tools import FunctionTool from bedrock_agentcore.runtime import BedrockAgentCoreApp @@ -3480,22 +3481,25 @@ You have access to the following mounted filesystems. Use file_read, file_write, # Reuses one AssistantAgent per session_id so each session keeps its own # in-process conversation history (best-effort; resets on cold start). Caches up -# to 128 active sessions; the oldest is evicted and its history reset. -_agents = {} +# to 128 active sessions with LRU eviction (least-recently-used is dropped and +# its history reset). +_agents = OrderedDict() async def get_or_create_agent(session_id): - if session_id not in _agents: - if len(_agents) >= 128: - _agents.pop(next(iter(_agents))) - # Get MCP Tools - mcp_tools = await get_streamable_http_mcp_tools() - _agents[session_id] = AssistantAgent( - name="{{ name }}", - model_client=load_model(), - tools=tools + mcp_tools, - system_message=SYSTEM_MESSAGE, - ) + if session_id in _agents: + _agents.move_to_end(session_id) + return _agents[session_id] + if len(_agents) >= 128: + _agents.popitem(last=False) + # Get MCP Tools + mcp_tools = await get_streamable_http_mcp_tools() + _agents[session_id] = AssistantAgent( + name="{{ name }}", + model_client=load_model(), + tools=tools + mcp_tools, + system_message=SYSTEM_MESSAGE, + ) return _agents[session_id] @@ -3834,6 +3838,7 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht "{{#if needsOs}} import os {{/if}} +from collections import OrderedDict from google.adk.agents import Agent from google.adk.runners import Runner from google.adk.sessions import InMemorySessionService @@ -3954,8 +3959,15 @@ agent = Agent( ) -# Module-level session service and runner (preserves history across invocations) +# Module-level session service and runner preserve history across invocations. +# InMemorySessionService retains every (app_name, user_id, session_id) triple +# forever, so we bound it to 128 active sessions with LRU eviction (the +# least-recently-used session is deleted and its history reset) to keep a +# long-running process from growing without limit. For durable history, swap in +# a persistent session service (e.g. DatabaseSessionService). +_SESSION_LIMIT = 128 _session_service = InMemorySessionService() +_session_keys = OrderedDict() _runner = None @@ -3972,6 +3984,17 @@ def get_or_create_runner(): async def get_or_create_session(user_id, session_id): + key = (user_id, session_id) + if key in _session_keys: + _session_keys.move_to_end(key) + else: + while len(_session_keys) >= _SESSION_LIMIT: + old_user_id, old_session_id = _session_keys.popitem(last=False) + await _session_service.delete_session( + app_name=APP_NAME, user_id=old_user_id, session_id=old_session_id + ) + _session_keys[key] = True + session = await _session_service.get_session( app_name=APP_NAME, user_id=user_id, session_id=session_id ) @@ -4273,6 +4296,7 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht "{{#if needsOs}} import os {{/if}} +from collections import OrderedDict from typing import Any from langchain_core.messages import HumanMessage{{#if hasConfigBundle}}, SystemMessage{{/if}} @@ -4327,8 +4351,25 @@ def add_numbers(a: int, b: int) -> int: # Define a collection of tools used by the model tools = [add_numbers] -# Module-level checkpointer preserves conversation history across invocations +# Module-level checkpointer preserves conversation history across invocations. +# InMemorySaver keeps every thread_id (= session_id) checkpoint in memory +# forever, so we bound it to 128 active threads with LRU eviction (the +# least-recently-used thread is deleted and its history reset) to keep a +# long-running process from growing without limit. For durable history, swap in +# a persistent checkpointer (e.g. SqliteSaver/AsyncSqliteSaver with a file path). +_CHECKPOINT_LIMIT = 128 _checkpointer = InMemorySaver() +_thread_ids = OrderedDict() + + +def touch_thread(thread_id): + if thread_id in _thread_ids: + _thread_ids.move_to_end(thread_id) + return + while len(_thread_ids) >= _CHECKPOINT_LIMIT: + evicted, _ = _thread_ids.popitem(last=False) + _checkpointer.delete_thread(evicted) + _thread_ids[thread_id] = True {{#if needsOs}} _MOUNT_PATHS = [ @@ -4438,6 +4479,7 @@ async def invoke(payload, context): # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") session_id = getattr(context, "session_id", "default-session") + touch_thread(session_id) log.info(f"Agent input: {prompt}") # Run the agent with config bundle callback (checkpointer auto-loads/saves history per session) @@ -4456,6 +4498,7 @@ async def invoke(payload, context): # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") session_id = getattr(context, "session_id", "default-session") + touch_thread(session_id) log.info(f"Agent input: {prompt}") # Run the agent (checkpointer auto-loads/saves history per session) @@ -5271,6 +5314,7 @@ Thumbs.db" exports[`Assets Directory Snapshots > Python framework assets > python/python/http/strands/base/main.py should match snapshot 1`] = ` "from typing import Any +from collections import OrderedDict {{#if inlineFunctionTools}} import json @@ -5699,39 +5743,42 @@ get_or_create_agent = agent_factory() {{#unless hasPayment}} # Reuses one Agent per session_id so each session keeps its own in-process # conversation history (best-effort; resets on cold start). The cache is bounded -# so a single process serving many sessions cannot leak history between them or -# grow without limit. For durable history, attach a session manager. +# to 128 sessions with LRU eviction (least-recently-used is dropped and its +# history reset) so a single process serving many sessions cannot leak history +# between them or grow without limit. For durable history, attach a session manager. def agent_factory(): - cache = {} + cache = OrderedDict() def get_or_create_agent(session_id{{#if hasSkillsFetcher}}, skill_plugins=None{{/if}}): - if session_id not in cache: - if len(cache) >= 128: - cache.pop(next(iter(cache))) - cache[session_id] = Agent( - model=load_model(), - system_prompt=DEFAULT_SYSTEM_PROMPT, - tools=tools, - conversation_manager=_make_conversation_manager(), - {{#if hasSkillsFetcher}} - plugins=skill_plugins or None, - {{/if}} + if session_id in cache: + cache.move_to_end(session_id) + return cache[session_id] + if len(cache) >= 128: + cache.popitem(last=False) + cache[session_id] = Agent( + model=load_model(), + system_prompt=DEFAULT_SYSTEM_PROMPT, + tools=tools, + conversation_manager=_make_conversation_manager(), + {{#if hasSkillsFetcher}} + plugins=skill_plugins or None, + {{/if}} + {{#if hasExecutionLimits}} + tool_executor=SequentialToolExecutor(), + callback_handler=None, + {{/if}} + hooks=[ {{#if hasExecutionLimits}} - tool_executor=SequentialToolExecutor(), - callback_handler=None, + ExecutionLimitsHook( + {{#if maxIterations}}max_iterations={{maxIterations}},{{/if}} + {{#if maxTokens}}max_tokens={{maxTokens}},{{/if}} + {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} + ), {{/if}} - hooks=[ - {{#if hasExecutionLimits}} - ExecutionLimitsHook( - {{#if maxIterations}}max_iterations={{maxIterations}},{{/if}} - {{#if maxTokens}}max_tokens={{maxTokens}},{{/if}} - {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} - ), - {{/if}} - {{#if hasConfigBundle}} - ConfigBundleHook(), - {{/if}} - ], - ) + {{#if hasConfigBundle}} + ConfigBundleHook(), + {{/if}} + ], + ) return cache[session_id] return get_or_create_agent get_or_create_agent = agent_factory() diff --git a/src/assets/python/http/autogen/base/main.py b/src/assets/python/http/autogen/base/main.py index 98bd2b77d..63d331785 100644 --- a/src/assets/python/http/autogen/base/main.py +++ b/src/assets/python/http/autogen/base/main.py @@ -1,6 +1,7 @@ {{#if needsOs}} import os {{/if}} +from collections import OrderedDict from autogen_agentchat.agents import AssistantAgent from autogen_core.tools import FunctionTool from bedrock_agentcore.runtime import BedrockAgentCoreApp @@ -93,22 +94,25 @@ def list_files(path: str) -> str: # Reuses one AssistantAgent per session_id so each session keeps its own # in-process conversation history (best-effort; resets on cold start). Caches up -# to 128 active sessions; the oldest is evicted and its history reset. -_agents = {} +# to 128 active sessions with LRU eviction (least-recently-used is dropped and +# its history reset). +_agents = OrderedDict() async def get_or_create_agent(session_id): - if session_id not in _agents: - if len(_agents) >= 128: - _agents.pop(next(iter(_agents))) - # Get MCP Tools - mcp_tools = await get_streamable_http_mcp_tools() - _agents[session_id] = AssistantAgent( - name="{{ name }}", - model_client=load_model(), - tools=tools + mcp_tools, - system_message=SYSTEM_MESSAGE, - ) + if session_id in _agents: + _agents.move_to_end(session_id) + return _agents[session_id] + if len(_agents) >= 128: + _agents.popitem(last=False) + # Get MCP Tools + mcp_tools = await get_streamable_http_mcp_tools() + _agents[session_id] = AssistantAgent( + name="{{ name }}", + model_client=load_model(), + tools=tools + mcp_tools, + system_message=SYSTEM_MESSAGE, + ) return _agents[session_id] diff --git a/src/assets/python/http/googleadk/base/main.py b/src/assets/python/http/googleadk/base/main.py index 620e2dba5..5b5e52f01 100644 --- a/src/assets/python/http/googleadk/base/main.py +++ b/src/assets/python/http/googleadk/base/main.py @@ -1,6 +1,7 @@ {{#if needsOs}} import os {{/if}} +from collections import OrderedDict from google.adk.agents import Agent from google.adk.runners import Runner from google.adk.sessions import InMemorySessionService @@ -121,8 +122,15 @@ def ensure_credentials_loaded(): ) -# Module-level session service and runner (preserves history across invocations) +# Module-level session service and runner preserve history across invocations. +# InMemorySessionService retains every (app_name, user_id, session_id) triple +# forever, so we bound it to 128 active sessions with LRU eviction (the +# least-recently-used session is deleted and its history reset) to keep a +# long-running process from growing without limit. For durable history, swap in +# a persistent session service (e.g. DatabaseSessionService). +_SESSION_LIMIT = 128 _session_service = InMemorySessionService() +_session_keys = OrderedDict() _runner = None @@ -139,6 +147,17 @@ def get_or_create_runner(): async def get_or_create_session(user_id, session_id): + key = (user_id, session_id) + if key in _session_keys: + _session_keys.move_to_end(key) + else: + while len(_session_keys) >= _SESSION_LIMIT: + old_user_id, old_session_id = _session_keys.popitem(last=False) + await _session_service.delete_session( + app_name=APP_NAME, user_id=old_user_id, session_id=old_session_id + ) + _session_keys[key] = True + session = await _session_service.get_session( app_name=APP_NAME, user_id=user_id, session_id=session_id ) diff --git a/src/assets/python/http/langchain_langgraph/base/main.py b/src/assets/python/http/langchain_langgraph/base/main.py index d8ceb80e4..387fe2870 100644 --- a/src/assets/python/http/langchain_langgraph/base/main.py +++ b/src/assets/python/http/langchain_langgraph/base/main.py @@ -1,6 +1,7 @@ {{#if needsOs}} import os {{/if}} +from collections import OrderedDict from typing import Any from langchain_core.messages import HumanMessage{{#if hasConfigBundle}}, SystemMessage{{/if}} @@ -55,8 +56,25 @@ def add_numbers(a: int, b: int) -> int: # Define a collection of tools used by the model tools = [add_numbers] -# Module-level checkpointer preserves conversation history across invocations +# Module-level checkpointer preserves conversation history across invocations. +# InMemorySaver keeps every thread_id (= session_id) checkpoint in memory +# forever, so we bound it to 128 active threads with LRU eviction (the +# least-recently-used thread is deleted and its history reset) to keep a +# long-running process from growing without limit. For durable history, swap in +# a persistent checkpointer (e.g. SqliteSaver/AsyncSqliteSaver with a file path). +_CHECKPOINT_LIMIT = 128 _checkpointer = InMemorySaver() +_thread_ids = OrderedDict() + + +def touch_thread(thread_id): + if thread_id in _thread_ids: + _thread_ids.move_to_end(thread_id) + return + while len(_thread_ids) >= _CHECKPOINT_LIMIT: + evicted, _ = _thread_ids.popitem(last=False) + _checkpointer.delete_thread(evicted) + _thread_ids[thread_id] = True {{#if needsOs}} _MOUNT_PATHS = [ @@ -166,6 +184,7 @@ async def invoke(payload, context): # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") session_id = getattr(context, "session_id", "default-session") + touch_thread(session_id) log.info(f"Agent input: {prompt}") # Run the agent with config bundle callback (checkpointer auto-loads/saves history per session) @@ -184,6 +203,7 @@ async def invoke(payload, context): # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") session_id = getattr(context, "session_id", "default-session") + touch_thread(session_id) log.info(f"Agent input: {prompt}") # Run the agent (checkpointer auto-loads/saves history per session) diff --git a/src/assets/python/http/strands/base/main.py b/src/assets/python/http/strands/base/main.py index 0a7f9a5d9..578f21608 100644 --- a/src/assets/python/http/strands/base/main.py +++ b/src/assets/python/http/strands/base/main.py @@ -1,4 +1,5 @@ from typing import Any +from collections import OrderedDict {{#if inlineFunctionTools}} import json @@ -427,39 +428,42 @@ def get_or_create_agent(session_id, user_id{{#if hasSkillsFetcher}}, skill_plugi {{#unless hasPayment}} # Reuses one Agent per session_id so each session keeps its own in-process # conversation history (best-effort; resets on cold start). The cache is bounded -# so a single process serving many sessions cannot leak history between them or -# grow without limit. For durable history, attach a session manager. +# to 128 sessions with LRU eviction (least-recently-used is dropped and its +# history reset) so a single process serving many sessions cannot leak history +# between them or grow without limit. For durable history, attach a session manager. def agent_factory(): - cache = {} + cache = OrderedDict() def get_or_create_agent(session_id{{#if hasSkillsFetcher}}, skill_plugins=None{{/if}}): - if session_id not in cache: - if len(cache) >= 128: - cache.pop(next(iter(cache))) - cache[session_id] = Agent( - model=load_model(), - system_prompt=DEFAULT_SYSTEM_PROMPT, - tools=tools, - conversation_manager=_make_conversation_manager(), - {{#if hasSkillsFetcher}} - plugins=skill_plugins or None, - {{/if}} + if session_id in cache: + cache.move_to_end(session_id) + return cache[session_id] + if len(cache) >= 128: + cache.popitem(last=False) + cache[session_id] = Agent( + model=load_model(), + system_prompt=DEFAULT_SYSTEM_PROMPT, + tools=tools, + conversation_manager=_make_conversation_manager(), + {{#if hasSkillsFetcher}} + plugins=skill_plugins or None, + {{/if}} + {{#if hasExecutionLimits}} + tool_executor=SequentialToolExecutor(), + callback_handler=None, + {{/if}} + hooks=[ {{#if hasExecutionLimits}} - tool_executor=SequentialToolExecutor(), - callback_handler=None, + ExecutionLimitsHook( + {{#if maxIterations}}max_iterations={{maxIterations}},{{/if}} + {{#if maxTokens}}max_tokens={{maxTokens}},{{/if}} + {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} + ), {{/if}} - hooks=[ - {{#if hasExecutionLimits}} - ExecutionLimitsHook( - {{#if maxIterations}}max_iterations={{maxIterations}},{{/if}} - {{#if maxTokens}}max_tokens={{maxTokens}},{{/if}} - {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} - ), - {{/if}} - {{#if hasConfigBundle}} - ConfigBundleHook(), - {{/if}} - ], - ) + {{#if hasConfigBundle}} + ConfigBundleHook(), + {{/if}} + ], + ) return cache[session_id] return get_or_create_agent get_or_create_agent = agent_factory() From f3dabe9e77ef5c8405b2d2264e9181d40adbce06 Mon Sep 17 00:00:00 2001 From: Aidan Daly Date: Thu, 25 Jun 2026 16:12:22 +0000 Subject: [PATCH 3/3] fix(templates): unpack composite key in googleadk session lru eviction (#808) OrderedDict.popitem() returns (key, value) where key is the (user_id, session_id) tuple and value is True. The previous unpack assigned the key tuple to old_user_id and True to old_session_id, so delete_session() was called with garbage and the real session was never deleted -- the InMemorySessionService grew unbounded past the 128-session cap, defeating the #808 bound for GoogleADK (LangChain unpacked correctly). Destructure the key tuple and discard the value so the real ids reach delete_session. Add an asset test asserting the store caps at 128 with correct eviction ids, and regenerate the asset snapshot. --- .../__snapshots__/assets.snapshot.test.ts.snap | 2 +- .../__tests__/googleadk-session-eviction.test.ts | Bin 0 -> 3601 bytes src/assets/python/http/googleadk/base/main.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 src/assets/__tests__/googleadk-session-eviction.test.ts diff --git a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap index f277f1965..97e574ec1 100644 --- a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap +++ b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap @@ -3989,7 +3989,7 @@ async def get_or_create_session(user_id, session_id): _session_keys.move_to_end(key) else: while len(_session_keys) >= _SESSION_LIMIT: - old_user_id, old_session_id = _session_keys.popitem(last=False) + (old_user_id, old_session_id), _ = _session_keys.popitem(last=False) await _session_service.delete_session( app_name=APP_NAME, user_id=old_user_id, session_id=old_session_id ) diff --git a/src/assets/__tests__/googleadk-session-eviction.test.ts b/src/assets/__tests__/googleadk-session-eviction.test.ts new file mode 100644 index 0000000000000000000000000000000000000000..54b0078b5a89b29a7511171cfede0e20fc034396 GIT binary patch literal 3601 zcmai1?QYvP6z%Uk#RXcxa+)Y<2W)9l2gug6HQKaAo&Ffo%%&yUA}Wg-N!j%R1AB-) z;htpYl9C)dNe2Yc7kNLtnk!dUG-TCjNtPe8Lvr=0H^HF7` z8LD)zv4Q(8xvI=%%n`AvHtbo2M603{$v1E2n_PyaG% z)wB)>n2mIv4%>YAp)LV(qFx(b9LMZc#c9_hvhqoEuP@7=_*w>XBt=XMxT{0aPp7e<*m>d#7bFK6kw{tPh(UCF} zAfTZrbao<2dP<2l1}FhGCIYF{f&-|!t{e#_{wJR|nW}JVhUWSldcs==-z-_D@|>JO zccK2wH+Fn)gR;F+DAdR*O0+A+F5%kEaS;cB|UJF!I zH;z4~Gh@W^<&|{?Oa#^a_4>4XOgB5!M0-pF>0X9!rzdzHP)Hx#ezLRj`vZG?`aRHp zK^LN|?d-vLB~K#|fPgNu?EkES6j#^a1bH=%!lo-$g#KFK`sp>(q{*=B<7hl^ab;HIXO8+HnsNY-Zg7$GCm5Vx$xIsCOw&rnJ*(|zucfB+u>9M+t$%@Mt<^|oX z`NiA-#{woH!R}r|2wH(pJ3EbHMV&iC7|CxBVTcT5x_5euhwaeq(ON=d9+s#_G_g=M zk-o^yIH^fSKOr2(;Vj2_RpdERe8L3D7ylb>=e z(*3a%-1sz6-;W!qs{ORX8PqdXf(b9tWr1$%f$hl=CP>s^(v6+|4ZMfMN1X_SdT45+ z#6wW7QMRGyOmnWtg>}8x3|O}zAtHZM;h;cb3fmwtD=5wg(`5HY7CWLkvdD z{qX@c#CRcGl0^r9e6_nj2y#2XuW=pW!OZjFR_RKfD4}wpicn}w3T{oIrWxYZHK&`D zw@ULPAnOA6HbPxj*}q>u2FPy3hA|c*3$22&!Y4}}L!PbSTWJ1PJbAGAd9oE|Xo1?< z!$Snfv0k%y@bNjqRnfd*SzEW&r0Ocl}Ef|4?7LqTZ=bcam06abG`0y z&TH3ItR-ls(Ze`aK_weJ`6o;R!HF-&gcA-x02N)}@MUR=>jnr7= _SESSION_LIMIT: - old_user_id, old_session_id = _session_keys.popitem(last=False) + (old_user_id, old_session_id), _ = _session_keys.popitem(last=False) await _session_service.delete_session( app_name=APP_NAME, user_id=old_user_id, session_id=old_session_id )