[codex] Accept ACP user messages during async turns#3376
Conversation
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: openhands <openhands@all-hands.dev>
Python API breakage checks — ✅ PASSEDResult: ✅ PASSED |
REST API breakage checks (OpenAPI) — ✅ PASSEDResult: ✅ PASSED |
Coverage Report •
|
||||||||||||||||||||||||||||||||||||||||
all-hands-bot
left a comment
There was a problem hiding this comment.
Clean approach to the live-streaming problem: reusing the existing StreamingDeltaEvent / _pub_sub path for ACP deltas is exactly right — transient chunks don't belong in state.events, and the new helper _publish_streaming_delta_from_thread correctly encapsulates the cross-thread scheduling so neither the LLM token path nor the ACP path has to repeat the run_coroutine_threadsafe / suppress boilerplate. Test coverage is solid (wiring, unwiring, and actual delta delivery are all exercised). Two items worth a look before merge are called out below.
This review was generated by an AI agent (OpenHands) on behalf of the repository owner via OpenHands Automation.
dc3d0a3 to
ca04e8c
Compare
Co-authored-by: openhands <openhands@all-hands.dev>
|
Merged |
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable, but I found two ACP concurrency races that should be addressed before merge: one can replay a just-arrived user prompt, and one can let cancelled prompt updates leak into the next turn. Risk: 🟡 medium, since this changes conversation loop/cancellation behavior.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26368622109
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the two ACP async concurrency review items in 6c89026 and verified locally with ruff plus the full EventService test file (83 passed). Ready for another automated review.\n\n_This comment was generated by an AI agent (OpenHands) on behalf of the user._ |
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: openhands <openhands@all-hands.dev>
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found a few ACP concurrency ordering gaps that should be addressed before merge. Risk: 🟡 medium because this changes async conversation loop/cancellation behavior.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26369467316
Co-authored-by: openhands <openhands@all-hands.dev>
|
Addressed the latest ACP concurrency review in c9bbb68: atomic async finalization, finish-gap queued-message reconciliation, and cancellation drain-before-failure ordering. Verified locally with ruff, pyright, and targeted ACP async tests.\n\n_This comment was generated by an AI agent (OpenHands) on behalf of the user._ |
|
Added a follow-up fix in c3749ff after investigating conversation Root cause: after two rapid user interruptions, the ACP bridge timed out waiting for the cancelled prompt to drain. Fix: preserve the persisted ACP session id/cwd across restart-after-drain-timeout so the restarted subprocess attempts Verification:
|
all-hands-bot
left a comment
There was a problem hiding this comment.
The iterative hardening across this PR (FIFO message tracking, shielded drain, deferred cancel-path restart, suffix-marker clearing on fallback) is solid work. Earlier rounds of review + fixes addressed the major concurrency races. Three remaining points below — two in production code, one in tests.
This review was generated by an AI agent (OpenHands) on behalf of the user via OpenHands Automation.
all-hands-bot
left a comment
There was a problem hiding this comment.
✅ QA Report: PASS
Verified ACP live message behavior through a real stdio ACP JSON-RPC harness; the PR achieves its stated goal with no QA issues found.
Does this PR achieve its stated goal?
Yes. I exercised the changed SDK and agent-server paths with a minimal ACP server instead of relying on unit tests: on the base commit, ACP send_message() waited behind an in-flight prompt and ACP string chunks did not produce streaming deltas; on the PR, the intervening user message persisted immediately, the queued ACP prompt ran, streaming deltas were published, and EventService sent session/cancel before restarting with the replacement prompt. The condenser settings path also constructs separate default and condenser LLM usage IDs on the PR.
| Phase | Result |
|---|---|
| Environment Setup | ✅ uv sync --dev --frozen completed (Checked 235 packages in 2ms) |
| CI Status | 🟡 Core build/test/pre-commit checks are green; OpenHands pr-review and qa-changes automation checks are still in progress |
| Functional Verification | ✅ ACP queued messages, streaming deltas, interrupt/cancel restart, and condenser usage IDs verified |
Functional Verification
Test 1: Local ACP conversation accepts a user message while a prompt is in flight
Step 1 — Reproduce baseline without the PR:
Checked out origin/main (2aa5256e2147a3252be8d1f96600f627ec27abbb) and ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py local_queue:
QA_RESULT={"cancel_events": [], "completed_within_250ms": false, "mode": "local_queue", "prompts": ["initial request", "intervening request"], "second_prompt_auto": true, "send_elapsed_if_prompt": null, "send_elapsed_total": 2.7864337420000425}This confirms the old behavior: the intervening send_message() did not persist promptly; it waited ~2.8s for the in-flight ACP prompt/usage wait to complete.
Step 2 — Apply the PR's changes:
Checked out c3749ffa83e04f8f64ff102aa5529ef5c65e591c.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py local_queue:
QA_RESULT={"cancel_events": [], "completed_within_250ms": true, "mode": "local_queue", "prompts": ["initial request", "intervening request"], "second_prompt_auto": true, "send_elapsed_if_prompt": 0.0020447240000294187, "send_elapsed_total": 0.0020454750000453714}This shows the PR fixes the core async-turn issue: the new user message persisted in ~2ms while the first ACP prompt was still active, and the second ACP prompt was processed automatically in FIFO order.
Test 2: ACP plain-string token callbacks publish streaming deltas
Step 1 — Reproduce baseline without the PR:
Checked out origin/main and ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py event_streaming:
QA_RESULT={"delta_contents": [], "delta_count": 0, "mode": "event_streaming", "prompts": ["stream this"]}This confirms the previous agent-server streaming path did not surface ACP plain-string chunks as StreamingDeltaEvents.
Step 2 — Apply the PR's changes:
Checked out c3749ffa83e04f8f64ff102aa5529ef5c65e591c.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py event_streaming:
QA_RESULT={"delta_contents": ["delta for stream this"], "delta_count": 1, "mode": "event_streaming", "prompts": ["stream this"]}This shows the ACP string chunk reached subscribers as a transient streaming delta.
Test 3: Agent-server interrupts an in-flight ACP turn and sends session/cancel
Step 1 — Reproduce baseline without the PR:
Checked out origin/main and ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py event_interrupt:
QA_RESULT={"cancel_count": 0, "mode": "event_interrupt", "prompts": ["initial long request", "replacement request"], "replacement_prompt_seen": true, "send_elapsed": 7.073722684000018}This shows the old EventService path did not send session/cancel; the second send_message(run=True) waited for the first long ACP prompt and then for the replacement run.
Step 2 — Apply the PR's changes:
Checked out c3749ffa83e04f8f64ff102aa5529ef5c65e591c.
Step 3 — Re-run with the fix in place:
Ran OPENHANDS_SUPPRESS_BANNER=1 uv run python /tmp/qa_acp_harness.py event_interrupt:
QA_RESULT={"cancel_count": 1, "mode": "event_interrupt", "prompts": ["initial long request", "replacement request"], "replacement_prompt_seen": true, "send_elapsed": 2.009888352999951}This confirms the PR sends ACP session/cancel during the interrupted turn and restarts against the replacement user prompt instead of waiting for the old long prompt to complete.
Test 4: Condenser LLM usage IDs stay separated
Step 1 — Establish baseline:
On origin/main, ran a real settings construction script using OpenHandsAgentSettings, CondenserSettings(enabled=True), and create_agent_from_settings(...):
Agent
agent_llm_usage_id= default
condenser= LLMSummarizingCondenser
all_llm_usage_ids= ['default', 'condenser']
Step 2 — Apply the PR's changes and re-run:
On c3749ffa83e04f8f64ff102aa5529ef5c65e591c, ran the same script:
Agent
agent_llm_usage_id= default
condenser= LLMSummarizingCondenser
all_llm_usage_ids= ['default', 'condenser']
This verifies the PR branch exposes distinct runtime usage IDs for agent and condenser LLMs; the current origin/main already has the same behavior, so this was a no-regression check for the copied fix.
Issues Found
None.
This review was created by an AI agent (OpenHands) on behalf of the user.
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found a few ACP async queue/cancellation edge cases that should be addressed before merge. Risk: 🟡 medium because this changes agent/conversation loop behavior; a human maintainer should decide eval coverage before approval.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26379406959
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found two ACP cancellation races that can strand a just-appended user message or desync local history from the ACP session. Risk: 🟡 medium because this changes conversation loop/cancellation behavior; please have a human maintainer decide eval coverage before merge.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26411480756
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Taste Rating: Acceptable, but I found a few ACP async edge cases worth fixing before merge.
[RISK ASSESSMENT]
- [Overall PR]
⚠️ Risk Assessment: 🟡 MEDIUM — this changes ACP cancellation/restart and conversation loop behavior, so a human maintainer should decide eval coverage before approval.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26412087756
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Taste Rating: Acceptable, but I found a few remaining ACP async races to address. Risk: 🟡 MEDIUM because this changes ACP conversation-loop/cancellation behavior; a human maintainer should decide eval coverage before merge.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26415333311
all-hands-bot
left a comment
There was a problem hiding this comment.
🟡 Acceptable direction, but I found a few remaining ACP async edge cases around restart/cancellation and cursor handling. Risk: 🟡 MEDIUM because this changes ACP conversation-loop/cancellation behavior; a human maintainer should decide eval coverage before approval.
This review was generated by an AI agent (OpenHands) on behalf of the user.
Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/software-agent-sdk/actions/runs/26416390628
| interrupted_acp = True | ||
| await self.interrupt(internal_acp_rerun=True) | ||
| try: | ||
| await self.run() |
There was a problem hiding this comment.
🟠 Important: After an internal ACP supersede, this coroutine awaits interrupt(internal_acp_rerun=True) and then always calls run(). If the user explicitly presses Stop/Pause while that internal interrupt is draining, that explicit call clears the rerun flags, but this suspended send_message() still resumes here and restarts from PAUSED. Please re-check an internal rerun generation/flag after interrupt() returns, or let _run_and_publish own the restart, so explicit user stop intent wins.
| # within our short grace window; it does not prove the ACP server lost | ||
| # its persisted session. Preserve the session id so the restarted | ||
| # subprocess can load_session() and retain conversation memory. | ||
| self._restart_session_on_next_turn = False |
There was a problem hiding this comment.
🟠 Important: This clears _restart_session_on_next_turn before the replacement session has actually initialized. If _cleanup()/init_state() fails, the exception escapes before the prompt try block, the agent is left cleaned up, _agent_ready remains true, and future runs won't retry the deferred restart. Clear the flag only after init_state() succeeds, or restore it on failure.
| ) | ||
| raise | ||
| if drain_result.completed and drain_result.error is not None: | ||
| self._emit_turn_error(drain_result.error, state, on_event) |
There was a problem hiding this comment.
🟠 Important: In the explicit cancellation path, a drained prompt error is surfaced as an ACP error before re-raising CancelledError. Normal Stop can therefore append ACP error/ConversationErrorEvent before arun() marks the conversation PAUSED; cancellation during retry backoff can also surface a stale retriable attempt because prompt_future still points at the handled failure. For user cancellation, close in-flight tool cards/quarantine the session as needed and re-raise without emitting a turn error.
| event | ||
| for event in self._state.events | ||
| if isinstance(event, MessageEvent) | ||
| and event.source == "user" |
There was a problem hiding this comment.
🟠 Important: Stop-hook feedback is appended above as MessageEvent(source="environment", role="user"), but the ACP prompt queue filters only source == "user". After an ACP turn finishes and a stop hook returns should_stop=False, the cursor is already current, so this loop finds no prompt for the feedback and finishes anyway. Include the feedback event in the ACP queue, or otherwise send it to ACP, before allowing FINISHED.
| user_messages[0] if user_messages else None | ||
| ) | ||
| else: | ||
| last_prompt_index = next( |
There was a problem hiding this comment.
🟠 Important: If persisted acp_last_prompt_user_message_id no longer exists in user_messages (for example after condensation/truncation or a bad persisted state), last_prompt_index is None and no prompt is selected. The no-prompt branch then sees last_user_message_id != last_acp_prompt_user_message_id, sets RUNNING, and continues without incrementing iteration, causing a tight loop. Treat a missing cursor as a recoverable mismatch by resetting/repairing it or selecting a deterministic next prompt.
Summary
StreamingDeltaEvents, including ACP providers that invoke token callbacks with plain string chunksastep()awaits so websocket/API user messages can be persisted immediatelysession/cancelwhen an in-flight turn is interruptedRegression Tests
test_acp_string_token_callback_publishes_deltafails on the prior implementation because ACP plain-string token callbacks raised before publishing anyStreamingDeltaEventtest_acp_arun_accepts_user_message_while_step_is_in_flightfails on main becausesend_message()waits behind the in-flight ACP prompttests/sdk/test_settings.py -k condensercovers the copied condenser LLM usage-id behavior from fix(sdk): assign condenser LLM usage id #3368Validation
uv run pytest -q tests/sdk/test_settings.py -k condenseruv run pytest -q tests/agent_server/test_event_streaming.py tests/agent_server/test_event_service.py::TestEventServiceSendMessage tests/sdk/agent/test_acp_agent.py::TestACPAgentAstep tests/sdk/conversation/local/test_conversation_send_message.pyuv run ruff check openhands-sdk/openhands/sdk/settings/model.py tests/sdk/test_settings.py openhands-agent-server/openhands/agent_server/event_service.py tests/agent_server/test_event_streaming.pyAgent Server images for this PR
• GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server
Variants & Base Images
eclipse-temurin:17-jdknikolaik/python-nodejs:python3.13-nodejs22-slimgolang:1.21-bookwormPull (multi-arch manifest)
# Each variant is a multi-arch manifest supporting both amd64 and arm64 docker pull ghcr.io/openhands/agent-server:c988567-pythonRun
All tags pushed for this build
About Multi-Architecture Support
c988567-python) is a multi-arch manifest supporting both amd64 and arm64c988567-python-amd64) are also available if needed