Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 157 additions & 55 deletions src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -3388,6 +3388,7 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht
"{{#if needsOs}}
import os
{{/if}}
from collections import OrderedDict
from autogen_agentchat.agents import AssistantAgent
from autogen_core.tools import FunctionTool
from bedrock_agentcore.runtime import BedrockAgentCoreApp
Expand Down Expand Up @@ -3478,23 +3479,40 @@ You have access to the following mounted filesystems. Use file_read, file_write,
{{/each}}{{/if}}
"""

@app.entrypoint
async def invoke(payload, context):
log.info("Invoking Agent.....")
# Reuses one AssistantAgent per session_id so each session keeps its own
# in-process conversation history (best-effort; resets on cold start). Caches up
# to 128 active sessions with LRU eviction (least-recently-used is dropped and
# its history reset).
_agents = OrderedDict()


async def get_or_create_agent(session_id):
if session_id in _agents:
_agents.move_to_end(session_id)
return _agents[session_id]
if len(_agents) >= 128:
_agents.popitem(last=False)
# Get MCP Tools
mcp_tools = await get_streamable_http_mcp_tools()

# Define an AssistantAgent with the model and tools
agent = AssistantAgent(
_agents[session_id] = AssistantAgent(
name="{{ name }}",
model_client=load_model(),
tools=tools + mcp_tools,
system_message=SYSTEM_MESSAGE,
)
return _agents[session_id]


@app.entrypoint
async def invoke(payload, context):
log.info("Invoking Agent.....")

# Process the user prompt
prompt = payload.get("prompt", "What can you help me with?")
session_id = getattr(context, "session_id", "default-session")

# Reuse the per-session agent (preserves conversation history)
agent = await get_or_create_agent(session_id)

# Run the agent
result = await agent.run(task=prompt)
Expand Down Expand Up @@ -3820,6 +3838,7 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht
"{{#if needsOs}}
import os
{{/if}}
from collections import OrderedDict
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
Expand Down Expand Up @@ -3940,21 +3959,57 @@ agent = Agent(
)


# Session and Runner
async def setup_session_and_runner(user_id, session_id):
ensure_credentials_loaded()
session_service = InMemorySessionService()
session = await session_service.create_session(
# Module-level session service and runner preserve history across invocations.
# InMemorySessionService retains every (app_name, user_id, session_id) triple
# forever, so we bound it to 128 active sessions with LRU eviction (the
# least-recently-used session is deleted and its history reset) to keep a
# long-running process from growing without limit. For durable history, swap in
# a persistent session service (e.g. DatabaseSessionService).
_SESSION_LIMIT = 128
_session_service = InMemorySessionService()
_session_keys = OrderedDict()
_runner = None


def get_or_create_runner():
global _runner
if _runner is None:
ensure_credentials_loaded()
_runner = Runner(
agent=agent,
app_name=APP_NAME,
session_service=_session_service,
)
return _runner


async def get_or_create_session(user_id, session_id):
key = (user_id, session_id)
if key in _session_keys:
_session_keys.move_to_end(key)
else:
while len(_session_keys) >= _SESSION_LIMIT:
(old_user_id, old_session_id), _ = _session_keys.popitem(last=False)
await _session_service.delete_session(
app_name=APP_NAME, user_id=old_user_id, session_id=old_session_id
)
_session_keys[key] = True

session = await _session_service.get_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
runner = Runner(agent=agent, app_name=APP_NAME, session_service=session_service)
return session, runner
if session is None:
session = await _session_service.create_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
return session


# Agent Interaction
async def call_agent_async(query, user_id, session_id):
content = types.Content(role="user", parts=[types.Part(text=query)])
session, runner = await setup_session_and_runner(user_id, session_id)
runner = get_or_create_runner()
session = await get_or_create_session(user_id, session_id)
events = runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
)
Expand Down Expand Up @@ -4241,9 +4296,11 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht
"{{#if needsOs}}
import os
{{/if}}
from collections import OrderedDict
from typing import Any

from langchain_core.messages import HumanMessage{{#if hasConfigBundle}}, SystemMessage{{/if}}
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
from langchain.tools import tool
{{#if hasConfigBundle}}
Expand Down Expand Up @@ -4294,6 +4351,26 @@ def add_numbers(a: int, b: int) -> int:
# Define a collection of tools used by the model
tools = [add_numbers]

# Module-level checkpointer preserves conversation history across invocations.
# InMemorySaver keeps every thread_id (= session_id) checkpoint in memory
# forever, so we bound it to 128 active threads with LRU eviction (the
# least-recently-used thread is deleted and its history reset) to keep a
# long-running process from growing without limit. For durable history, swap in
# a persistent checkpointer (e.g. SqliteSaver/AsyncSqliteSaver with a file path).
_CHECKPOINT_LIMIT = 128
_checkpointer = InMemorySaver()
_thread_ids = OrderedDict()


def touch_thread(thread_id):
if thread_id in _thread_ids:
_thread_ids.move_to_end(thread_id)
return
while len(_thread_ids) >= _CHECKPOINT_LIMIT:
evicted, _ = _thread_ids.popitem(last=False)
_checkpointer.delete_thread(evicted)
_thread_ids[thread_id] = True

{{#if needsOs}}
_MOUNT_PATHS = [
{{#if sessionStorageMountPath}}"{{sessionStorageMountPath}}",{{/if}}
Expand Down Expand Up @@ -4389,29 +4466,46 @@ async def invoke(payload, context):
if mcp_client:
mcp_tools = await mcp_client.get_tools()

# Define the agent using create_react_agent
# Define the agent using create_react_agent (checkpointer is shared across invocations)
{{#if hasConfigBundle}}
graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT)
graph = create_react_agent(
get_or_create_model(),
tools=mcp_tools + tools,
prompt=DEFAULT_SYSTEM_PROMPT,
checkpointer=_checkpointer,
)
callback = ConfigBundleCallback()

# Process the user prompt
prompt = payload.get("prompt", "What can you help me with?")
session_id = getattr(context, "session_id", "default-session")
touch_thread(session_id)
log.info(f"Agent input: {prompt}")

# Run the agent with config bundle callback
# Run the agent with config bundle callback (checkpointer auto-loads/saves history per session)
result = await graph.ainvoke(
{"messages": [HumanMessage(content=prompt)]},
config={"callbacks": [callback]},
config={"callbacks": [callback], "configurable": {"thread_id": session_id}},
)
{{else}}
graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT)
graph = create_react_agent(
get_or_create_model(),
tools=mcp_tools + tools,
prompt=DEFAULT_SYSTEM_PROMPT,
checkpointer=_checkpointer,
)

# Process the user prompt
prompt = payload.get("prompt", "What can you help me with?")
session_id = getattr(context, "session_id", "default-session")
touch_thread(session_id)
log.info(f"Agent input: {prompt}")

# Run the agent
result = await graph.ainvoke({"messages": [HumanMessage(content=prompt)]})
# Run the agent (checkpointer auto-loads/saves history per session)
result = await graph.ainvoke(
{"messages": [HumanMessage(content=prompt)]},
config={"configurable": {"thread_id": session_id}},
)
{{/if}}

# Return result
Expand Down Expand Up @@ -4782,7 +4876,8 @@ import os
{{#if hasGateway}}
from contextlib import AsyncExitStack
{{/if}}
from agents import Agent, Runner, function_tool
from functools import lru_cache
from agents import Agent, Runner, SQLiteSession, function_tool
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from model.load import load_model
{{#if hasGateway}}
Expand Down Expand Up @@ -4886,8 +4981,16 @@ You have access to the following mounted filesystems. Use file_read, file_write,
{{/each}}{{/if}}
"""

