-
Notifications
You must be signed in to change notification settings - Fork 264
[codex] Accept ACP user messages during async turns #3376
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0a180f1
43032a6
ca04e8c
f50ed72
71f8148
b9dc93b
b336d05
6c89026
49583a2
55b6c38
c9bbb68
ce68215
f581a01
f1ca28d
ffeb881
c1c37ca
1c390af
c3749ff
6246d1e
682fad9
741399e
89a68ff
81b4713
70e44e2
ff3b45b
af1ac3f
c988567
9d7d2be
1deacf2
91d11a8
eb7bed3
80dc346
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,8 +18,13 @@ | |||||||
| ) | ||||||||
| from openhands.agent_server.pub_sub import PubSub, Subscriber | ||||||||
| from openhands.sdk import LLM, AgentBase, Event, Message, get_logger | ||||||||
| from openhands.sdk.agent import ACPAgent | ||||||||
| from openhands.sdk.conversation.base import BaseConversation | ||||||||
| from openhands.sdk.conversation.impl.local_conversation import LocalConversation | ||||||||
| from openhands.sdk.conversation.impl.local_conversation import ( | ||||||||
| ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID, | ||||||||
| ACP_SUPERSEDE_INFLIGHT_PROMPT, | ||||||||
| LocalConversation, | ||||||||
| ) | ||||||||
| from openhands.sdk.conversation.response_utils import get_agent_final_response | ||||||||
| from openhands.sdk.conversation.secret_registry import SecretValue | ||||||||
| from openhands.sdk.conversation.state import ( | ||||||||
|
|
@@ -71,6 +76,15 @@ class EventService: | |||||||
| # Set when a send_message(run=True) is rejected because a run is still | ||||||||
| # wrapping up; consumed by _run_and_publish to re-run the stranded message. | ||||||||
| _rerun_requested: bool = field(default=False, init=False) | ||||||||
| # Set only for the internal ACP interrupt/restart path triggered by a new | ||||||||
| # send_message(run=True). Explicit user pause/interrupt clears it so user | ||||||||
| # stop intent wins over an earlier automatic restart request. | ||||||||
| _acp_internal_rerun_requested: bool = field(default=False, init=False) | ||||||||
| # Incremented for explicit user pause/interrupt requests. Internal ACP | ||||||||
| # supersede restarts compare this generation after their interrupt drains | ||||||||
| # so a later Stop/Pause cannot be overwritten by an automatic restart. | ||||||||
| _explicit_interrupt_generation: int = field(default=0, init=False) | ||||||||
| _closing: bool = field(default=False, init=False) | ||||||||
| _run_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False) | ||||||||
| _callback_wrapper: AsyncCallbackWrapper | None = field(default=None, init=False) | ||||||||
| _lease: ConversationLease | None = field(default=None, init=False) | ||||||||
|
|
@@ -419,11 +433,28 @@ async def batch_get_events(self, event_ids: list[str]) -> list[Event | None]: | |||||||
| async def send_message(self, message: Message, run: bool = False): | ||||||||
| if not self._conversation: | ||||||||
| raise ValueError("inactive_service") | ||||||||
| explicit_interrupt_generation = self._explicit_interrupt_generation | ||||||||
| loop = asyncio.get_running_loop() | ||||||||
| await loop.run_in_executor(None, self._conversation.send_message, message) | ||||||||
| if run: | ||||||||
| ( | ||||||||
| did_mark_acp_prompt_superseded, | ||||||||
| active_acp_prompt_has_latest_message, | ||||||||
| ) = await self._mark_running_acp_prompt_superseded() | ||||||||
| interrupted_acp = False | ||||||||
| if did_mark_acp_prompt_superseded: | ||||||||
| self._acp_internal_rerun_requested = True | ||||||||
| interrupted_acp = True | ||||||||
| await self.interrupt(internal_acp_rerun=True) | ||||||||
| if self._explicit_interrupt_generation != explicit_interrupt_generation: | ||||||||
| return | ||||||||
| try: | ||||||||
| await self.run() | ||||||||
| await self.run( | ||||||||
| acp_internal_rerun_generation=explicit_interrupt_generation | ||||||||
| if interrupted_acp | ||||||||
| else None | ||||||||
| ) | ||||||||
| self._acp_internal_rerun_requested = False | ||||||||
| except ValueError as e: | ||||||||
| # run() refused. If a run is still wrapping up (its | ||||||||
| # wait_for_pending tail), the message we just appended won't be | ||||||||
|
|
@@ -433,8 +464,53 @@ async def send_message(self, message: Message, run: bool = False): | |||||||
| # is what keeps a deliberate run=False append, or an IDLE reached | ||||||||
| # via another path, from triggering an unwanted run. | ||||||||
| # "inactive_service" is terminal and must not re-arm. | ||||||||
| if str(e) == "conversation_already_running": | ||||||||
| if ( | ||||||||
| str(e) == "conversation_already_running" | ||||||||
| and not active_acp_prompt_has_latest_message | ||||||||
| ): | ||||||||
| self._rerun_requested = True | ||||||||
| if interrupted_acp: | ||||||||
| self._acp_internal_rerun_requested = True | ||||||||
|
|
||||||||
| def _mark_running_acp_prompt_superseded_sync(self) -> tuple[bool, bool]: | ||||||||
| """Mark the currently running ACP prompt superseded if needed. | ||||||||
|
|
||||||||
| The tuple is ``(did_mark_superseded, active_prompt_has_latest_message)``. | ||||||||
| If the running ACP prompt has already advanced to the newly appended | ||||||||
| user message, interrupting it would cancel the replacement prompt and | ||||||||
| strand that message behind the persisted cursor. | ||||||||
| """ | ||||||||
| if not self._conversation: | ||||||||
| return (False, False) | ||||||||
| if self._run_task is None: | ||||||||
| return (False, False) | ||||||||
| if not isinstance(self._conversation.agent, ACPAgent): | ||||||||
| return (False, False) | ||||||||
| with self._conversation._state as state: | ||||||||
| if state.execution_status != ConversationExecutionStatus.RUNNING: | ||||||||
| return (False, False) | ||||||||
| inflight_prompt_user_message_id = state.agent_state.get( | ||||||||
| ACP_INFLIGHT_PROMPT_USER_MESSAGE_ID | ||||||||
| ) | ||||||||
| last_user_message_id = state.last_user_message_id | ||||||||
| if inflight_prompt_user_message_id is None or last_user_message_id is None: | ||||||||
| return (False, False) | ||||||||
| active_prompt_has_latest_message = ( | ||||||||
| inflight_prompt_user_message_id == last_user_message_id | ||||||||
| ) | ||||||||
| if active_prompt_has_latest_message: | ||||||||
| return (False, True) | ||||||||
| state.agent_state = { | ||||||||
| **state.agent_state, | ||||||||
| ACP_SUPERSEDE_INFLIGHT_PROMPT: True, | ||||||||
| } | ||||||||
| return (True, False) | ||||||||
|
|
||||||||
| async def _mark_running_acp_prompt_superseded(self) -> tuple[bool, bool]: | ||||||||
| loop = asyncio.get_running_loop() | ||||||||
| return await loop.run_in_executor( | ||||||||
| None, self._mark_running_acp_prompt_superseded_sync | ||||||||
| ) | ||||||||
|
|
||||||||
| async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID: | ||||||||
| subscriber_id = self._pub_sub.subscribe(subscriber) | ||||||||
|
|
@@ -624,41 +700,53 @@ async def start(self): | |||||||
| self._pub_sub, loop=asyncio.get_running_loop() | ||||||||
| ) | ||||||||
|
|
||||||||
| # Only wire token streaming if at least one LLM has stream=True. | ||||||||
| # The LLM silently ignores on_token when stream is off, but skipping | ||||||||
| # the wiring lets us log the decision so operators can tell from a | ||||||||
| # log line whether deltas will flow. | ||||||||
| streaming_enabled = any(llm.stream for llm in agent.get_all_llms()) | ||||||||
| # Only wire token streaming for agents that can actually emit token | ||||||||
| # callbacks. SDK LLM agents need stream=True, while ACP agents emit | ||||||||
| # AgentMessageChunk text through their bridge without exposing an LLM. | ||||||||
| streaming_enabled = isinstance(agent, ACPAgent) or any( | ||||||||
| llm.stream for llm in agent.get_all_llms() | ||||||||
| ) | ||||||||
| logger.debug( | ||||||||
| "Token streaming: %s", | ||||||||
| "enabled" if streaming_enabled else "disabled (no LLM has stream=True)", | ||||||||
| ) | ||||||||
|
|
||||||||
| def _token_streaming_callback(chunk: LLMStreamChunk) -> None: | ||||||||
| def _publish_stream_delta( | ||||||||
| content: str | None = None, | ||||||||
| reasoning_content: str | None = None, | ||||||||
| ) -> None: | ||||||||
| # Published directly to _pub_sub (not via _callback_wrapper) so | ||||||||
| # deltas reach subscribers but are NOT persisted to | ||||||||
| # ConversationState.events. See StreamingDeltaEvent docstring. | ||||||||
| if not self._main_loop or not self._main_loop.is_running(): | ||||||||
| return | ||||||||
| # Use `is not None` rather than truthiness: some providers | ||||||||
| # emit legitimate empty-string chunks at stream boundaries | ||||||||
| # (e.g. after a tool call) that we still want to forward. | ||||||||
| if content is None and reasoning_content is None: | ||||||||
| return | ||||||||
| event = StreamingDeltaEvent( | ||||||||
| content=content, | ||||||||
| reasoning_content=reasoning_content, | ||||||||
| ) | ||||||||
| with suppress(RuntimeError): | ||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Suggestion:
Suggested change
(Flagged in a prior review round; still unresolved.) |
||||||||
| asyncio.run_coroutine_threadsafe(self._pub_sub(event), self._main_loop) | ||||||||
|
|
||||||||
| def _token_streaming_callback(chunk: LLMStreamChunk | str) -> None: | ||||||||
| if isinstance(chunk, str): | ||||||||
| _publish_stream_delta(content=chunk) | ||||||||
| return | ||||||||
|
|
||||||||
| for choice in chunk.choices or (): | ||||||||
| delta = choice.delta | ||||||||
| if delta is None: | ||||||||
| continue | ||||||||
| content = getattr(delta, "content", None) | ||||||||
| reasoning = getattr(delta, "reasoning_content", None) | ||||||||
| # Use `is not None` rather than truthiness: some providers | ||||||||
| # emit legitimate empty-string chunks at stream boundaries | ||||||||
| # (e.g. after a tool call) that we still want to forward. | ||||||||
| if content is None and reasoning is None: | ||||||||
| continue | ||||||||
| event = StreamingDeltaEvent( | ||||||||
| _publish_stream_delta( | ||||||||
| content=content if isinstance(content, str) else None, | ||||||||
| reasoning_content=reasoning if isinstance(reasoning, str) else None, | ||||||||
| ) | ||||||||
| with suppress(RuntimeError): | ||||||||
| asyncio.run_coroutine_threadsafe( | ||||||||
| self._pub_sub(event), self._main_loop | ||||||||
| ) | ||||||||
|
|
||||||||
| conversation = LocalConversation( | ||||||||
| agent=agent, | ||||||||
|
|
@@ -733,7 +821,7 @@ def _token_streaming_callback(chunk: LLMStreamChunk) -> None: | |||||||
| # Publish initial state update | ||||||||
| await self._publish_state_update() | ||||||||
|
|
||||||||
| async def run(self): | ||||||||
| async def run(self, acp_internal_rerun_generation: int | None = None): | ||||||||
| """Run the conversation asynchronously in the background. | ||||||||
|
|
||||||||
| This method starts the conversation run in a background task and returns | ||||||||
|
|
@@ -746,7 +834,7 @@ async def run(self): | |||||||
| Raises: | ||||||||
| ValueError: If the service is inactive or conversation is already running. | ||||||||
| """ | ||||||||
| if not self._conversation: | ||||||||
| if not self._conversation or self._closing: | ||||||||
| raise ValueError("inactive_service") | ||||||||
|
|
||||||||
| # Use lock to make check-and-set atomic, preventing race conditions | ||||||||
|
|
@@ -756,6 +844,13 @@ async def run(self): | |||||||
| == ConversationExecutionStatus.RUNNING | ||||||||
| ): | ||||||||
| raise ValueError("conversation_already_running") | ||||||||
| if self._closing: | ||||||||
| raise ValueError("inactive_service") | ||||||||
| if ( | ||||||||
| acp_internal_rerun_generation is not None | ||||||||
| and self._explicit_interrupt_generation != acp_internal_rerun_generation | ||||||||
| ): | ||||||||
| return | ||||||||
|
|
||||||||
| # Check if there's already a running task | ||||||||
| if self._run_task is not None and not self._run_task.done(): | ||||||||
|
|
@@ -812,21 +907,47 @@ async def _run_and_publish(): | |||||||
| # wrapping up. A send_message(run=True) that arrived during | ||||||||
| # the wait_for_pending() tail above had its run() rejected as | ||||||||
| # "conversation_already_running" and suppressed, setting | ||||||||
| # _rerun_requested. Honor it only while the conversation is | ||||||||
| # still IDLE — i.e. that message is genuinely pending. If the | ||||||||
| # run loop was still alive it already absorbed the message | ||||||||
| # (LocalConversation.run() keeps looping on FINISHED) and we | ||||||||
| # are FINISHED here, so the IDLE guard avoids a redundant run. | ||||||||
| # A deliberate run=False append, or an IDLE reached via | ||||||||
| # another path, never sets the flag. | ||||||||
| if self._rerun_requested: | ||||||||
| self._rerun_requested = False | ||||||||
| if ( | ||||||||
| await self._get_execution_status() | ||||||||
| == ConversationExecutionStatus.IDLE | ||||||||
| ): | ||||||||
| with suppress(ValueError): | ||||||||
| await self.run() | ||||||||
| # _rerun_requested. Honor it while the conversation is IDLE | ||||||||
| # (pending input) or internally ACP-interrupted PAUSED (the | ||||||||
| # old task finished its interrupt before the replacement run | ||||||||
| # could start). Explicit user pause/interrupt clears the | ||||||||
| # internal ACP flag, so user stop intent wins over an older | ||||||||
| # automatic restart request. If the run loop was still alive | ||||||||
| # it already absorbed the message and we are FINISHED here, | ||||||||
| # so the guard avoids a redundant run. A deliberate | ||||||||
| # run=False append, or an IDLE reached via another path, | ||||||||
| # never sets the flag. | ||||||||
| rerun_requested = self._rerun_requested | ||||||||
| acp_internal_rerun_requested = self._acp_internal_rerun_requested | ||||||||
| rerun_generation = self._explicit_interrupt_generation | ||||||||
| self._rerun_requested = False | ||||||||
| self._acp_internal_rerun_requested = False | ||||||||
| if rerun_requested: | ||||||||
| status = await self._get_execution_status() | ||||||||
| acp_internal_rerun_still_valid = ( | ||||||||
| acp_internal_rerun_requested | ||||||||
| and self._explicit_interrupt_generation == rerun_generation | ||||||||
| ) | ||||||||
| should_restart = status == ConversationExecutionStatus.IDLE or ( | ||||||||
|
neubig marked this conversation as resolved.
|
||||||||
| acp_internal_rerun_still_valid | ||||||||
| and status == ConversationExecutionStatus.PAUSED | ||||||||
| and isinstance(conversation.agent, ACPAgent) | ||||||||
| ) | ||||||||
| if should_restart: | ||||||||
| try: | ||||||||
| await self.run( | ||||||||
| acp_internal_rerun_generation=rerun_generation | ||||||||
| if acp_internal_rerun_still_valid | ||||||||
| else None | ||||||||
| ) | ||||||||
| except ValueError as e: | ||||||||
| if str(e) == "conversation_already_running": | ||||||||
| self._rerun_requested = True | ||||||||
| self._acp_internal_rerun_requested = ( | ||||||||
| acp_internal_rerun_requested | ||||||||
| ) | ||||||||
| else: | ||||||||
| raise | ||||||||
|
|
||||||||
| # Create task but don't await it - runs in background | ||||||||
| self._run_task = asyncio.create_task(_run_and_publish()) | ||||||||
|
|
@@ -857,25 +978,32 @@ async def reject_pending_actions(self, reason: str): | |||||||
|
|
||||||||
| async def pause(self): | ||||||||
| if self._conversation: | ||||||||
| self._explicit_interrupt_generation += 1 | ||||||||
| self._rerun_requested = False | ||||||||
| self._acp_internal_rerun_requested = False | ||||||||
| loop = asyncio.get_running_loop() | ||||||||
| await loop.run_in_executor(None, self._conversation.pause) | ||||||||
| # Publish state update after pause to ensure stats are updated | ||||||||
| await self._publish_state_update() | ||||||||
|
|
||||||||
| async def interrupt(self): | ||||||||
| async def interrupt(self, *, internal_acp_rerun: bool = False): | ||||||||
| """Immediately cancel an in-flight async LLM call. | ||||||||
|
|
||||||||
| Delegates to :meth:`LocalConversation.interrupt` which cancels the | ||||||||
| ``arun()`` task. If no async run is in progress the call falls | ||||||||
| back to :meth:`pause`. | ||||||||
| """ | ||||||||
| if self._conversation: | ||||||||
| if not internal_acp_rerun: | ||||||||
| self._explicit_interrupt_generation += 1 | ||||||||
| self._rerun_requested = False | ||||||||
| self._acp_internal_rerun_requested = False | ||||||||
| self._conversation.interrupt() | ||||||||
| # Wait for the run task to finish so we can publish the final | ||||||||
| # state update (PAUSED + InterruptEvent) cleanly. | ||||||||
| if self._run_task is not None and not self._run_task.done(): | ||||||||
| with suppress(Exception): | ||||||||
| await asyncio.wait_for(self._run_task, timeout=5.0) | ||||||||
| await asyncio.wait_for(asyncio.shield(self._run_task), timeout=5.0) | ||||||||
| # Only clear _run_task if it actually finished; if | ||||||||
| # wait_for timed out the task may still be running and | ||||||||
| # clearing prematurely would allow a second run() to | ||||||||
|
|
@@ -912,6 +1040,10 @@ async def set_security_analyzer( | |||||||
| ) | ||||||||
|
|
||||||||
| async def close(self): | ||||||||
| self._closing = True | ||||||||
| self._explicit_interrupt_generation += 1 | ||||||||
| self._rerun_requested = False | ||||||||
| self._acp_internal_rerun_requested = False | ||||||||
| if self._lease_task is not None: | ||||||||
| self._lease_task.cancel() | ||||||||
| with suppress(asyncio.CancelledError): | ||||||||
|
|
||||||||
Uh oh!
There was an error while loading. Please reload this page.