diff --git a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap index 6cfcc62d2..97e574ec1 100644 --- a/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap +++ b/src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap @@ -3388,6 +3388,7 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht "{{#if needsOs}} import os {{/if}} +from collections import OrderedDict from autogen_agentchat.agents import AssistantAgent from autogen_core.tools import FunctionTool from bedrock_agentcore.runtime import BedrockAgentCoreApp @@ -3478,23 +3479,40 @@ You have access to the following mounted filesystems. Use file_read, file_write, {{/each}}{{/if}} """ -@app.entrypoint -async def invoke(payload, context): - log.info("Invoking Agent.....") +# Reuses one AssistantAgent per session_id so each session keeps its own +# in-process conversation history (best-effort; resets on cold start). Caches up +# to 128 active sessions with LRU eviction (least-recently-used is dropped and +# its history reset). +_agents = OrderedDict() + +async def get_or_create_agent(session_id): + if session_id in _agents: + _agents.move_to_end(session_id) + return _agents[session_id] + if len(_agents) >= 128: + _agents.popitem(last=False) # Get MCP Tools mcp_tools = await get_streamable_http_mcp_tools() - - # Define an AssistantAgent with the model and tools - agent = AssistantAgent( + _agents[session_id] = AssistantAgent( name="{{ name }}", model_client=load_model(), tools=tools + mcp_tools, system_message=SYSTEM_MESSAGE, ) + return _agents[session_id] + + +@app.entrypoint +async def invoke(payload, context): + log.info("Invoking Agent.....") # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + + # Reuse the per-session agent (preserves conversation history) + agent = await get_or_create_agent(session_id) # Run the agent result = await agent.run(task=prompt) @@ -3820,6 +3838,7 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht "{{#if needsOs}} import os {{/if}} +from collections import OrderedDict from google.adk.agents import Agent from google.adk.runners import Runner from google.adk.sessions import InMemorySessionService @@ -3940,21 +3959,57 @@ agent = Agent( ) -# Session and Runner -async def setup_session_and_runner(user_id, session_id): - ensure_credentials_loaded() - session_service = InMemorySessionService() - session = await session_service.create_session( +# Module-level session service and runner preserve history across invocations. +# InMemorySessionService retains every (app_name, user_id, session_id) triple +# forever, so we bound it to 128 active sessions with LRU eviction (the +# least-recently-used session is deleted and its history reset) to keep a +# long-running process from growing without limit. For durable history, swap in +# a persistent session service (e.g. DatabaseSessionService). +_SESSION_LIMIT = 128 +_session_service = InMemorySessionService() +_session_keys = OrderedDict() +_runner = None + + +def get_or_create_runner(): + global _runner + if _runner is None: + ensure_credentials_loaded() + _runner = Runner( + agent=agent, + app_name=APP_NAME, + session_service=_session_service, + ) + return _runner + + +async def get_or_create_session(user_id, session_id): + key = (user_id, session_id) + if key in _session_keys: + _session_keys.move_to_end(key) + else: + while len(_session_keys) >= _SESSION_LIMIT: + (old_user_id, old_session_id), _ = _session_keys.popitem(last=False) + await _session_service.delete_session( + app_name=APP_NAME, user_id=old_user_id, session_id=old_session_id + ) + _session_keys[key] = True + + session = await _session_service.get_session( app_name=APP_NAME, user_id=user_id, session_id=session_id ) - runner = Runner(agent=agent, app_name=APP_NAME, session_service=session_service) - return session, runner + if session is None: + session = await _session_service.create_session( + app_name=APP_NAME, user_id=user_id, session_id=session_id + ) + return session # Agent Interaction async def call_agent_async(query, user_id, session_id): content = types.Content(role="user", parts=[types.Part(text=query)]) - session, runner = await setup_session_and_runner(user_id, session_id) + runner = get_or_create_runner() + session = await get_or_create_session(user_id, session_id) events = runner.run_async( user_id=user_id, session_id=session.id, new_message=content ) @@ -4241,9 +4296,11 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht "{{#if needsOs}} import os {{/if}} +from collections import OrderedDict from typing import Any from langchain_core.messages import HumanMessage{{#if hasConfigBundle}}, SystemMessage{{/if}} +from langgraph.checkpoint.memory import InMemorySaver from langgraph.prebuilt import create_react_agent from langchain.tools import tool {{#if hasConfigBundle}} @@ -4294,6 +4351,26 @@ def add_numbers(a: int, b: int) -> int: # Define a collection of tools used by the model tools = [add_numbers] +# Module-level checkpointer preserves conversation history across invocations. +# InMemorySaver keeps every thread_id (= session_id) checkpoint in memory +# forever, so we bound it to 128 active threads with LRU eviction (the +# least-recently-used thread is deleted and its history reset) to keep a +# long-running process from growing without limit. For durable history, swap in +# a persistent checkpointer (e.g. SqliteSaver/AsyncSqliteSaver with a file path). +_CHECKPOINT_LIMIT = 128 +_checkpointer = InMemorySaver() +_thread_ids = OrderedDict() + + +def touch_thread(thread_id): + if thread_id in _thread_ids: + _thread_ids.move_to_end(thread_id) + return + while len(_thread_ids) >= _CHECKPOINT_LIMIT: + evicted, _ = _thread_ids.popitem(last=False) + _checkpointer.delete_thread(evicted) + _thread_ids[thread_id] = True + {{#if needsOs}} _MOUNT_PATHS = [ {{#if sessionStorageMountPath}}"{{sessionStorageMountPath}}",{{/if}} @@ -4389,29 +4466,46 @@ async def invoke(payload, context): if mcp_client: mcp_tools = await mcp_client.get_tools() - # Define the agent using create_react_agent + # Define the agent using create_react_agent (checkpointer is shared across invocations) {{#if hasConfigBundle}} - graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT) + graph = create_react_agent( + get_or_create_model(), + tools=mcp_tools + tools, + prompt=DEFAULT_SYSTEM_PROMPT, + checkpointer=_checkpointer, + ) callback = ConfigBundleCallback() # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + touch_thread(session_id) log.info(f"Agent input: {prompt}") - # Run the agent with config bundle callback + # Run the agent with config bundle callback (checkpointer auto-loads/saves history per session) result = await graph.ainvoke( {"messages": [HumanMessage(content=prompt)]}, - config={"callbacks": [callback]}, + config={"callbacks": [callback], "configurable": {"thread_id": session_id}}, ) {{else}} - graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT) + graph = create_react_agent( + get_or_create_model(), + tools=mcp_tools + tools, + prompt=DEFAULT_SYSTEM_PROMPT, + checkpointer=_checkpointer, + ) # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + touch_thread(session_id) log.info(f"Agent input: {prompt}") - # Run the agent - result = await graph.ainvoke({"messages": [HumanMessage(content=prompt)]}) + # Run the agent (checkpointer auto-loads/saves history per session) + result = await graph.ainvoke( + {"messages": [HumanMessage(content=prompt)]}, + config={"configurable": {"thread_id": session_id}}, + ) {{/if}} # Return result @@ -4782,7 +4876,8 @@ import os {{#if hasGateway}} from contextlib import AsyncExitStack {{/if}} -from agents import Agent, Runner, function_tool +from functools import lru_cache +from agents import Agent, Runner, SQLiteSession, function_tool from bedrock_agentcore.runtime import BedrockAgentCoreApp from model.load import load_model {{#if hasGateway}} @@ -4886,8 +4981,16 @@ You have access to the following mounted filesystems. Use file_read, file_write, {{/each}}{{/if}} """ +# Caches up to 128 active sessions; LRU eviction silently resets history for +# the oldest session. For production use, replace with a durable session store +# (e.g. SQLiteSession with a file path). +@lru_cache(maxsize=128) +def get_session(session_id): + return SQLiteSession(session_id) + + # Define the agent execution -async def main(query): +async def main(query, session): ensure_credentials_loaded() try: {{#if hasGateway}} @@ -4906,7 +5009,7 @@ async def main(query): tools=tools, mcp_config={"include_server_in_tool_names": True}, ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result else: agent = Agent( @@ -4916,7 +5019,7 @@ async def main(query): mcp_servers=[], tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result {{else}} if mcp_servers: @@ -4929,7 +5032,7 @@ async def main(query): mcp_servers=active_servers, tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result else: agent = Agent( @@ -4939,7 +5042,7 @@ async def main(query): mcp_servers=[], tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result {{/if}} except Exception as e: @@ -4953,9 +5056,11 @@ async def invoke(payload, context): # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + session = get_session(session_id) - # Run the agent - result = await main(prompt) + # Run the agent (session automatically loads/saves conversation history) + result = await main(prompt, session) # Return result return {"result": result.final_output} @@ -5209,6 +5314,7 @@ Thumbs.db" exports[`Assets Directory Snapshots > Python framework assets > python/python/http/strands/base/main.py should match snapshot 1`] = ` "from typing import Any +from collections import OrderedDict {{#if inlineFunctionTools}} import json @@ -5634,26 +5740,21 @@ def agent_factory(): get_or_create_agent = agent_factory() {{/unless}} {{else}} -{{#if hasConfigBundle}} -def create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): - return Agent( - model=load_model(), - system_prompt=DEFAULT_SYSTEM_PROMPT, - tools=tools, - conversation_manager=_make_conversation_manager(), - {{#if hasSkillsFetcher}} - plugins=skill_plugins or None, - {{/if}} - hooks=[ConfigBundleHook()], - ) -{{else}} {{#unless hasPayment}} -_agent = None - -def get_or_create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): - global _agent - if _agent is None: - _agent = Agent( +# Reuses one Agent per session_id so each session keeps its own in-process +# conversation history (best-effort; resets on cold start). The cache is bounded +# to 128 sessions with LRU eviction (least-recently-used is dropped and its +# history reset) so a single process serving many sessions cannot leak history +# between them or grow without limit. For durable history, attach a session manager. +def agent_factory(): + cache = OrderedDict() + def get_or_create_agent(session_id{{#if hasSkillsFetcher}}, skill_plugins=None{{/if}}): + if session_id in cache: + cache.move_to_end(session_id) + return cache[session_id] + if len(cache) >= 128: + cache.popitem(last=False) + cache[session_id] = Agent( model=load_model(), system_prompt=DEFAULT_SYSTEM_PROMPT, tools=tools, @@ -5673,12 +5774,16 @@ def get_or_create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} ), {{/if}} + {{#if hasConfigBundle}} + ConfigBundleHook(), + {{/if}} ], ) - return _agent + return cache[session_id] + return get_or_create_agent +get_or_create_agent = agent_factory() {{/unless}} {{/if}} -{{/if}} def _extract_prompt(payload: dict): @@ -5785,11 +5890,8 @@ async def invoke(payload, context): hooks=[ConfigBundleHook()],{{/if}} ) {{else}} -{{#if hasConfigBundle}} - agent = create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}}) -{{else}} - agent = get_or_create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}}) -{{/if}} + session_id = getattr(context, 'session_id', 'default-session') + agent = get_or_create_agent(session_id{{#if hasSkillsFetcher}}, _skill_plugins{{/if}}) {{/if}} {{/if}} diff --git a/src/assets/__tests__/googleadk-session-eviction.test.ts b/src/assets/__tests__/googleadk-session-eviction.test.ts new file mode 100644 index 000000000..54b0078b5 Binary files /dev/null and b/src/assets/__tests__/googleadk-session-eviction.test.ts differ diff --git a/src/assets/python/http/autogen/base/main.py b/src/assets/python/http/autogen/base/main.py index a01633215..63d331785 100644 --- a/src/assets/python/http/autogen/base/main.py +++ b/src/assets/python/http/autogen/base/main.py @@ -1,6 +1,7 @@ {{#if needsOs}} import os {{/if}} +from collections import OrderedDict from autogen_agentchat.agents import AssistantAgent from autogen_core.tools import FunctionTool from bedrock_agentcore.runtime import BedrockAgentCoreApp @@ -91,23 +92,40 @@ def list_files(path: str) -> str: {{/each}}{{/if}} """ -@app.entrypoint -async def invoke(payload, context): - log.info("Invoking Agent.....") +# Reuses one AssistantAgent per session_id so each session keeps its own +# in-process conversation history (best-effort; resets on cold start). Caches up +# to 128 active sessions with LRU eviction (least-recently-used is dropped and +# its history reset). +_agents = OrderedDict() + +async def get_or_create_agent(session_id): + if session_id in _agents: + _agents.move_to_end(session_id) + return _agents[session_id] + if len(_agents) >= 128: + _agents.popitem(last=False) # Get MCP Tools mcp_tools = await get_streamable_http_mcp_tools() - - # Define an AssistantAgent with the model and tools - agent = AssistantAgent( + _agents[session_id] = AssistantAgent( name="{{ name }}", model_client=load_model(), tools=tools + mcp_tools, system_message=SYSTEM_MESSAGE, ) + return _agents[session_id] + + +@app.entrypoint +async def invoke(payload, context): + log.info("Invoking Agent.....") # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + + # Reuse the per-session agent (preserves conversation history) + agent = await get_or_create_agent(session_id) # Run the agent result = await agent.run(task=prompt) diff --git a/src/assets/python/http/googleadk/base/main.py b/src/assets/python/http/googleadk/base/main.py index fe5eb7d5d..165ab7e96 100644 --- a/src/assets/python/http/googleadk/base/main.py +++ b/src/assets/python/http/googleadk/base/main.py @@ -1,6 +1,7 @@ {{#if needsOs}} import os {{/if}} +from collections import OrderedDict from google.adk.agents import Agent from google.adk.runners import Runner from google.adk.sessions import InMemorySessionService @@ -121,21 +122,57 @@ def ensure_credentials_loaded(): ) -# Session and Runner -async def setup_session_and_runner(user_id, session_id): - ensure_credentials_loaded() - session_service = InMemorySessionService() - session = await session_service.create_session( +# Module-level session service and runner preserve history across invocations. +# InMemorySessionService retains every (app_name, user_id, session_id) triple +# forever, so we bound it to 128 active sessions with LRU eviction (the +# least-recently-used session is deleted and its history reset) to keep a +# long-running process from growing without limit. For durable history, swap in +# a persistent session service (e.g. DatabaseSessionService). +_SESSION_LIMIT = 128 +_session_service = InMemorySessionService() +_session_keys = OrderedDict() +_runner = None + + +def get_or_create_runner(): + global _runner + if _runner is None: + ensure_credentials_loaded() + _runner = Runner( + agent=agent, + app_name=APP_NAME, + session_service=_session_service, + ) + return _runner + + +async def get_or_create_session(user_id, session_id): + key = (user_id, session_id) + if key in _session_keys: + _session_keys.move_to_end(key) + else: + while len(_session_keys) >= _SESSION_LIMIT: + (old_user_id, old_session_id), _ = _session_keys.popitem(last=False) + await _session_service.delete_session( + app_name=APP_NAME, user_id=old_user_id, session_id=old_session_id + ) + _session_keys[key] = True + + session = await _session_service.get_session( app_name=APP_NAME, user_id=user_id, session_id=session_id ) - runner = Runner(agent=agent, app_name=APP_NAME, session_service=session_service) - return session, runner + if session is None: + session = await _session_service.create_session( + app_name=APP_NAME, user_id=user_id, session_id=session_id + ) + return session # Agent Interaction async def call_agent_async(query, user_id, session_id): content = types.Content(role="user", parts=[types.Part(text=query)]) - session, runner = await setup_session_and_runner(user_id, session_id) + runner = get_or_create_runner() + session = await get_or_create_session(user_id, session_id) events = runner.run_async( user_id=user_id, session_id=session.id, new_message=content ) diff --git a/src/assets/python/http/langchain_langgraph/base/main.py b/src/assets/python/http/langchain_langgraph/base/main.py index 27a12c039..387fe2870 100644 --- a/src/assets/python/http/langchain_langgraph/base/main.py +++ b/src/assets/python/http/langchain_langgraph/base/main.py @@ -1,9 +1,11 @@ {{#if needsOs}} import os {{/if}} +from collections import OrderedDict from typing import Any from langchain_core.messages import HumanMessage{{#if hasConfigBundle}}, SystemMessage{{/if}} +from langgraph.checkpoint.memory import InMemorySaver from langgraph.prebuilt import create_react_agent from langchain.tools import tool {{#if hasConfigBundle}} @@ -54,6 +56,26 @@ def add_numbers(a: int, b: int) -> int: # Define a collection of tools used by the model tools = [add_numbers] +# Module-level checkpointer preserves conversation history across invocations. +# InMemorySaver keeps every thread_id (= session_id) checkpoint in memory +# forever, so we bound it to 128 active threads with LRU eviction (the +# least-recently-used thread is deleted and its history reset) to keep a +# long-running process from growing without limit. For durable history, swap in +# a persistent checkpointer (e.g. SqliteSaver/AsyncSqliteSaver with a file path). +_CHECKPOINT_LIMIT = 128 +_checkpointer = InMemorySaver() +_thread_ids = OrderedDict() + + +def touch_thread(thread_id): + if thread_id in _thread_ids: + _thread_ids.move_to_end(thread_id) + return + while len(_thread_ids) >= _CHECKPOINT_LIMIT: + evicted, _ = _thread_ids.popitem(last=False) + _checkpointer.delete_thread(evicted) + _thread_ids[thread_id] = True + {{#if needsOs}} _MOUNT_PATHS = [ {{#if sessionStorageMountPath}}"{{sessionStorageMountPath}}",{{/if}} @@ -149,29 +171,46 @@ async def invoke(payload, context): if mcp_client: mcp_tools = await mcp_client.get_tools() - # Define the agent using create_react_agent + # Define the agent using create_react_agent (checkpointer is shared across invocations) {{#if hasConfigBundle}} - graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT) + graph = create_react_agent( + get_or_create_model(), + tools=mcp_tools + tools, + prompt=DEFAULT_SYSTEM_PROMPT, + checkpointer=_checkpointer, + ) callback = ConfigBundleCallback() # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + touch_thread(session_id) log.info(f"Agent input: {prompt}") - # Run the agent with config bundle callback + # Run the agent with config bundle callback (checkpointer auto-loads/saves history per session) result = await graph.ainvoke( {"messages": [HumanMessage(content=prompt)]}, - config={"callbacks": [callback]}, + config={"callbacks": [callback], "configurable": {"thread_id": session_id}}, ) {{else}} - graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT) + graph = create_react_agent( + get_or_create_model(), + tools=mcp_tools + tools, + prompt=DEFAULT_SYSTEM_PROMPT, + checkpointer=_checkpointer, + ) # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + touch_thread(session_id) log.info(f"Agent input: {prompt}") - # Run the agent - result = await graph.ainvoke({"messages": [HumanMessage(content=prompt)]}) + # Run the agent (checkpointer auto-loads/saves history per session) + result = await graph.ainvoke( + {"messages": [HumanMessage(content=prompt)]}, + config={"configurable": {"thread_id": session_id}}, + ) {{/if}} # Return result diff --git a/src/assets/python/http/openaiagents/base/main.py b/src/assets/python/http/openaiagents/base/main.py index 772f50d42..db1a43d00 100644 --- a/src/assets/python/http/openaiagents/base/main.py +++ b/src/assets/python/http/openaiagents/base/main.py @@ -4,7 +4,8 @@ {{#if hasGateway}} from contextlib import AsyncExitStack {{/if}} -from agents import Agent, Runner, function_tool +from functools import lru_cache +from agents import Agent, Runner, SQLiteSession, function_tool from bedrock_agentcore.runtime import BedrockAgentCoreApp from model.load import load_model {{#if hasGateway}} @@ -108,8 +109,16 @@ def list_files(path: str) -> str: {{/each}}{{/if}} """ +# Caches up to 128 active sessions; LRU eviction silently resets history for +# the oldest session. For production use, replace with a durable session store +# (e.g. SQLiteSession with a file path). +@lru_cache(maxsize=128) +def get_session(session_id): + return SQLiteSession(session_id) + + # Define the agent execution -async def main(query): +async def main(query, session): ensure_credentials_loaded() try: {{#if hasGateway}} @@ -128,7 +137,7 @@ async def main(query): tools=tools, mcp_config={"include_server_in_tool_names": True}, ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result else: agent = Agent( @@ -138,7 +147,7 @@ async def main(query): mcp_servers=[], tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result {{else}} if mcp_servers: @@ -151,7 +160,7 @@ async def main(query): mcp_servers=active_servers, tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result else: agent = Agent( @@ -161,7 +170,7 @@ async def main(query): mcp_servers=[], tools=tools ) - result = await Runner.run(agent, query) + result = await Runner.run(agent, query, session=session) return result {{/if}} except Exception as e: @@ -175,9 +184,11 @@ async def invoke(payload, context): # Process the user prompt prompt = payload.get("prompt", "What can you help me with?") + session_id = getattr(context, "session_id", "default-session") + session = get_session(session_id) - # Run the agent - result = await main(prompt) + # Run the agent (session automatically loads/saves conversation history) + result = await main(prompt, session) # Return result return {"result": result.final_output} diff --git a/src/assets/python/http/strands/base/main.py b/src/assets/python/http/strands/base/main.py index 87a216c32..578f21608 100644 --- a/src/assets/python/http/strands/base/main.py +++ b/src/assets/python/http/strands/base/main.py @@ -1,4 +1,5 @@ from typing import Any +from collections import OrderedDict {{#if inlineFunctionTools}} import json @@ -424,26 +425,21 @@ def get_or_create_agent(session_id, user_id{{#if hasSkillsFetcher}}, skill_plugi get_or_create_agent = agent_factory() {{/unless}} {{else}} -{{#if hasConfigBundle}} -def create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): - return Agent( - model=load_model(), - system_prompt=DEFAULT_SYSTEM_PROMPT, - tools=tools, - conversation_manager=_make_conversation_manager(), - {{#if hasSkillsFetcher}} - plugins=skill_plugins or None, - {{/if}} - hooks=[ConfigBundleHook()], - ) -{{else}} {{#unless hasPayment}} -_agent = None - -def get_or_create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): - global _agent - if _agent is None: - _agent = Agent( +# Reuses one Agent per session_id so each session keeps its own in-process +# conversation history (best-effort; resets on cold start). The cache is bounded +# to 128 sessions with LRU eviction (least-recently-used is dropped and its +# history reset) so a single process serving many sessions cannot leak history +# between them or grow without limit. For durable history, attach a session manager. +def agent_factory(): + cache = OrderedDict() + def get_or_create_agent(session_id{{#if hasSkillsFetcher}}, skill_plugins=None{{/if}}): + if session_id in cache: + cache.move_to_end(session_id) + return cache[session_id] + if len(cache) >= 128: + cache.popitem(last=False) + cache[session_id] = Agent( model=load_model(), system_prompt=DEFAULT_SYSTEM_PROMPT, tools=tools, @@ -463,12 +459,16 @@ def get_or_create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}): {{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}} ), {{/if}} + {{#if hasConfigBundle}} + ConfigBundleHook(), + {{/if}} ], ) - return _agent + return cache[session_id] + return get_or_create_agent +get_or_create_agent = agent_factory() {{/unless}} {{/if}} -{{/if}} def _extract_prompt(payload: dict): @@ -575,11 +575,8 @@ async def invoke(payload, context): hooks=[ConfigBundleHook()],{{/if}} ) {{else}} -{{#if hasConfigBundle}} - agent = create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}}) -{{else}} - agent = get_or_create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}}) -{{/if}} + session_id = getattr(context, 'session_id', 'default-session') + agent = get_or_create_agent(session_id{{#if hasSkillsFetcher}}, _skill_plugins{{/if}}) {{/if}} {{/if}}