# Caches up to 128 active sessions; LRU eviction silently resets history for
# the oldest session. For production use, replace with a durable session store
# (e.g. SQLiteSession with a file path).
@lru_cache(maxsize=128)
def get_session(session_id):
return SQLiteSession(session_id)


# Define the agent execution
async def main(query):
async def main(query, session):
ensure_credentials_loaded()
try:
{{#if hasGateway}}
Expand All @@ -4906,7 +5009,7 @@ async def main(query):
tools=tools,
mcp_config={"include_server_in_tool_names": True},
)
result = await Runner.run(agent, query)
result = await Runner.run(agent, query, session=session)
return result
else:
agent = Agent(
Expand All @@ -4916,7 +5019,7 @@ async def main(query):
mcp_servers=[],
tools=tools
)
result = await Runner.run(agent, query)
result = await Runner.run(agent, query, session=session)
return result
{{else}}
if mcp_servers:
Expand All @@ -4929,7 +5032,7 @@ async def main(query):
mcp_servers=active_servers,
tools=tools
)
result = await Runner.run(agent, query)
result = await Runner.run(agent, query, session=session)
return result
else:
agent = Agent(
Expand All @@ -4939,7 +5042,7 @@ async def main(query):
mcp_servers=[],
tools=tools
)
result = await Runner.run(agent, query)
result = await Runner.run(agent, query, session=session)
return result
{{/if}}
except Exception as e:
Expand All @@ -4953,9 +5056,11 @@ async def invoke(payload, context):

