From 490b626151bcda2b41a6e18fee5f31e200eb0098 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 21 May 2026 18:41:42 +0000 Subject: [PATCH 1/6] Fix async error path in acompletion/aresponses to invoke fallback/mapping/telemetry In acompletion() and aresponses(), the async retry loop was outside the try/except that calls _ahandle_error. This meant that on retry exhaustion or non-retryable provider errors, the async path: - Never invoked fallback_strategy (backup LLM silently skipped) - Never mapped litellm exceptions to SDK types via map_provider_exception - Never called telemetry.on_error Most critically, ContextWindowExceededError escaped unmapped so the agent's astep() recovery (except LLMContextWindowExceedError -> CondensationRequest) never triggered, erroring out the conversation. Fix: wrap the async retry loop inside the same try/except scope as the sync path, for both acompletion and aresponses. Co-authored-by: openhands --- openhands-sdk/openhands/sdk/llm/llm.py | 285 +++++++++++++------------ tests/sdk/llm/test_llm_fallback.py | 122 ++++++++++- 2 files changed, 267 insertions(+), 140 deletions(-) diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index 3bee201e0b..9a58185dc2 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -992,40 +992,40 @@ async def acompletion( if tools and not use_native_fc: telemetry_ctx["raw_messages"] = original_fncall_msgs - resp: ModelResponse | None = None - async for attempt in self.async_retry( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, - ): - with attempt: - assert self._telemetry is not None - self._telemetry.on_request(telemetry_ctx=telemetry_ctx) - resp = await self._atransport_call( - messages=formatted_messages, - **call_kwargs, - enable_streaming=enable_streaming, - on_token=on_token, - ) - raw_resp: ModelResponse | None = None - if use_mock_tools: - raw_resp = copy.deepcopy(resp) - resp = self.post_response_prompt_mock( - resp, - nonfncall_msgs=formatted_messages, - tools=cc_tools, - include_security_params=add_security_risk_prediction, - ) - self._telemetry.on_response(resp, raw_resp=raw_resp) - if not resp.get("choices") or len(resp["choices"]) < 1: - raise LLMNoResponseError( - "Response choices is less than 1. Response: " + str(resp) + try: + resp: ModelResponse | None = None + async for attempt in self.async_retry( + num_retries=self.num_retries, + retry_exceptions=LLM_RETRY_EXCEPTIONS, + retry_min_wait=self.retry_min_wait, + retry_max_wait=self.retry_max_wait, + retry_multiplier=self.retry_multiplier, + retry_listener=self._retry_listener_fn, + ): + with attempt: + assert self._telemetry is not None + self._telemetry.on_request(telemetry_ctx=telemetry_ctx) + resp = await self._atransport_call( + messages=formatted_messages, + **call_kwargs, + enable_streaming=enable_streaming, + on_token=on_token, ) + raw_resp: ModelResponse | None = None + if use_mock_tools: + raw_resp = copy.deepcopy(resp) + resp = self.post_response_prompt_mock( + resp, + nonfncall_msgs=formatted_messages, + tools=cc_tools, + include_security_params=add_security_risk_prediction, + ) + self._telemetry.on_response(resp, raw_resp=raw_resp) + if not resp.get("choices") or len(resp["choices"]) < 1: + raise LLMNoResponseError( + "Response choices is less than 1. Response: " + str(resp) + ) - try: assert resp is not None first_choice = resp["choices"][0] message = Message.from_llm_chat_message(first_choice["message"]) @@ -1334,116 +1334,123 @@ async def aresponses( } ) - completed: ResponsesAPIResponse | None = None - async for attempt in self.async_retry( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, - ): - with attempt: - assert self._telemetry is not None - self._telemetry.on_request(telemetry_ctx=telemetry_ctx) - final_kwargs = {**call_kwargs} - with self._litellm_modify_params_ctx(self.modify_params): - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=DeprecationWarning) - typed_input: ResponseInputParam | str = ( - cast(ResponseInputParam, input_items) if input_items else "" - ) - api_key_value = self._get_litellm_api_key_value() - - ret = await litellm_aresponses( - model=self.model, - input=typed_input, - instructions=instructions, - tools=resp_tools, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - **{**self._aws_kwargs(), **final_kwargs}, - ) - if isinstance(ret, ResponsesAPIResponse): - if user_enable_streaming: - logger.warning( - "Responses streaming was requested, but the " - "provider returned a non-streaming response; " - "no on_token deltas will be emitted." - ) - self._telemetry.on_response(ret) - completed = ret - elif final_kwargs.get("stream", False): - if not isinstance(ret, ResponsesAPIStreamingIterator): - raise AssertionError( - "Expected Responses async stream " - f"iterator, got {type(ret)}" - ) + try: + completed: ResponsesAPIResponse | None = None + async for attempt in self.async_retry( + num_retries=self.num_retries, + retry_exceptions=LLM_RETRY_EXCEPTIONS, + retry_min_wait=self.retry_min_wait, + retry_max_wait=self.retry_max_wait, + retry_multiplier=self.retry_multiplier, + retry_listener=self._retry_listener_fn, + ): + with attempt: + assert self._telemetry is not None + self._telemetry.on_request(telemetry_ctx=telemetry_ctx) + final_kwargs = {**call_kwargs} + with self._litellm_modify_params_ctx(self.modify_params): + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", category=DeprecationWarning + ) + typed_input: ResponseInputParam | str = ( + cast(ResponseInputParam, input_items) + if input_items + else "" + ) + api_key_value = self._get_litellm_api_key_value() + + ret = await litellm_aresponses( + model=self.model, + input=typed_input, + instructions=instructions, + tools=resp_tools, + api_key=api_key_value, + api_base=self.base_url, + api_version=self.api_version, + timeout=self.timeout, + drop_params=self.drop_params, + seed=self.seed, + **{**self._aws_kwargs(), **final_kwargs}, + ) + if isinstance(ret, ResponsesAPIResponse): + if user_enable_streaming: + logger.warning( + "Responses streaming was requested, " + "but the provider returned a " + "non-streaming response; no on_token " + "deltas will be emitted." + ) + self._telemetry.on_response(ret) + completed = ret + elif final_kwargs.get("stream", False): + if not isinstance(ret, ResponsesAPIStreamingIterator): + raise AssertionError( + "Expected Responses async stream " + f"iterator, got {type(ret)}" + ) - stream_cb = on_token if user_enable_streaming else None - collected_output_items: list[Any] = [] - async for event in ret: - if event is None: - continue - evt_type = getattr(event, "type", None) - if ( - evt_type - == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE - ): - item = getattr(event, "item", None) - if item is not None: - collected_output_items.append(item) - if stream_cb is None: - continue - if isinstance( - event, - ( - OutputTextDeltaEvent, - RefusalDeltaEvent, - ReasoningSummaryTextDeltaEvent, - ), + stream_cb = on_token if user_enable_streaming else None + collected_output_items: list[Any] = [] + async for event in ret: + if event is None: + continue + evt_type = getattr(event, "type", None) + if ( + evt_type + == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE + ): + item = getattr(event, "item", None) + if item is not None: + collected_output_items.append(item) + if stream_cb is None: + continue + if isinstance( + event, + ( + OutputTextDeltaEvent, + RefusalDeltaEvent, + ReasoningSummaryTextDeltaEvent, + ), + ): + delta = event.delta + if delta: + await _invoke_token_callback( + stream_cb, + ModelResponseStream( + choices=[ + StreamingChoices( + delta=Delta(content=delta) + ) + ] + ), + ) + + completed_event = ret.completed_response + if completed_event is None: + raise LLMNoResponseError( + "Responses stream finished without " + "a completed response" + ) + if not isinstance( + completed_event, ResponseCompletedEvent ): - delta = event.delta - if delta: - await _invoke_token_callback( - stream_cb, - ModelResponseStream( - choices=[ - StreamingChoices( - delta=Delta(content=delta) - ) - ] - ), - ) - - completed_event = ret.completed_response - if completed_event is None: - raise LLMNoResponseError( - "Responses stream finished without a " - "completed response" - ) - if not isinstance(completed_event, ResponseCompletedEvent): - raise LLMNoResponseError( - "Unexpected completed event: " - f"{type(completed_event)}" - ) + raise LLMNoResponseError( + "Unexpected completed event: " + f"{type(completed_event)}" + ) - completed_resp = completed_event.response - if not completed_resp.output and collected_output_items: - completed_resp.output = collected_output_items + completed_resp = completed_event.response + if not completed_resp.output and collected_output_items: + completed_resp.output = collected_output_items - self._telemetry.on_response(completed_resp) - completed = completed_resp - else: - raise AssertionError( - f"Expected ResponsesAPIResponse, got {type(ret)}" - ) + self._telemetry.on_response(completed_resp) + completed = completed_resp + else: + raise AssertionError( + f"Expected ResponsesAPIResponse, got {type(ret)}" + ) - try: assert completed is not None output_seq = cast(Sequence[Any], completed.output or []) message = Message.from_llm_responses_output(output_seq) @@ -1454,7 +1461,9 @@ async def aresponses( accumulated_token_usage=self.metrics.accumulated_token_usage, ) return LLMResponse( - message=message, metrics=metrics_snapshot, raw_response=completed + message=message, + metrics=metrics_snapshot, + raw_response=completed, ) except Exception as e: _fb_token = cast("TokenCallbackType | None", on_token) diff --git a/tests/sdk/llm/test_llm_fallback.py b/tests/sdk/llm/test_llm_fallback.py index 9758d65a02..baba3ed714 100644 --- a/tests/sdk/llm/test_llm_fallback.py +++ b/tests/sdk/llm/test_llm_fallback.py @@ -1,8 +1,9 @@ -from unittest.mock import patch +from unittest.mock import AsyncMock, patch import pytest from litellm.exceptions import ( APIConnectionError, + ContextWindowExceededError, RateLimitError, ) from litellm.types.utils import ( @@ -14,7 +15,10 @@ from pydantic import SecretStr from openhands.sdk.llm import LLM, FallbackStrategy, Message, TextContent -from openhands.sdk.llm.exceptions import LLMServiceUnavailableError +from openhands.sdk.llm.exceptions import ( + LLMContextWindowExceedError, + LLMServiceUnavailableError, +) def _get_mock_response(content: str = "ok", model: str = "gpt-4o") -> ModelResponse: @@ -305,3 +309,117 @@ def side_effect(**kwargs): content = resp.message.content[0] assert isinstance(content, TextContent) assert content.text == "from store" + + +# ========================================================================= +# Async error-handling parity tests (acompletion / aresponses) +# ========================================================================= + + +@patch("openhands.sdk.llm.llm.litellm_completion") +@patch("openhands.sdk.llm.llm.litellm_acompletion", new_callable=AsyncMock) +@pytest.mark.asyncio +async def test_acompletion_fallback_on_transport_error(mock_acomp, mock_comp): + """acompletion must invoke fallback when the primary transport raises.""" + primary_error = APIConnectionError( + message="connection reset", llm_provider="openai", model="gpt-4o" + ) + mock_acomp.side_effect = primary_error + + # Fallback uses sync completion path + mock_comp.return_value = _get_mock_response("fallback ok", model="fallback-model") + + fb = _get_llm("fallback-model") + strategy = FallbackStrategy(fallback_llms=["fb-profile"]) + primary = _get_llm("gpt-4o", fallback_strategy=strategy) + _patch_resolve(primary, [fb]) + + resp = await primary.acompletion(_MSGS) + content = resp.message.content[0] + assert isinstance(content, TextContent) + assert content.text == "fallback ok" + + +@patch("openhands.sdk.llm.llm.litellm_acompletion", new_callable=AsyncMock) +@pytest.mark.asyncio +async def test_acompletion_maps_context_window_error(mock_acomp): + """acompletion must map ContextWindowExceededError to SDK type.""" + mock_acomp.side_effect = ContextWindowExceededError( + message="context window exceeded", + llm_provider="openai", + model="gpt-4o", + ) + primary = _get_llm("gpt-4o") + with pytest.raises(LLMContextWindowExceedError): + await primary.acompletion(_MSGS) + + +@patch("openhands.sdk.llm.llm.litellm_acompletion", new_callable=AsyncMock) +@pytest.mark.asyncio +async def test_acompletion_maps_connection_error(mock_acomp): + """acompletion must map APIConnectionError to LLMServiceUnavailableError.""" + mock_acomp.side_effect = APIConnectionError( + message="down", llm_provider="openai", model="gpt-4o" + ) + primary = _get_llm("gpt-4o") + with pytest.raises(LLMServiceUnavailableError): + await primary.acompletion(_MSGS) + + +@patch("openhands.sdk.llm.llm.litellm_responses") +@patch("openhands.sdk.llm.llm.litellm_aresponses", new_callable=AsyncMock) +@pytest.mark.asyncio +async def test_aresponses_fallback_on_transport_error(mock_aresp, mock_resp): + """aresponses must invoke fallback when the primary transport raises.""" + from litellm.types.llms.openai import ResponsesAPIResponse + + primary_error = APIConnectionError( + message="down", llm_provider="openai", model="gpt-4o" + ) + mock_aresp.side_effect = primary_error + + fallback_response = ResponsesAPIResponse( + id="resp-fb", + created_at=1, + model="fb", + object="response", + output=[ + { + "type": "message", + "id": "msg-1", + "role": "assistant", + "status": "completed", + "content": [ + {"type": "output_text", "text": "fb ok", "annotations": []} + ], + } + ], + parallel_tool_calls=False, + tool_choice="auto", + tools=[], + ) + mock_resp.return_value = fallback_response + + fb = _get_llm("fb") + strategy = FallbackStrategy(fallback_llms=["fb-profile"]) + primary = _get_llm("gpt-4o", fallback_strategy=strategy) + _patch_resolve(primary, [fb]) + + resp = await primary.aresponses(_MSGS) + content = resp.message.content[0] + assert isinstance(content, TextContent) + assert content.text == "fb ok" + + +@patch("openhands.sdk.llm.llm.litellm_aresponses", new_callable=AsyncMock) +@pytest.mark.asyncio +async def test_aresponses_maps_context_window_error(mock_aresp): + """aresponses must map ContextWindowExceededError to SDK type.""" + mock_aresp.side_effect = ContextWindowExceededError( + message="context window exceeded", + llm_provider="openai", + model="gpt-4o", + ) + primary = _get_llm("gpt-4o") + with pytest.raises(LLMContextWindowExceedError): + await primary.aresponses(_MSGS) From 5c3e6cbb45451de4751c7d499542b9c55f67d277 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 21 May 2026 19:16:06 +0000 Subject: [PATCH 2/6] Address review: move @pytest.mark.asyncio outermost, add aresponses connection error test - Move @pytest.mark.asyncio above @patch decorators on all async tests for conventional ordering and readability - Add test_aresponses_maps_connection_error for full mapping parity between acompletion and aresponses Co-authored-by: openhands --- tests/sdk/llm/test_llm_fallback.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/tests/sdk/llm/test_llm_fallback.py b/tests/sdk/llm/test_llm_fallback.py index baba3ed714..66d14bd1f2 100644 --- a/tests/sdk/llm/test_llm_fallback.py +++ b/tests/sdk/llm/test_llm_fallback.py @@ -316,9 +316,9 @@ def side_effect(**kwargs): # ========================================================================= +@pytest.mark.asyncio @patch("openhands.sdk.llm.llm.litellm_completion") @patch("openhands.sdk.llm.llm.litellm_acompletion", new_callable=AsyncMock) -@pytest.mark.asyncio async def test_acompletion_fallback_on_transport_error(mock_acomp, mock_comp): """acompletion must invoke fallback when the primary transport raises.""" primary_error = APIConnectionError( @@ -340,8 +340,8 @@ async def test_acompletion_fallback_on_transport_error(mock_acomp, mock_comp): assert content.text == "fallback ok" -@patch("openhands.sdk.llm.llm.litellm_acompletion", new_callable=AsyncMock) @pytest.mark.asyncio +@patch("openhands.sdk.llm.llm.litellm_acompletion", new_callable=AsyncMock) async def test_acompletion_maps_context_window_error(mock_acomp): """acompletion must map ContextWindowExceededError to SDK type.""" mock_acomp.side_effect = ContextWindowExceededError( @@ -354,8 +354,8 @@ async def test_acompletion_maps_context_window_error(mock_acomp): await primary.acompletion(_MSGS) -@patch("openhands.sdk.llm.llm.litellm_acompletion", new_callable=AsyncMock) @pytest.mark.asyncio +@patch("openhands.sdk.llm.llm.litellm_acompletion", new_callable=AsyncMock) async def test_acompletion_maps_connection_error(mock_acomp): """acompletion must map APIConnectionError to LLMServiceUnavailableError.""" mock_acomp.side_effect = APIConnectionError( @@ -366,9 +366,9 @@ async def test_acompletion_maps_connection_error(mock_acomp): await primary.acompletion(_MSGS) +@pytest.mark.asyncio @patch("openhands.sdk.llm.llm.litellm_responses") @patch("openhands.sdk.llm.llm.litellm_aresponses", new_callable=AsyncMock) -@pytest.mark.asyncio async def test_aresponses_fallback_on_transport_error(mock_aresp, mock_resp): """aresponses must invoke fallback when the primary transport raises.""" from litellm.types.llms.openai import ResponsesAPIResponse @@ -411,8 +411,8 @@ async def test_aresponses_fallback_on_transport_error(mock_aresp, mock_resp): assert content.text == "fb ok" -@patch("openhands.sdk.llm.llm.litellm_aresponses", new_callable=AsyncMock) @pytest.mark.asyncio +@patch("openhands.sdk.llm.llm.litellm_aresponses", new_callable=AsyncMock) async def test_aresponses_maps_context_window_error(mock_aresp): """aresponses must map ContextWindowExceededError to SDK type.""" mock_aresp.side_effect = ContextWindowExceededError( @@ -423,3 +423,15 @@ async def test_aresponses_maps_context_window_error(mock_aresp): primary = _get_llm("gpt-4o") with pytest.raises(LLMContextWindowExceedError): await primary.aresponses(_MSGS) + + +@pytest.mark.asyncio +@patch("openhands.sdk.llm.llm.litellm_aresponses", new_callable=AsyncMock) +async def test_aresponses_maps_connection_error(mock_aresp): + """aresponses must map APIConnectionError to LLMServiceUnavailableError.""" + mock_aresp.side_effect = APIConnectionError( + message="down", llm_provider="openai", model="gpt-4o" + ) + primary = _get_llm("gpt-4o") + with pytest.raises(LLMServiceUnavailableError): + await primary.aresponses(_MSGS) From 6bafea9d1214de2cc8bffb6d0d9b9c7773ee75a8 Mon Sep 17 00:00:00 2001 From: Rohit Malhotra Date: Fri, 22 May 2026 07:48:39 -0400 Subject: [PATCH 3/6] Update openhands-sdk/openhands/sdk/llm/llm.py Co-authored-by: Vasco Schiavo <115561717+VascoSch92@users.noreply.github.com> --- openhands-sdk/openhands/sdk/llm/llm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index 9a58185dc2..2e48f9edf0 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -1463,7 +1463,7 @@ async def aresponses( return LLMResponse( message=message, metrics=metrics_snapshot, - raw_response=completed, + raw_response=completed ) except Exception as e: _fb_token = cast("TokenCallbackType | None", on_token) From 79f19b72af31944a008376d6d456c9d062ceaa9a Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Mon, 25 May 2026 20:19:18 +0200 Subject: [PATCH 4/6] Address review: merge nested with-statements and use guard clauses in streaming loop Merge the litellm_modify_params/catch_warnings context managers into a single parenthesized with across aresponses, sync responses, and both transport calls for consistency. Convert the aresponses streaming delta checks to continue guard clauses to reduce nesting. --- openhands-sdk/openhands/sdk/llm/llm.py | 595 ++++++++++++------------- 1 file changed, 296 insertions(+), 299 deletions(-) diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index 2e48f9edf0..bf11af7ae1 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -1143,106 +1143,104 @@ def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: assert self._telemetry is not None self._telemetry.on_request(telemetry_ctx=telemetry_ctx) final_kwargs = {**call_kwargs, **retry_kwargs} - with self._litellm_modify_params_ctx(self.modify_params): - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=DeprecationWarning) - typed_input: ResponseInputParam | str = ( - cast(ResponseInputParam, input_items) if input_items else "" - ) - api_key_value = self._get_litellm_api_key_value() - - ret = litellm_responses( - model=self.model, - input=typed_input, - instructions=instructions, - tools=resp_tools, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - **{**self._aws_kwargs(), **final_kwargs}, - ) - if isinstance(ret, ResponsesAPIResponse): - if user_enable_streaming: - logger.warning( - "Responses streaming was requested, but the provider " - "returned a non-streaming response; no on_token deltas " - "will be emitted." - ) - self._telemetry.on_response(ret) - return ret - - # When stream=True, LiteLLM returns a streaming iterator rather than - # a single ResponsesAPIResponse. Drain the iterator and use the - # completed response. - if final_kwargs.get("stream", False): - if not isinstance(ret, SyncResponsesAPIStreamingIterator): - raise AssertionError( - f"Expected Responses stream iterator, got {type(ret)}" - ) + with ( + self._litellm_modify_params_ctx(self.modify_params), + warnings.catch_warnings(), + ): + warnings.filterwarnings("ignore", category=DeprecationWarning) + typed_input: ResponseInputParam | str = ( + cast(ResponseInputParam, input_items) if input_items else "" + ) + api_key_value = self._get_litellm_api_key_value() - stream_callback = on_token if user_enable_streaming else None - # Collect output items from streaming events. - # Some endpoints (e.g., Codex subscription) send output - # items as separate events but the final response.completed - # event has output=[]. We accumulate them here and patch - # the completed response if needed. - collected_output_items: list[Any] = [] - for event in ret: - if event is None: - continue - # Collect finished output items - evt_type = getattr(event, "type", None) - if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: - item = getattr(event, "item", None) - if item is not None: - collected_output_items.append(item) - if stream_callback is None: - continue - if isinstance( - event, - ( - OutputTextDeltaEvent, - RefusalDeltaEvent, - ReasoningSummaryTextDeltaEvent, - ), - ): - delta = event.delta - if delta: - stream_callback( - ModelResponseStream( - choices=[ - StreamingChoices( - delta=Delta(content=delta) - ) - ] - ) + ret = litellm_responses( + model=self.model, + input=typed_input, + instructions=instructions, + tools=resp_tools, + api_key=api_key_value, + api_base=self.base_url, + api_version=self.api_version, + timeout=self.timeout, + drop_params=self.drop_params, + seed=self.seed, + **{**self._aws_kwargs(), **final_kwargs}, + ) + if isinstance(ret, ResponsesAPIResponse): + if user_enable_streaming: + logger.warning( + "Responses streaming was requested, but the provider " + "returned a non-streaming response; no on_token deltas " + "will be emitted." + ) + self._telemetry.on_response(ret) + return ret + + # When stream=True, LiteLLM returns a streaming iterator rather than + # a single ResponsesAPIResponse. Drain the iterator and use the + # completed response. + if final_kwargs.get("stream", False): + if not isinstance(ret, SyncResponsesAPIStreamingIterator): + raise AssertionError( + f"Expected Responses stream iterator, got {type(ret)}" + ) + + stream_callback = on_token if user_enable_streaming else None + # Collect output items from streaming events. + # Some endpoints (e.g., Codex subscription) send output + # items as separate events but the final response.completed + # event has output=[]. We accumulate them here and patch + # the completed response if needed. + collected_output_items: list[Any] = [] + for event in ret: + if event is None: + continue + # Collect finished output items + evt_type = getattr(event, "type", None) + if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: + item = getattr(event, "item", None) + if item is not None: + collected_output_items.append(item) + if stream_callback is None: + continue + if isinstance( + event, + ( + OutputTextDeltaEvent, + RefusalDeltaEvent, + ReasoningSummaryTextDeltaEvent, + ), + ): + delta = event.delta + if delta: + stream_callback( + ModelResponseStream( + choices=[ + StreamingChoices(delta=Delta(content=delta)) + ] ) + ) - completed_event = ret.completed_response - if completed_event is None: - raise LLMNoResponseError( - "Responses stream finished without a completed response" - ) - if not isinstance(completed_event, ResponseCompletedEvent): - raise LLMNoResponseError( - f"Unexpected completed event: {type(completed_event)}" - ) + completed_event = ret.completed_response + if completed_event is None: + raise LLMNoResponseError( + "Responses stream finished without a completed response" + ) + if not isinstance(completed_event, ResponseCompletedEvent): + raise LLMNoResponseError( + f"Unexpected completed event: {type(completed_event)}" + ) - completed_resp = completed_event.response + completed_resp = completed_event.response - # Patch empty output with items collected from stream - if not completed_resp.output and collected_output_items: - completed_resp.output = collected_output_items + # Patch empty output with items collected from stream + if not completed_resp.output and collected_output_items: + completed_resp.output = collected_output_items - self._telemetry.on_response(completed_resp) - return completed_resp + self._telemetry.on_response(completed_resp) + return completed_resp - raise AssertionError( - f"Expected ResponsesAPIResponse, got {type(ret)}" - ) + raise AssertionError(f"Expected ResponsesAPIResponse, got {type(ret)}") try: resp: ResponsesAPIResponse = _one_attempt() @@ -1348,108 +1346,105 @@ async def aresponses( assert self._telemetry is not None self._telemetry.on_request(telemetry_ctx=telemetry_ctx) final_kwargs = {**call_kwargs} - with self._litellm_modify_params_ctx(self.modify_params): - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", category=DeprecationWarning - ) - typed_input: ResponseInputParam | str = ( - cast(ResponseInputParam, input_items) - if input_items - else "" - ) - api_key_value = self._get_litellm_api_key_value() - - ret = await litellm_aresponses( - model=self.model, - input=typed_input, - instructions=instructions, - tools=resp_tools, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - **{**self._aws_kwargs(), **final_kwargs}, - ) - if isinstance(ret, ResponsesAPIResponse): - if user_enable_streaming: - logger.warning( - "Responses streaming was requested, " - "but the provider returned a " - "non-streaming response; no on_token " - "deltas will be emitted." - ) - self._telemetry.on_response(ret) - completed = ret - elif final_kwargs.get("stream", False): - if not isinstance(ret, ResponsesAPIStreamingIterator): - raise AssertionError( - "Expected Responses async stream " - f"iterator, got {type(ret)}" - ) - - stream_cb = on_token if user_enable_streaming else None - collected_output_items: list[Any] = [] - async for event in ret: - if event is None: - continue - evt_type = getattr(event, "type", None) - if ( - evt_type - == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE - ): - item = getattr(event, "item", None) - if item is not None: - collected_output_items.append(item) - if stream_cb is None: - continue - if isinstance( - event, - ( - OutputTextDeltaEvent, - RefusalDeltaEvent, - ReasoningSummaryTextDeltaEvent, - ), - ): - delta = event.delta - if delta: - await _invoke_token_callback( - stream_cb, - ModelResponseStream( - choices=[ - StreamingChoices( - delta=Delta(content=delta) - ) - ] - ), - ) + with ( + self._litellm_modify_params_ctx(self.modify_params), + warnings.catch_warnings(), + ): + warnings.filterwarnings("ignore", category=DeprecationWarning) + typed_input: ResponseInputParam | str = ( + cast(ResponseInputParam, input_items) if input_items else "" + ) + api_key_value = self._get_litellm_api_key_value() + + ret = await litellm_aresponses( + model=self.model, + input=typed_input, + instructions=instructions, + tools=resp_tools, + api_key=api_key_value, + api_base=self.base_url, + api_version=self.api_version, + timeout=self.timeout, + drop_params=self.drop_params, + seed=self.seed, + **{**self._aws_kwargs(), **final_kwargs}, + ) + if isinstance(ret, ResponsesAPIResponse): + if user_enable_streaming: + logger.warning( + "Responses streaming was requested, " + "but the provider returned a " + "non-streaming response; no on_token " + "deltas will be emitted." + ) + self._telemetry.on_response(ret) + completed = ret + elif final_kwargs.get("stream", False): + if not isinstance(ret, ResponsesAPIStreamingIterator): + raise AssertionError( + "Expected Responses async stream " + f"iterator, got {type(ret)}" + ) - completed_event = ret.completed_response - if completed_event is None: - raise LLMNoResponseError( - "Responses stream finished without " - "a completed response" - ) + stream_cb = on_token if user_enable_streaming else None + collected_output_items: list[Any] = [] + async for event in ret: + if event is None: + continue + evt_type = getattr(event, "type", None) + if ( + evt_type + == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE + ): + item = getattr(event, "item", None) + if item is not None: + collected_output_items.append(item) + if stream_cb is None: + continue if not isinstance( - completed_event, ResponseCompletedEvent + event, + ( + OutputTextDeltaEvent, + RefusalDeltaEvent, + ReasoningSummaryTextDeltaEvent, + ), ): - raise LLMNoResponseError( - "Unexpected completed event: " - f"{type(completed_event)}" - ) - - completed_resp = completed_event.response - if not completed_resp.output and collected_output_items: - completed_resp.output = collected_output_items + continue + if not event.delta: + continue + await _invoke_token_callback( + stream_cb, + ModelResponseStream( + choices=[ + StreamingChoices( + delta=Delta(content=event.delta) + ) + ] + ), + ) - self._telemetry.on_response(completed_resp) - completed = completed_resp - else: - raise AssertionError( - f"Expected ResponsesAPIResponse, got {type(ret)}" + completed_event = ret.completed_response + if completed_event is None: + raise LLMNoResponseError( + "Responses stream finished without " + "a completed response" ) + if not isinstance(completed_event, ResponseCompletedEvent): + raise LLMNoResponseError( + "Unexpected completed event: " + f"{type(completed_event)}" + ) + + completed_resp = completed_event.response + if not completed_resp.output and collected_output_items: + completed_resp.output = collected_output_items + + self._telemetry.on_response(completed_resp) + completed = completed_resp + else: + raise AssertionError( + f"Expected ResponsesAPIResponse, got {type(ret)}" + ) assert completed is not None output_seq = cast(Sequence[Any], completed.output or []) @@ -1461,9 +1456,7 @@ async def aresponses( accumulated_token_usage=self.metrics.accumulated_token_usage, ) return LLMResponse( - message=message, - metrics=metrics_snapshot, - raw_response=completed + message=message, metrics=metrics_snapshot, raw_response=completed ) except Exception as e: _fb_token = cast("TokenCallbackType | None", on_token) @@ -1524,63 +1517,65 @@ def _transport_call( **kwargs, ) -> ModelResponse: # litellm.modify_params is GLOBAL; guard it for thread-safety - with self._litellm_modify_params_ctx(self.modify_params): - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", category=DeprecationWarning, module="httpx.*" - ) - warnings.filterwarnings( - "ignore", - message=r".*content=.*upload.*", - category=DeprecationWarning, - ) - warnings.filterwarnings( - "ignore", - message=r"There is no current event loop", - category=DeprecationWarning, - ) - warnings.filterwarnings( - "ignore", - category=UserWarning, - ) - warnings.filterwarnings( - "ignore", - category=DeprecationWarning, - message="Accessing the 'model_fields' attribute.*", - ) - api_key_value = self._get_litellm_api_key_value() - - # When streaming, request usage in the final chunk so that - # detailed token breakdowns (prompt_tokens_details with - # cached_tokens, etc.) are not silently discarded by - # litellm's streaming handler. - if enable_streaming: - kwargs.setdefault("stream_options", {"include_usage": True}) - - # Some providers need renames handled in _normalize_call_kwargs. - ret = litellm_completion( - model=self.model, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - messages=messages, - **{**self._aws_kwargs(), **kwargs}, - ) - if enable_streaming and on_token is not None: - assert isinstance(ret, CustomStreamWrapper) - chunks = [] - for chunk in ret: - on_token(chunk) - chunks.append(chunk) - ret = litellm.stream_chunk_builder(chunks, messages=messages) - - assert isinstance(ret, ModelResponse), ( - f"Expected ModelResponse, got {type(ret)}" - ) - return ret + with ( + self._litellm_modify_params_ctx(self.modify_params), + warnings.catch_warnings(), + ): + warnings.filterwarnings( + "ignore", category=DeprecationWarning, module="httpx.*" + ) + warnings.filterwarnings( + "ignore", + message=r".*content=.*upload.*", + category=DeprecationWarning, + ) + warnings.filterwarnings( + "ignore", + message=r"There is no current event loop", + category=DeprecationWarning, + ) + warnings.filterwarnings( + "ignore", + category=UserWarning, + ) + warnings.filterwarnings( + "ignore", + category=DeprecationWarning, + message="Accessing the 'model_fields' attribute.*", + ) + api_key_value = self._get_litellm_api_key_value() + + # When streaming, request usage in the final chunk so that + # detailed token breakdowns (prompt_tokens_details with + # cached_tokens, etc.) are not silently discarded by + # litellm's streaming handler. + if enable_streaming: + kwargs.setdefault("stream_options", {"include_usage": True}) + + # Some providers need renames handled in _normalize_call_kwargs. + ret = litellm_completion( + model=self.model, + api_key=api_key_value, + api_base=self.base_url, + api_version=self.api_version, + timeout=self.timeout, + drop_params=self.drop_params, + seed=self.seed, + messages=messages, + **{**self._aws_kwargs(), **kwargs}, + ) + if enable_streaming and on_token is not None: + assert isinstance(ret, CustomStreamWrapper) + chunks = [] + for chunk in ret: + on_token(chunk) + chunks.append(chunk) + ret = litellm.stream_chunk_builder(chunks, messages=messages) + + assert isinstance(ret, ModelResponse), ( + f"Expected ModelResponse, got {type(ret)}" + ) + return ret async def _atransport_call( self, @@ -1591,55 +1586,57 @@ async def _atransport_call( **kwargs, ) -> ModelResponse: """Async variant of :meth:`_transport_call`.""" - with self._litellm_modify_params_ctx(self.modify_params): - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", category=DeprecationWarning, module="httpx.*" - ) - warnings.filterwarnings( - "ignore", - message=r".*content=.*upload.*", - category=DeprecationWarning, - ) - warnings.filterwarnings( - "ignore", - message=r"There is no current event loop", - category=DeprecationWarning, - ) - warnings.filterwarnings("ignore", category=UserWarning) - warnings.filterwarnings( - "ignore", - category=DeprecationWarning, - message="Accessing the 'model_fields' attribute.*", - ) - api_key_value = self._get_litellm_api_key_value() - - if enable_streaming: - kwargs.setdefault("stream_options", {"include_usage": True}) - - ret = await litellm_acompletion( - model=self.model, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - messages=messages, - **{**self._aws_kwargs(), **kwargs}, - ) - if enable_streaming and on_token is not None: - assert isinstance(ret, CustomStreamWrapper) - chunks = [] - async for chunk in ret: - await _invoke_token_callback(on_token, chunk) - chunks.append(chunk) - ret = litellm.stream_chunk_builder(chunks, messages=messages) - - assert isinstance(ret, ModelResponse), ( - f"Expected ModelResponse, got {type(ret)}" - ) - return ret + with ( + self._litellm_modify_params_ctx(self.modify_params), + warnings.catch_warnings(), + ): + warnings.filterwarnings( + "ignore", category=DeprecationWarning, module="httpx.*" + ) + warnings.filterwarnings( + "ignore", + message=r".*content=.*upload.*", + category=DeprecationWarning, + ) + warnings.filterwarnings( + "ignore", + message=r"There is no current event loop", + category=DeprecationWarning, + ) + warnings.filterwarnings("ignore", category=UserWarning) + warnings.filterwarnings( + "ignore", + category=DeprecationWarning, + message="Accessing the 'model_fields' attribute.*", + ) + api_key_value = self._get_litellm_api_key_value() + + if enable_streaming: + kwargs.setdefault("stream_options", {"include_usage": True}) + + ret = await litellm_acompletion( + model=self.model, + api_key=api_key_value, + api_base=self.base_url, + api_version=self.api_version, + timeout=self.timeout, + drop_params=self.drop_params, + seed=self.seed, + messages=messages, + **{**self._aws_kwargs(), **kwargs}, + ) + if enable_streaming and on_token is not None: + assert isinstance(ret, CustomStreamWrapper) + chunks = [] + async for chunk in ret: + await _invoke_token_callback(on_token, chunk) + chunks.append(chunk) + ret = litellm.stream_chunk_builder(chunks, messages=messages) + + assert isinstance(ret, ModelResponse), ( + f"Expected ModelResponse, got {type(ret)}" + ) + return ret @contextmanager def _litellm_modify_params_ctx(self, flag: bool): From f066c22476a17cf76de2fdc2da450288e6dc1952 Mon Sep 17 00:00:00 2001 From: openhands Date: Mon, 25 May 2026 20:31:22 +0000 Subject: [PATCH 5/6] Move ResponsesAPIResponse import to top of file per PEP 8 Co-authored-by: openhands --- tests/sdk/llm/test_llm_fallback.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sdk/llm/test_llm_fallback.py b/tests/sdk/llm/test_llm_fallback.py index 66d14bd1f2..30e92c4e26 100644 --- a/tests/sdk/llm/test_llm_fallback.py +++ b/tests/sdk/llm/test_llm_fallback.py @@ -12,6 +12,7 @@ ModelResponse, Usage, ) +from litellm.types.llms.openai import ResponsesAPIResponse from pydantic import SecretStr from openhands.sdk.llm import LLM, FallbackStrategy, Message, TextContent @@ -371,7 +372,6 @@ async def test_acompletion_maps_connection_error(mock_acomp): @patch("openhands.sdk.llm.llm.litellm_aresponses", new_callable=AsyncMock) async def test_aresponses_fallback_on_transport_error(mock_aresp, mock_resp): """aresponses must invoke fallback when the primary transport raises.""" - from litellm.types.llms.openai import ResponsesAPIResponse primary_error = APIConnectionError( message="down", llm_provider="openai", model="gpt-4o" From 1e564c517951922da9c3d515544ac9b3c53e289b Mon Sep 17 00:00:00 2001 From: openhands Date: Mon, 25 May 2026 20:36:52 +0000 Subject: [PATCH 6/6] Auto-format: reorder litellm imports (Ruff) Co-authored-by: openhands --- tests/sdk/llm/test_llm_fallback.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sdk/llm/test_llm_fallback.py b/tests/sdk/llm/test_llm_fallback.py index 30e92c4e26..a2c3f3d1f1 100644 --- a/tests/sdk/llm/test_llm_fallback.py +++ b/tests/sdk/llm/test_llm_fallback.py @@ -6,13 +6,13 @@ ContextWindowExceededError, RateLimitError, ) +from litellm.types.llms.openai import ResponsesAPIResponse from litellm.types.utils import ( Choices, Message as LiteLLMMessage, ModelResponse, Usage, ) -from litellm.types.llms.openai import ResponsesAPIResponse from pydantic import SecretStr from openhands.sdk.llm import LLM, FallbackStrategy, Message, TextContent