Skip to content

Commit 00a12fb

Browse files
committed
handle streaming httpclient error and closure in same asyncio task and context
1 parent 1a1046e commit 00a12fb

File tree

5 files changed

+25
-13
lines changed

5 files changed

+25
-13
lines changed

eval_protocol/mcp/client/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,10 +544,10 @@ async def close_session(self, session: MCPSession) -> None:
544544
await session._exit_stack.aclose()
545545
except asyncio.CancelledError:
546546
# Handle cancellation gracefully (especially important for Python 3.12)
547-
logger.debug(f"Session {session.session_id} close was cancelled")
547+
logger.error(f"Session {session.session_id} close was cancelled")
548548
except Exception as e:
549549
# Hitting this error, probably because of use of threads: "Attempted to exit cancel scope in a different task than it was entered in"
550-
logger.debug(f"Error closing session {session.session_id}: {e}")
550+
logger.error(f"Error closing session {session.session_id}: {e}")
551551
finally:
552552
session._exit_stack = None
553553
session._mcp_session = None

eval_protocol/mcp/execution/base_policy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ async def _generate_live_tool_calls(
220220
return mcp_tool_calls, usage_stats
221221
else:
222222
# No tool calls in response - this is normal when episode ends or LLM provides only text
223-
logger.info(f"No tool calls in response for env {env_index}, message content: {message.get('content')}")
223+
logger.debug(f"No tool calls in response for env {env_index}, message content: {message.get('content')}")
224224
return [
225225
MCPToolCall(
226226
tool_name="_no_tool_call",

eval_protocol/mcp/execution/manager.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,12 @@ async def execute_rollouts(
9898

9999
async def _execute_with_semaphore(idx):
100100
async with semaphore:
101-
return await self._execute_rollout(
101+
result = await self._execute_rollout(
102102
envs, policy, idx, steps, openai_logger, recording_mode, playback_mode, start_time
103103
)
104104

105+
return result
106+
105107
tasks = [_execute_with_semaphore(i) for i in range(envs.n)]
106108
# exceptions will be try catched inside single _execute_rollout
107109
trajectories = await asyncio.gather(*tasks)
@@ -114,7 +116,7 @@ async def _execute_with_semaphore(idx):
114116
shared_tool_schema = envs.tool_schemas
115117

116118
# Clean up
117-
await envs.close()
119+
# await envs.close()
118120

119121
# Enhanced reporting with control plane info
120122
successful = sum(1 for traj in trajectories if traj.total_reward > 0)
@@ -227,6 +229,7 @@ async def _execute_rollout(
227229
"total_tokens": 0,
228230
},
229231
)
232+
failure_reason = None
230233
try:
231234
current_observation, tool_schema = await envs.reset(session)
232235
system_prompt = dataset_row.system_prompt
@@ -467,11 +470,25 @@ async def _execute_rollout(
467470
logger.info(
468471
f"✅ Rollout {rollout_idx} completed: {trajectory.steps} steps, reward: {trajectory.total_reward:.2f}, termination: {trajectory.termination_reason}, in thread {threading.current_thread().name}"
469472
)
473+
474+
except asyncio.CancelledError:
475+
logger.error(f"🚨 AsyncIO Cancel Error in roll out {rollout_idx}", exc_info=True)
476+
failure_reason = "asyncio context cancelled"
470477
except Exception as e:
471478
logger.error(f"🚨 Error in rollout {rollout_idx}: {e}", exc_info=True)
479+
failure_reason = str(e)
480+
finally:
472481
trajectory.terminated = True
473482
trajectory.termination_reason = TerminationReason.ERROR
474-
trajectory.control_plane_summary.update({"error_message": str(e)})
483+
trajectory.control_plane_summary.update({"error_message": f"{failure_reason}"})
484+
try:
485+
await envs.connection_manager.reset_session(session)
486+
except:
487+
logger.error(f"Error resetting session {session.session_id}")
488+
try:
489+
await envs.connection_manager.close_session(session)
490+
except:
491+
logger.error(f"Error closing session {session.session_id}")
475492
return trajectory
476493

477494
async def _get_control_plane_status(self, session) -> Optional[Dict[str, Any]]:

eval_protocol/mcp/session/manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ async def reset(self, session: MCPSession) -> Tuple[Any, List[Dict]]:
5858
5959
This is thread-safe and can be called from worker threads.
6060
"""
61+
await self.connection_manager.initialize_session(session)
6162
# Get available tools from MCP server
6263
tool_schemas = await self.connection_manager.discover_tools(session)
6364

eval_protocol/mcp_env.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ async def reset_mcp_sessions(envs: GeneralMCPVectorEnv):
8686
Reset mcp server sessions
8787
"""
8888
tasks = [envs.connection_manager.reset_session(session) for session in envs.sessions]
89-
await asyncio.gather(*tasks)
89+
await asyncio.gather(*tasks, return_exceptions=True)
9090

9191

9292
async def make(
@@ -236,12 +236,6 @@ async def make(
236236
sessions.append(session)
237237

238238
mcp_envs = GeneralMCPVectorEnv(sessions, dataset_rows, user_prompt_formatter)
239-
tasks = [mcp_envs.connection_manager.initialize_session(session) for session in sessions]
240-
await asyncio.gather(*tasks)
241-
242-
if reset_sessions:
243-
await reset_mcp_sessions(mcp_envs)
244-
245239
return mcp_envs
246240

247241

0 commit comments

Comments
 (0)