# Process the user prompt
prompt = payload.get("prompt", "What can you help me with?")
session_id = getattr(context, "session_id", "default-session")
session = get_session(session_id)

# Run the agent
result = await main(prompt)
# Run the agent (session automatically loads/saves conversation history)
result = await main(prompt, session)

# Return result
return {"result": result.final_output}
Expand Down Expand Up @@ -5209,6 +5314,7 @@ Thumbs.db"

exports[`Assets Directory Snapshots > Python framework assets > python/python/http/strands/base/main.py should match snapshot 1`] = `
"from typing import Any
from collections import OrderedDict
{{#if inlineFunctionTools}}
import json

Expand Down Expand Up @@ -5634,26 +5740,21 @@ def agent_factory():
get_or_create_agent = agent_factory()
{{/unless}}
{{else}}
{{#if hasConfigBundle}}
def create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}):
return Agent(
model=load_model(),
system_prompt=DEFAULT_SYSTEM_PROMPT,
tools=tools,
conversation_manager=_make_conversation_manager(),
{{#if hasSkillsFetcher}}
plugins=skill_plugins or None,
{{/if}}
hooks=[ConfigBundleHook()],
)
{{else}}
{{#unless hasPayment}}
_agent = None

def get_or_create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}):
global _agent
if _agent is None:
_agent = Agent(
# Reuses one Agent per session_id so each session keeps its own in-process
# conversation history (best-effort; resets on cold start). The cache is bounded
# to 128 sessions with LRU eviction (least-recently-used is dropped and its
# history reset) so a single process serving many sessions cannot leak history
# between them or grow without limit. For durable history, attach a session manager.
def agent_factory():
cache = OrderedDict()
def get_or_create_agent(session_id{{#if hasSkillsFetcher}}, skill_plugins=None{{/if}}):
if session_id in cache:
cache.move_to_end(session_id)
return cache[session_id]
if len(cache) >= 128:
cache.popitem(last=False)
cache[session_id] = Agent(
model=load_model(),
system_prompt=DEFAULT_SYSTEM_PROMPT,
tools=tools,
Expand All @@ -5673,12 +5774,16 @@ def get_or_create_agent({{#if hasSkillsFetcher}}skill_plugins=None{{/if}}):
{{#if timeoutSeconds}}timeout_seconds={{timeoutSeconds}},{{/if}}
),
{{/if}}
{{#if hasConfigBundle}}
ConfigBundleHook(),
{{/if}}
],
)
return _agent
return cache[session_id]
return get_or_create_agent
get_or_create_agent = agent_factory()
{{/unless}}
{{/if}}
{{/if}}


def _extract_prompt(payload: dict):
Expand Down Expand Up @@ -5785,11 +5890,8 @@ async def invoke(payload, context):
hooks=[ConfigBundleHook()],{{/if}}
)
{{else}}
{{#if hasConfigBundle}}
agent = create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}})
{{else}}
agent = get_or_create_agent({{#if hasSkillsFetcher}}_skill_plugins{{/if}})
{{/if}}
session_id = getattr(context, 'session_id', 'default-session')
agent = get_or_create_agent(session_id{{#if hasSkillsFetcher}}, _skill_plugins{{/if}})
{{/if}}
{{/if}}

Expand Down
Binary file not shown.
Loading
Loading