Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0a180f1
fix(sdk): assign condenser LLM usage id
openhands-agent May 23, 2026
43032a6
fix(sdk): reset condenser LLM metrics
openhands-agent May 24, 2026
ca04e8c
fix(acp): accept user messages during async turns
neubig May 24, 2026
f50ed72
fix(agent-server): stream ACP text deltas
neubig May 24, 2026
71f8148
fix(acp): interrupt running turn on new user message
neubig May 24, 2026
b9dc93b
fix(agent-server): handle ACP string token deltas
neubig May 24, 2026
b336d05
merge main into PR #3376
openhands-agent May 24, 2026
6c89026
fix: address ACP async turn races (#3376)
openhands-agent May 24, 2026
49583a2
fix: satisfy ACP prompt future typing (#3376)
openhands-agent May 24, 2026
55b6c38
test: update ACP arun prompt snapshot test (#3376)
openhands-agent May 24, 2026
c9bbb68
fix: close ACP async ordering gaps (#3376)
openhands-agent May 24, 2026
ce68215
fix: address ACP cancellation edge cases (#3376)
openhands-agent May 24, 2026
f581a01
fix: address ACP review edge cases (#3376)
openhands-agent May 24, 2026
f1ca28d
Merge branch 'pr-3368-fix-condenser-usage-id' into codex/acp-live-mes…
neubig May 24, 2026
ffeb881
fix: close ACP rerun race windows (#3376)
openhands-agent May 24, 2026
c1c37ca
Merge branch 'main' into codex/acp-live-message-deltas
neubig May 24, 2026
1c390af
fix(acp): reassign agent state for prompt tracking
neubig May 24, 2026
c3749ff
fix(acp): resume session after cancel drain timeout
neubig May 25, 2026
6246d1e
Address ACP async review races
May 25, 2026
682fad9
Clarify ACP queued-message cleanup fixes
May 25, 2026
741399e
Fix ACP resume cursor after cancellation
May 25, 2026
89a68ff
Treat ACP prompt timeout as idle timeout
May 25, 2026
81b4713
Restore hard ACP prompt timeout
May 25, 2026
70e44e2
Fix ACP interrupt cursor races
May 25, 2026
ff3b45b
Format EventService ACP rerun logic
May 25, 2026
af1ac3f
Use reassignment-safe ACP cursor state updates
May 25, 2026
c988567
Fix remaining ACP async race reviews
May 25, 2026
9d7d2be
fix: address ACP async review feedback
openhands-agent May 26, 2026
1deacf2
fix: handle ACP double cancellation
openhands-agent May 26, 2026
91d11a8
fix: close ACP cancellation restart races
openhands-agent May 26, 2026
eb7bed3
fix: harden ACP cleanup interrupts
openhands-agent May 26, 2026
80dc346
fix: guard ACP restart ordering
openhands-agent May 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 170 additions & 38 deletions openhands-agent-server/openhands/agent_server/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
)
from openhands.agent_server.pub_sub import PubSub, Subscriber
from openhands.sdk import LLM, AgentBase, Event, Message, get_logger
from openhands.sdk.agent import ACPAgent
from openhands.sdk.conversation.base import BaseConversation
from openhands.sdk.conversation.impl.local_conversation import LocalConversation
from openhands.sdk.conversation.impl.local_conversation import (
ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID,
ACP_SUPERSEDE_INFLIGHT_PROMPT,
LocalConversation,
)
from openhands.sdk.conversation.response_utils import get_agent_final_response
from openhands.sdk.conversation.secret_registry import SecretValue
from openhands.sdk.conversation.state import (
Expand Down Expand Up @@ -71,6 +76,15 @@ class EventService:
# Set when a send_message(run=True) is rejected because a run is still
# wrapping up; consumed by _run_and_publish to re-run the stranded message.
_rerun_requested: bool = field(default=False, init=False)
# Set only for the internal ACP interrupt/restart path triggered by a new
# send_message(run=True). Explicit user pause/interrupt clears it so user
# stop intent wins over an earlier automatic restart request.
_acp_internal_rerun_requested: bool = field(default=False, init=False)
# Incremented for explicit user pause/interrupt requests. Internal ACP
# supersede restarts compare this generation after their interrupt drains
# so a later Stop/Pause cannot be overwritten by an automatic restart.
_explicit_interrupt_generation: int = field(default=0, init=False)
_closing: bool = field(default=False, init=False)
_run_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
_callback_wrapper: AsyncCallbackWrapper | None = field(default=None, init=False)
_lease: ConversationLease | None = field(default=None, init=False)
Expand Down Expand Up @@ -419,11 +433,28 @@ async def batch_get_events(self, event_ids: list[str]) -> list[Event | None]:
async def send_message(self, message: Message, run: bool = False):
if not self._conversation:
raise ValueError("inactive_service")
explicit_interrupt_generation = self._explicit_interrupt_generation
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._conversation.send_message, message)
if run:
(
did_mark_acp_prompt_superseded,
active_acp_prompt_has_latest_message,
) = await self._mark_running_acp_prompt_superseded()
interrupted_acp = False
if did_mark_acp_prompt_superseded:
self._acp_internal_rerun_requested = True
interrupted_acp = True
await self.interrupt(internal_acp_rerun=True)
if self._explicit_interrupt_generation != explicit_interrupt_generation:
return
try:
await self.run()
await self.run(
acp_internal_rerun_generation=explicit_interrupt_generation
if interrupted_acp
else None
)
self._acp_internal_rerun_requested = False
except ValueError as e:
# run() refused. If a run is still wrapping up (its
# wait_for_pending tail), the message we just appended won't be
Expand All @@ -433,8 +464,53 @@ async def send_message(self, message: Message, run: bool = False):
# is what keeps a deliberate run=False append, or an IDLE reached
# via another path, from triggering an unwanted run.
# "inactive_service" is terminal and must not re-arm.
if str(e) == "conversation_already_running":
if (
str(e) == "conversation_already_running"
and not active_acp_prompt_has_latest_message
):
self._rerun_requested = True
if interrupted_acp:
self._acp_internal_rerun_requested = True

def _mark_running_acp_prompt_superseded_sync(self) -> tuple[bool, bool]:
    """Mark the currently running ACP prompt superseded if needed.

    If the running ACP prompt has already advanced to the newly appended
    user message, interrupting it would cancel the replacement prompt and
    strand that message behind the persisted cursor, so in that case
    nothing is marked.

    Returns:
        ``(did_mark_superseded, active_prompt_has_latest_message)``.
        ``(False, False)`` is returned when there is no conversation, no
        run task, the agent is not an ``ACPAgent``, the conversation is not
        ``RUNNING``, or either message id is missing.
    """
    if not self._conversation:
        return (False, False)
    if self._run_task is None:
        return (False, False)
    if not isinstance(self._conversation.agent, ACPAgent):
        return (False, False)
    # NOTE(review): the nesting below was reconstructed; confirm that the
    # whole read-modify-write is meant to run inside the state context.
    with self._conversation._state as state:
        if state.execution_status != ConversationExecutionStatus.RUNNING:
            return (False, False)
        inflight_prompt_user_message_id = state.agent_state.get(
            ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID
        )
        last_user_message_id = state.last_user_message_id
        if inflight_prompt_user_message_id is None or last_user_message_id is None:
            return (False, False)
        active_prompt_has_latest_message = (
            inflight_prompt_user_message_id == last_user_message_id
        )
        if active_prompt_has_latest_message:
            # The in-flight prompt already covers the newest user message.
            return (False, True)
        # Assign a fresh dict instead of mutating agent_state in place.
        state.agent_state = {
            **state.agent_state,
            ACP_SUPERSEDE_INFLIGHT_PROMPT: True,
        }
        return (True, False)

async def _mark_running_acp_prompt_superseded(self) -> tuple[bool, bool]:
    """Run ``_mark_running_acp_prompt_superseded_sync`` in the default executor.

    The synchronous helper enters ``self._conversation._state`` as a context
    manager; it is dispatched through ``run_in_executor`` so that work does
    not execute on the event loop thread.

    Returns:
        The helper's ``(did_mark_superseded, active_prompt_has_latest_message)``
        tuple, unchanged.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, self._mark_running_acp_prompt_superseded_sync
    )

async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID:
subscriber_id = self._pub_sub.subscribe(subscriber)
Expand Down Expand Up @@ -624,41 +700,53 @@ async def start(self):
self._pub_sub, loop=asyncio.get_running_loop()
)

# Only wire token streaming if at least one LLM has stream=True.
# The LLM silently ignores on_token when stream is off, but skipping
# the wiring lets us log the decision so operators can tell from a
# log line whether deltas will flow.
streaming_enabled = any(llm.stream for llm in agent.get_all_llms())
# Only wire token streaming for agents that can actually emit token
# callbacks. SDK LLM agents need stream=True, while ACP agents emit
# AgentMessageChunk text through their bridge without exposing an LLM.
streaming_enabled = isinstance(agent, ACPAgent) or any(
llm.stream for llm in agent.get_all_llms()
)
logger.debug(
"Token streaming: %s",
"enabled" if streaming_enabled else "disabled (no LLM has stream=True)",
)

def _token_streaming_callback(chunk: LLMStreamChunk) -> None:
def _publish_stream_delta(
content: str | None = None,
reasoning_content: str | None = None,
) -> None:
# Published directly to _pub_sub (not via _callback_wrapper) so
# deltas reach subscribers but are NOT persisted to
# ConversationState.events. See StreamingDeltaEvent docstring.
if not self._main_loop or not self._main_loop.is_running():
return
# Use `is not None` rather than truthiness: some providers
# emit legitimate empty-string chunks at stream boundaries
# (e.g. after a tool call) that we still want to forward.
if content is None and reasoning_content is None:
return
event = StreamingDeltaEvent(
content=content,
reasoning_content=reasoning_content,
)
with suppress(RuntimeError):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: suppress(RuntimeError) is broader than needed here. asyncio.run_coroutine_threadsafe raises RuntimeError only when the target event loop is already closed — that is the one case worth suppressing. As written this also silently swallows any unexpected RuntimeError from StreamingDeltaEvent construction or self._pub_sub.__call__ scheduling. Adding a brief inline comment makes the scope explicit rather than accidental:

Suggested change
with suppress(RuntimeError):
with suppress(RuntimeError): # loop already closed
asyncio.run_coroutine_threadsafe(self._pub_sub(event), self._main_loop)

(Flagged in a prior review round; still unresolved.)

asyncio.run_coroutine_threadsafe(self._pub_sub(event), self._main_loop)

def _token_streaming_callback(chunk: LLMStreamChunk | str) -> None:
if isinstance(chunk, str):
_publish_stream_delta(content=chunk)
return

for choice in chunk.choices or ():
delta = choice.delta
if delta is None:
continue
content = getattr(delta, "content", None)
reasoning = getattr(delta, "reasoning_content", None)
# Use `is not None` rather than truthiness: some providers
# emit legitimate empty-string chunks at stream boundaries
# (e.g. after a tool call) that we still want to forward.
if content is None and reasoning is None:
continue
event = StreamingDeltaEvent(
_publish_stream_delta(
content=content if isinstance(content, str) else None,
reasoning_content=reasoning if isinstance(reasoning, str) else None,
)
with suppress(RuntimeError):
asyncio.run_coroutine_threadsafe(
self._pub_sub(event), self._main_loop
)

conversation = LocalConversation(
agent=agent,
Expand Down Expand Up @@ -733,7 +821,7 @@ def _token_streaming_callback(chunk: LLMStreamChunk) -> None:
# Publish initial state update
await self._publish_state_update()

async def run(self):
async def run(self, acp_internal_rerun_generation: int | None = None):
"""Run the conversation asynchronously in the background.

This method starts the conversation run in a background task and returns
Expand All @@ -746,7 +834,7 @@ async def run(self):
Raises:
ValueError: If the service is inactive or conversation is already running.
"""
if not self._conversation:
if not self._conversation or self._closing:
raise ValueError("inactive_service")

# Use lock to make check-and-set atomic, preventing race conditions
Expand All @@ -756,6 +844,13 @@ async def run(self):
== ConversationExecutionStatus.RUNNING
):
raise ValueError("conversation_already_running")
if self._closing:
raise ValueError("inactive_service")
if (
acp_internal_rerun_generation is not None
and self._explicit_interrupt_generation != acp_internal_rerun_generation
):
return

# Check if there's already a running task
if self._run_task is not None and not self._run_task.done():
Expand Down Expand Up @@ -812,21 +907,47 @@ async def _run_and_publish():
# wrapping up. A send_message(run=True) that arrived during
# the wait_for_pending() tail above had its run() rejected as
# "conversation_already_running" and suppressed, setting
# _rerun_requested. Honor it only while the conversation is
# still IDLE — i.e. that message is genuinely pending. If the
# run loop was still alive it already absorbed the message
# (LocalConversation.run() keeps looping on FINISHED) and we
# are FINISHED here, so the IDLE guard avoids a redundant run.
# A deliberate run=False append, or an IDLE reached via
# another path, never sets the flag.
if self._rerun_requested:
self._rerun_requested = False
if (
await self._get_execution_status()
== ConversationExecutionStatus.IDLE
):
with suppress(ValueError):
await self.run()
# _rerun_requested. Honor it while the conversation is IDLE
# (pending input) or internally ACP-interrupted PAUSED (the
# old task finished its interrupt before the replacement run
# could start). Explicit user pause/interrupt clears the
# internal ACP flag, so user stop intent wins over an older
# automatic restart request. If the run loop was still alive
# it already absorbed the message and we are FINISHED here,
# so the guard avoids a redundant run. A deliberate
# run=False append, or an IDLE reached via another path,
# never sets the flag.
rerun_requested = self._rerun_requested
acp_internal_rerun_requested = self._acp_internal_rerun_requested
rerun_generation = self._explicit_interrupt_generation
self._rerun_requested = False
self._acp_internal_rerun_requested = False
if rerun_requested:
status = await self._get_execution_status()
acp_internal_rerun_still_valid = (
acp_internal_rerun_requested
and self._explicit_interrupt_generation == rerun_generation
)
should_restart = status == ConversationExecutionStatus.IDLE or (
Comment thread
neubig marked this conversation as resolved.
acp_internal_rerun_still_valid
and status == ConversationExecutionStatus.PAUSED
and isinstance(conversation.agent, ACPAgent)
)
if should_restart:
try:
await self.run(
acp_internal_rerun_generation=rerun_generation
if acp_internal_rerun_still_valid
else None
)
except ValueError as e:
if str(e) == "conversation_already_running":
self._rerun_requested = True
self._acp_internal_rerun_requested = (
acp_internal_rerun_requested
)
else:
raise

# Create task but don't await it - runs in background
self._run_task = asyncio.create_task(_run_and_publish())
Expand Down Expand Up @@ -857,25 +978,32 @@ async def reject_pending_actions(self, reason: str):

async def pause(self) -> None:
    """Pause the conversation and publish the resulting state.

    Does nothing when the service has no conversation. Otherwise the call is
    recorded as an explicit user stop: ``_explicit_interrupt_generation`` is
    incremented and both rerun flags are cleared, then
    ``self._conversation.pause`` runs in the default executor and a state
    update is published.
    """
    if not self._conversation:
        return
    # Record the stop intent before the first await so a pending automatic
    # restart that compares generations sees it even while pause() is still
    # running in the executor.
    self._explicit_interrupt_generation += 1
    self._rerun_requested = False
    self._acp_internal_rerun_requested = False
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, self._conversation.pause)
    # Publish state update after pause to ensure stats are updated
    await self._publish_state_update()

async def interrupt(self):
async def interrupt(self, *, internal_acp_rerun: bool = False):
"""Immediately cancel an in-flight async LLM call.

Delegates to :meth:`LocalConversation.interrupt` which cancels the
``arun()`` task. If no async run is in progress the call falls
back to :meth:`pause`.
"""
if self._conversation:
if not internal_acp_rerun:
self._explicit_interrupt_generation += 1
self._rerun_requested = False
self._acp_internal_rerun_requested = False
self._conversation.interrupt()
# Wait for the run task to finish so we can publish the final
# state update (PAUSED + InterruptEvent) cleanly.
if self._run_task is not None and not self._run_task.done():
with suppress(Exception):
await asyncio.wait_for(self._run_task, timeout=5.0)
await asyncio.wait_for(asyncio.shield(self._run_task), timeout=5.0)
# Only clear _run_task if it actually finished; if
# wait_for timed out the task may still be running and
# clearing prematurely would allow a second run() to
Expand Down Expand Up @@ -912,6 +1040,10 @@ async def set_security_analyzer(
)

async def close(self):
self._closing = True
self._explicit_interrupt_generation += 1
self._rerun_requested = False
self._acp_internal_rerun_requested = False
if self._lease_task is not None:
self._lease_task.cancel()
with suppress(asyncio.CancelledError):
Expand Down
Loading
Loading