From 321d7ac9abfb3dc1ba0a4dea9082fcbf83cad768 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 21 May 2026 20:08:01 +0000 Subject: [PATCH 1/6] Fix async retry losing temperature bump on LLMNoResponseError The async methods (acompletion, aresponses) used an AsyncRetrying 'async for' loop that has no wrapped function for tenacity to re-invoke with updated kwargs. The before_sleep callback wrote temperature=1.0 to retry_state.kwargs, but the loop body used a local call_kwargs that was never updated. Replace the AsyncRetrying loop with the same @retry decorator pattern used by the sync methods. Tenacity's decorator handles async functions natively and automatically threads retry_state.kwargs into the wrapped function, so the temperature bump works identically for both sync and async paths. Also deduplicate shared logic across completion/acompletion and responses/aresponses: - _current_metrics_snapshot(): replaces 4 identical MetricsSnapshot constructions - _prepare_completion_params(): shared setup (format messages, tools, mock tools, select_chat_options, telemetry ctx) - _prepare_responses_params(): shared setup for responses path - _validate_chat_response(): shared post-processing (mock tools, telemetry, empty-choices check) Removes the now-unused async_retry() method and AsyncRetrying import from RetryMixin. Co-authored-by: openhands --- openhands-sdk/openhands/sdk/llm/llm.py | 738 +++++++++--------- .../openhands/sdk/llm/utils/retry_mixin.py | 32 - tests/sdk/llm/test_llm_no_response_retry.py | 76 +- 3 files changed, 451 insertions(+), 395 deletions(-) diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index 3bee201e0b..c4645e9b1b 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -753,6 +753,189 @@ async def _ahandle_error( raise mapped from error raise + # ========================================================================= + # Shared helpers for completion / acompletion / responses / aresponses + # ========================================================================= + + def _current_metrics_snapshot(self) -> MetricsSnapshot: + """Snapshot current LLM metrics for an :class:`LLMResponse`.""" + return MetricsSnapshot( + model_name=self.metrics.model_name, + accumulated_cost=self.metrics.accumulated_cost, + max_budget_per_task=self.metrics.max_budget_per_task, + accumulated_token_usage=self.metrics.accumulated_token_usage, + ) + + def _prepare_completion_params( + self, + messages: list[Message], + tools: Sequence[ToolDefinition] | None, + add_security_risk_prediction: bool, + kwargs: dict[str, Any], + ) -> tuple[ + list[dict[str, Any]], + list[ChatCompletionToolParam], + bool, + dict[str, Any], + dict[str, Any], + ]: + """Shared setup for :meth:`completion` and :meth:`acompletion`. + + Returns: + (formatted_messages, cc_tools, use_mock_tools, call_kwargs, + telemetry_ctx) + """ + formatted_messages = self.format_messages_for_llm(messages) + use_native_fc = self.native_tool_calling + original_fncall_msgs = copy.deepcopy(formatted_messages) + + cc_tools: list[ChatCompletionToolParam] = [] + if tools: + cc_tools = [ + t.to_openai_tool( + add_security_risk_prediction=add_security_risk_prediction, + ) + for t in tools + ] + + use_mock_tools = self.should_mock_tool_calls(cc_tools) + if use_mock_tools: + formatted_messages, kwargs = self.pre_request_prompt_mock( + formatted_messages, + cc_tools or [], + kwargs, + include_security_params=add_security_risk_prediction, + ) + + kwargs["tools"] = cc_tools if (bool(cc_tools) and use_native_fc) else None + has_tools_flag = bool(cc_tools) and use_native_fc + call_kwargs = select_chat_options(self, kwargs, has_tools=has_tools_flag) + + # Always pass context_window so metrics are tracked even when + # logging is disabled. + assert self._telemetry is not None + telemetry_ctx: dict[str, Any] = { + "context_window": self.effective_max_input_tokens or 0 + } + if self._telemetry.log_enabled: + telemetry_ctx.update( + { + "messages": formatted_messages[:], + "tools": tools, + "kwargs": {k: v for k, v in call_kwargs.items()}, + } + ) + if tools and not use_native_fc: + telemetry_ctx["raw_messages"] = original_fncall_msgs + + return ( + formatted_messages, + cc_tools, + use_mock_tools, + call_kwargs, + telemetry_ctx, + ) + + def _prepare_responses_params( + self, + messages: list[Message], + tools: Sequence[ToolDefinition] | None, + include: list[str] | None, + store: bool | None, + add_security_risk_prediction: bool, + kwargs: dict[str, Any], + ) -> tuple[ + str | None, + list[dict[str, Any]], + list[Any] | None, + dict[str, Any], + dict[str, Any], + ]: + """Shared setup for :meth:`responses` and :meth:`aresponses`. + + Returns: + (instructions, input_items, resp_tools, call_kwargs, + telemetry_ctx) + """ + instructions, input_items = self.format_messages_for_responses(messages) + + # Responses path always supports function tools + resp_tools = ( + [ + t.to_responses_tool( + add_security_risk_prediction=add_security_risk_prediction, + ) + for t in tools + ] + if tools + else None + ) + + call_kwargs = select_responses_options( + self, kwargs, include=include, store=store + ) + + # Always pass context_window so metrics are tracked even when + # logging is disabled. + assert self._telemetry is not None + telemetry_ctx: dict[str, Any] = { + "context_window": self.effective_max_input_tokens or 0 + } + if self._telemetry.log_enabled: + telemetry_ctx.update( + { + "llm_path": "responses", + "instructions": instructions, + "input": input_items[:], + "tools": tools, + "kwargs": {k: v for k, v in call_kwargs.items()}, + } + ) + + return instructions, input_items, resp_tools, call_kwargs, telemetry_ctx + + def _validate_chat_response( + self, + resp: ModelResponse, + *, + use_mock_tools: bool, + formatted_messages: list[dict[str, Any]], + cc_tools: list[ChatCompletionToolParam], + add_security_risk_prediction: bool, + ) -> tuple[ModelResponse, ModelResponse | None]: + """Post-process a chat completion response inside the retry boundary. + + Returns ``(resp, raw_resp)`` where *raw_resp* is non-``None`` only + when mock-tool post-processing was applied. + + Raises: + LLMNoResponseError: If the response has no choices + (Gemini sometimes returns empty choices; raising here + inside the retry boundary ensures it is retried). + """ + raw_resp: ModelResponse | None = None + if use_mock_tools: + raw_resp = copy.deepcopy(resp) + resp = self.post_response_prompt_mock( + resp, + nonfncall_msgs=formatted_messages, + tools=cc_tools, + include_security_params=add_security_risk_prediction, + ) + + assert self._telemetry is not None + self._telemetry.on_response(resp, raw_resp=raw_resp) + + if not resp.get("choices") or len(resp["choices"]) < 1: + raise LLMNoResponseError( + "Response choices is less than 1. Response: " + str(resp) + ) + return resp, raw_resp + + # ========================================================================= + # Chat Completion API + # ========================================================================= + def completion( self, messages: list[Message], @@ -800,61 +983,16 @@ def completion( raise ValueError("Streaming requires an on_token callback") kwargs["stream"] = True - # 1) serialize messages - formatted_messages = self.format_messages_for_llm(messages) - - # 2) choose function-calling strategy - use_native_fc = self.native_tool_calling - original_fncall_msgs = copy.deepcopy(formatted_messages) - - # Convert Tool objects to ChatCompletionToolParam once here - cc_tools: list[ChatCompletionToolParam] = [] - if tools: - cc_tools = [ - t.to_openai_tool( - add_security_risk_prediction=add_security_risk_prediction, - ) - for t in tools - ] - - use_mock_tools = self.should_mock_tool_calls(cc_tools) - if use_mock_tools: - logger.debug( - "LLM.completion: mocking function-calling via prompt " - f"for model {self.model}" - ) - formatted_messages, kwargs = self.pre_request_prompt_mock( - formatted_messages, - cc_tools or [], - kwargs, - include_security_params=add_security_risk_prediction, - ) - - # 3) normalize provider params - # Only pass tools when native FC is active - kwargs["tools"] = cc_tools if (bool(cc_tools) and use_native_fc) else None - has_tools_flag = bool(cc_tools) and use_native_fc - # Behavior-preserving: delegate to select_chat_options - call_kwargs = select_chat_options(self, kwargs, has_tools=has_tools_flag) - - # 4) request context for telemetry (always include context_window for metrics) - assert self._telemetry is not None - # Always pass context_window so metrics are tracked even when logging disabled - telemetry_ctx: dict[str, Any] = { - "context_window": self.effective_max_input_tokens or 0 - } - if self._telemetry.log_enabled: - telemetry_ctx.update( - { - "messages": formatted_messages[:], # already simple dicts - "tools": tools, - "kwargs": {k: v for k, v in call_kwargs.items()}, - } - ) - if tools and not use_native_fc: - telemetry_ctx["raw_messages"] = original_fncall_msgs + ( + formatted_messages, + cc_tools, + use_mock_tools, + call_kwargs, + telemetry_ctx, + ) = self._prepare_completion_params( + messages, tools, add_security_risk_prediction, kwargs + ) - # 5) do the call with retries @self.retry_decorator( num_retries=self.num_retries, retry_exceptions=LLM_RETRY_EXCEPTIONS, @@ -863,10 +1001,9 @@ def completion( retry_multiplier=self.retry_multiplier, retry_listener=self._retry_listener_fn, ) - def _one_attempt(**retry_kwargs) -> ModelResponse: + def _one_attempt(**retry_kwargs: Any) -> ModelResponse: assert self._telemetry is not None self._telemetry.on_request(telemetry_ctx=telemetry_ctx) - # Merge retry-modified kwargs (like temperature) with call_kwargs final_kwargs = {**call_kwargs, **retry_kwargs} resp = self._transport_call( messages=formatted_messages, @@ -874,46 +1011,23 @@ def _one_attempt(**retry_kwargs) -> ModelResponse: enable_streaming=enable_streaming, on_token=on_token, ) - raw_resp: ModelResponse | None = None - if use_mock_tools: - raw_resp = copy.deepcopy(resp) - resp = self.post_response_prompt_mock( - resp, - nonfncall_msgs=formatted_messages, - tools=cc_tools, - include_security_params=add_security_risk_prediction, - ) - # 6) telemetry - self._telemetry.on_response(resp, raw_resp=raw_resp) - - # Ensure at least one choice. - # Gemini sometimes returns empty choices; we raise LLMNoResponseError here - # inside the retry boundary so it is retried. - if not resp.get("choices") or len(resp["choices"]) < 1: - raise LLMNoResponseError( - "Response choices is less than 1. Response: " + str(resp) - ) - + resp, _ = self._validate_chat_response( + resp, + use_mock_tools=use_mock_tools, + formatted_messages=formatted_messages, + cc_tools=cc_tools, + add_security_risk_prediction=add_security_risk_prediction, + ) return resp try: resp = _one_attempt() - - # Convert the first choice to an OpenHands Message first_choice = resp["choices"][0] message = Message.from_llm_chat_message(first_choice["message"]) - - # Get current metrics snapshot - metrics_snapshot = MetricsSnapshot( - model_name=self.metrics.model_name, - accumulated_cost=self.metrics.accumulated_cost, - max_budget_per_task=self.metrics.max_budget_per_task, - accumulated_token_usage=self.metrics.accumulated_token_usage, - ) - - # Create and return LLMResponse return LLMResponse( - message=message, metrics=metrics_snapshot, raw_response=resp + message=message, + metrics=self._current_metrics_snapshot(), + raw_response=resp, ) except Exception as e: return self._handle_error( @@ -950,93 +1064,51 @@ async def acompletion( raise ValueError("Streaming requires an on_token callback") kwargs["stream"] = True - formatted_messages = self.format_messages_for_llm(messages) - - use_native_fc = self.native_tool_calling - original_fncall_msgs = copy.deepcopy(formatted_messages) - - cc_tools: list[ChatCompletionToolParam] = [] - if tools: - cc_tools = [ - t.to_openai_tool( - add_security_risk_prediction=add_security_risk_prediction, - ) - for t in tools - ] - - use_mock_tools = self.should_mock_tool_calls(cc_tools) - if use_mock_tools: - formatted_messages, kwargs = self.pre_request_prompt_mock( - formatted_messages, - cc_tools or [], - kwargs, - include_security_params=add_security_risk_prediction, - ) - - kwargs["tools"] = cc_tools if (bool(cc_tools) and use_native_fc) else None - has_tools_flag = bool(cc_tools) and use_native_fc - call_kwargs = select_chat_options(self, kwargs, has_tools=has_tools_flag) - - assert self._telemetry is not None - telemetry_ctx: dict[str, Any] = { - "context_window": self.effective_max_input_tokens or 0 - } - if self._telemetry.log_enabled: - telemetry_ctx.update( - { - "messages": formatted_messages[:], - "tools": tools, - "kwargs": {k: v for k, v in call_kwargs.items()}, - } - ) - if tools and not use_native_fc: - telemetry_ctx["raw_messages"] = original_fncall_msgs + ( + formatted_messages, + cc_tools, + use_mock_tools, + call_kwargs, + telemetry_ctx, + ) = self._prepare_completion_params( + messages, tools, add_security_risk_prediction, kwargs + ) - resp: ModelResponse | None = None - async for attempt in self.async_retry( + @self.retry_decorator( num_retries=self.num_retries, retry_exceptions=LLM_RETRY_EXCEPTIONS, retry_min_wait=self.retry_min_wait, retry_max_wait=self.retry_max_wait, retry_multiplier=self.retry_multiplier, retry_listener=self._retry_listener_fn, - ): - with attempt: - assert self._telemetry is not None - self._telemetry.on_request(telemetry_ctx=telemetry_ctx) - resp = await self._atransport_call( - messages=formatted_messages, - **call_kwargs, - enable_streaming=enable_streaming, - on_token=on_token, - ) - raw_resp: ModelResponse | None = None - if use_mock_tools: - raw_resp = copy.deepcopy(resp) - resp = self.post_response_prompt_mock( - resp, - nonfncall_msgs=formatted_messages, - tools=cc_tools, - include_security_params=add_security_risk_prediction, - ) - self._telemetry.on_response(resp, raw_resp=raw_resp) - if not resp.get("choices") or len(resp["choices"]) < 1: - raise LLMNoResponseError( - "Response choices is less than 1. Response: " + str(resp) - ) + ) + async def _one_attempt(**retry_kwargs: Any) -> ModelResponse: + assert self._telemetry is not None + self._telemetry.on_request(telemetry_ctx=telemetry_ctx) + final_kwargs = {**call_kwargs, **retry_kwargs} + resp = await self._atransport_call( + messages=formatted_messages, + **final_kwargs, + enable_streaming=enable_streaming, + on_token=on_token, + ) + resp, _ = self._validate_chat_response( + resp, + use_mock_tools=use_mock_tools, + formatted_messages=formatted_messages, + cc_tools=cc_tools, + add_security_risk_prediction=add_security_risk_prediction, + ) + return resp try: - assert resp is not None + resp = await _one_attempt() first_choice = resp["choices"][0] message = Message.from_llm_chat_message(first_choice["message"]) - metrics_snapshot = MetricsSnapshot( - model_name=self.metrics.model_name, - accumulated_cost=self.metrics.accumulated_cost, - max_budget_per_task=self.metrics.max_budget_per_task, - accumulated_token_usage=self.metrics.accumulated_token_usage, - ) return LLMResponse( - message=message, metrics=metrics_snapshot, raw_response=resp + message=message, + metrics=self._current_metrics_snapshot(), + raw_response=resp, ) except Exception as e: # Fallback is synchronous; cast the token callback since the @@ -1087,50 +1159,21 @@ def responses( """ user_enable_streaming = bool(kwargs.get("stream", False)) or self.stream if user_enable_streaming: + # We allow on_token to be None for subscription mode if on_token is None and not self.is_subscription: - # We allow on_token to be None for subscription mode raise ValueError("Streaming requires an on_token callback") kwargs["stream"] = True - # Build instructions + input list using dedicated Responses formatter - instructions, input_items = self.format_messages_for_responses(messages) - - # Convert Tool objects to Responses ToolParam - # (Responses path always supports function tools) - resp_tools = ( - [ - t.to_responses_tool( - add_security_risk_prediction=add_security_risk_prediction, - ) - for t in tools - ] - if tools - else None + ( + instructions, + input_items, + resp_tools, + call_kwargs, + telemetry_ctx, + ) = self._prepare_responses_params( + messages, tools, include, store, add_security_risk_prediction, kwargs ) - # Normalize/override Responses kwargs consistently - call_kwargs = select_responses_options( - self, kwargs, include=include, store=store - ) - - # Request context for telemetry (always include context_window for metrics) - assert self._telemetry is not None - # Always pass context_window so metrics are tracked even when logging disabled - telemetry_ctx: dict[str, Any] = { - "context_window": self.effective_max_input_tokens or 0 - } - if self._telemetry.log_enabled: - telemetry_ctx.update( - { - "llm_path": "responses", - "instructions": instructions, - "input": input_items[:], - "tools": tools, - "kwargs": {k: v for k, v in call_kwargs.items()}, - } - ) - - # Perform call with retries @self.retry_decorator( num_retries=self.num_retries, retry_exceptions=LLM_RETRY_EXCEPTIONS, @@ -1139,7 +1182,7 @@ def responses( retry_multiplier=self.retry_multiplier, retry_listener=self._retry_listener_fn, ) - def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: + def _one_attempt(**retry_kwargs: Any) -> ResponsesAPIResponse: assert self._telemetry is not None self._telemetry.on_request(telemetry_ctx=telemetry_ctx) final_kwargs = {**call_kwargs, **retry_kwargs} @@ -1174,9 +1217,9 @@ def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: self._telemetry.on_response(ret) return ret - # When stream=True, LiteLLM returns a streaming iterator rather than - # a single ResponsesAPIResponse. Drain the iterator and use the - # completed response. + # When stream=True, LiteLLM returns a streaming + # iterator rather than a single ResponsesAPIResponse. + # Drain the iterator and use the completed response. if final_kwargs.get("stream", False): if not isinstance(ret, SyncResponsesAPIStreamingIterator): raise AssertionError( @@ -1185,15 +1228,15 @@ def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: stream_callback = on_token if user_enable_streaming else None # Collect output items from streaming events. - # Some endpoints (e.g., Codex subscription) send output - # items as separate events but the final response.completed - # event has output=[]. We accumulate them here and patch - # the completed response if needed. + # Some endpoints (e.g., Codex subscription) send + # output items as separate events but the final + # response.completed event has output=[]. We + # accumulate them here and patch the completed + # response if needed. collected_output_items: list[Any] = [] for event in ret: if event is None: continue - # Collect finished output items evt_type = getattr(event, "type", None) if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: item = getattr(event, "item", None) @@ -1232,7 +1275,6 @@ def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: ) completed_resp = completed_event.response - # Patch empty output with items collected from stream if not completed_resp.output and collected_output_items: completed_resp.output = collected_output_items @@ -1246,22 +1288,12 @@ def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: try: resp: ResponsesAPIResponse = _one_attempt() - - # Parse output -> Message (typed) - # Cast to a typed sequence - # accepted by from_llm_responses_output output_seq = cast(Sequence[Any], resp.output or []) message = Message.from_llm_responses_output(output_seq) - - metrics_snapshot = MetricsSnapshot( - model_name=self.metrics.model_name, - accumulated_cost=self.metrics.accumulated_cost, - max_budget_per_task=self.metrics.max_budget_per_task, - accumulated_token_usage=self.metrics.accumulated_token_usage, - ) - return LLMResponse( - message=message, metrics=metrics_snapshot, raw_response=resp + message=message, + metrics=self._current_metrics_snapshot(), + raw_response=resp, ) except Exception as e: return self._handle_error( @@ -1298,163 +1330,145 @@ async def aresponses( """ user_enable_streaming = bool(kwargs.get("stream", False)) or self.stream if user_enable_streaming: + # We allow on_token to be None for subscription mode if on_token is None and not self.is_subscription: raise ValueError("Streaming requires an on_token callback") kwargs["stream"] = True - instructions, input_items = self.format_messages_for_responses(messages) - - resp_tools = ( - [ - t.to_responses_tool( - add_security_risk_prediction=add_security_risk_prediction, - ) - for t in tools - ] - if tools - else None + ( + instructions, + input_items, + resp_tools, + call_kwargs, + telemetry_ctx, + ) = self._prepare_responses_params( + messages, tools, include, store, add_security_risk_prediction, kwargs ) - call_kwargs = select_responses_options( - self, kwargs, include=include, store=store - ) - - assert self._telemetry is not None - telemetry_ctx: dict[str, Any] = { - "context_window": self.effective_max_input_tokens or 0 - } - if self._telemetry.log_enabled: - telemetry_ctx.update( - { - "llm_path": "responses", - "instructions": instructions, - "input": input_items[:], - "tools": tools, - "kwargs": {k: v for k, v in call_kwargs.items()}, - } - ) - - completed: ResponsesAPIResponse | None = None - async for attempt in self.async_retry( + @self.retry_decorator( num_retries=self.num_retries, retry_exceptions=LLM_RETRY_EXCEPTIONS, retry_min_wait=self.retry_min_wait, retry_max_wait=self.retry_max_wait, retry_multiplier=self.retry_multiplier, retry_listener=self._retry_listener_fn, - ): - with attempt: - assert self._telemetry is not None - self._telemetry.on_request(telemetry_ctx=telemetry_ctx) - final_kwargs = {**call_kwargs} - with self._litellm_modify_params_ctx(self.modify_params): - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=DeprecationWarning) - typed_input: ResponseInputParam | str = ( - cast(ResponseInputParam, input_items) if input_items else "" - ) - api_key_value = self._get_litellm_api_key_value() - - ret = await litellm_aresponses( - model=self.model, - input=typed_input, - instructions=instructions, - tools=resp_tools, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - **{**self._aws_kwargs(), **final_kwargs}, - ) - if isinstance(ret, ResponsesAPIResponse): - if user_enable_streaming: - logger.warning( - "Responses streaming was requested, but the " - "provider returned a non-streaming response; " - "no on_token deltas will be emitted." - ) - self._telemetry.on_response(ret) - completed = ret - elif final_kwargs.get("stream", False): - if not isinstance(ret, ResponsesAPIStreamingIterator): - raise AssertionError( - "Expected Responses async stream " - f"iterator, got {type(ret)}" - ) - - stream_cb = on_token if user_enable_streaming else None - collected_output_items: list[Any] = [] - async for event in ret: - if event is None: - continue - evt_type = getattr(event, "type", None) - if ( - evt_type - == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE - ): - item = getattr(event, "item", None) - if item is not None: - collected_output_items.append(item) - if stream_cb is None: - continue - if isinstance( - event, - ( - OutputTextDeltaEvent, - RefusalDeltaEvent, - ReasoningSummaryTextDeltaEvent, - ), - ): - delta = event.delta - if delta: - await _invoke_token_callback( - stream_cb, - ModelResponseStream( - choices=[ - StreamingChoices( - delta=Delta(content=delta) - ) - ] - ), - ) + ) + async def _one_attempt( + **retry_kwargs: Any, + ) -> ResponsesAPIResponse: + assert self._telemetry is not None + self._telemetry.on_request(telemetry_ctx=telemetry_ctx) + final_kwargs = {**call_kwargs, **retry_kwargs} + with self._litellm_modify_params_ctx(self.modify_params): + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=DeprecationWarning) + typed_input: ResponseInputParam | str = ( + cast(ResponseInputParam, input_items) if input_items else "" + ) + api_key_value = self._get_litellm_api_key_value() + + ret = await litellm_aresponses( + model=self.model, + input=typed_input, + instructions=instructions, + tools=resp_tools, + api_key=api_key_value, + api_base=self.base_url, + api_version=self.api_version, + timeout=self.timeout, + drop_params=self.drop_params, + seed=self.seed, + **{**self._aws_kwargs(), **final_kwargs}, + ) + if isinstance(ret, ResponsesAPIResponse): + if user_enable_streaming: + logger.warning( + "Responses streaming was requested, but the " + "provider returned a non-streaming response; " + "no on_token deltas will be emitted." + ) + self._telemetry.on_response(ret) + return ret - completed_event = ret.completed_response - if completed_event is None: - raise LLMNoResponseError( - "Responses stream finished without a " - "completed response" - ) - if not isinstance(completed_event, ResponseCompletedEvent): - raise LLMNoResponseError( - "Unexpected completed event: " - f"{type(completed_event)}" - ) - - completed_resp = completed_event.response - if not completed_resp.output and collected_output_items: - completed_resp.output = collected_output_items - - self._telemetry.on_response(completed_resp) - completed = completed_resp - else: + # When stream=True, LiteLLM returns a streaming + # iterator rather than a single ResponsesAPIResponse. + # Drain the iterator and use the completed response. + if final_kwargs.get("stream", False): + if not isinstance(ret, ResponsesAPIStreamingIterator): raise AssertionError( - f"Expected ResponsesAPIResponse, got {type(ret)}" + "Expected Responses async stream " + f"iterator, got {type(ret)}" + ) + + stream_cb = on_token if user_enable_streaming else None + # Collect output items from streaming events. + # Some endpoints (e.g., Codex subscription) send + # output items as separate events but the final + # response.completed event has output=[]. We + # accumulate them here and patch the completed + # response if needed. + collected_output_items: list[Any] = [] + async for event in ret: + if event is None: + continue + evt_type = getattr(event, "type", None) + if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: + item = getattr(event, "item", None) + if item is not None: + collected_output_items.append(item) + if stream_cb is None: + continue + if isinstance( + event, + ( + OutputTextDeltaEvent, + RefusalDeltaEvent, + ReasoningSummaryTextDeltaEvent, + ), + ): + delta = event.delta + if delta: + await _invoke_token_callback( + stream_cb, + ModelResponseStream( + choices=[ + StreamingChoices( + delta=Delta(content=delta) + ) + ] + ), + ) + + completed_event = ret.completed_response + if completed_event is None: + raise LLMNoResponseError( + "Responses stream finished without a completed response" + ) + if not isinstance(completed_event, ResponseCompletedEvent): + raise LLMNoResponseError( + f"Unexpected completed event: {type(completed_event)}" ) + completed_resp = completed_event.response + # Patch empty output with items collected from stream + if not completed_resp.output and collected_output_items: + completed_resp.output = collected_output_items + + self._telemetry.on_response(completed_resp) + return completed_resp + + raise AssertionError( + f"Expected ResponsesAPIResponse, got {type(ret)}" + ) + try: - assert completed is not None + completed: ResponsesAPIResponse = await _one_attempt() output_seq = cast(Sequence[Any], completed.output or []) message = Message.from_llm_responses_output(output_seq) - metrics_snapshot = MetricsSnapshot( - model_name=self.metrics.model_name, - accumulated_cost=self.metrics.accumulated_cost, - max_budget_per_task=self.metrics.max_budget_per_task, - accumulated_token_usage=self.metrics.accumulated_token_usage, - ) return LLMResponse( - message=message, metrics=metrics_snapshot, raw_response=completed + message=message, + metrics=self._current_metrics_snapshot(), + raw_response=completed, ) except Exception as e: _fb_token = cast("TokenCallbackType | None", on_token) diff --git a/openhands-sdk/openhands/sdk/llm/utils/retry_mixin.py b/openhands-sdk/openhands/sdk/llm/utils/retry_mixin.py index 768664703e..db39ddd387 100644 --- a/openhands-sdk/openhands/sdk/llm/utils/retry_mixin.py +++ b/openhands-sdk/openhands/sdk/llm/utils/retry_mixin.py @@ -2,7 +2,6 @@ from typing import Any, cast from tenacity import ( - AsyncRetrying, RetryCallState, retry, retry_if_exception_type, @@ -94,37 +93,6 @@ def retry_decorator( ) return retry_decorator - def async_retry( - self, - num_retries: int = 5, - retry_exceptions: tuple[type[BaseException], ...] = (LLMNoResponseError,), - retry_min_wait: int = 8, - retry_max_wait: int = 64, - retry_multiplier: float = 2.0, - retry_listener: RetryListener | None = None, - ) -> AsyncRetrying: - """Return an ``AsyncRetrying`` instance for use in ``async for`` blocks. - - Usage:: - - async for attempt in self.async_retry(...): - with attempt: - result = await _do_work() - """ - before_sleep = self._build_before_sleep(num_retries, retry_listener) - - return AsyncRetrying( - before_sleep=before_sleep, - stop=stop_after_attempt(num_retries), - reraise=True, - retry=retry_if_exception_type(retry_exceptions), - wait=wait_exponential( - multiplier=retry_multiplier, - min=retry_min_wait, - max=retry_max_wait, - ), - ) - def log_retry_attempt(self, retry_state: RetryCallState) -> None: """Log retry attempts.""" diff --git a/tests/sdk/llm/test_llm_no_response_retry.py b/tests/sdk/llm/test_llm_no_response_retry.py index c8f5809554..dba6cd71d1 100644 --- a/tests/sdk/llm/test_llm_no_response_retry.py +++ b/tests/sdk/llm/test_llm_no_response_retry.py @@ -1,4 +1,4 @@ -from unittest.mock import patch +from unittest.mock import AsyncMock, patch import pytest from litellm.types.utils import Choices, Message as LiteLLMMessage, ModelResponse, Usage @@ -106,3 +106,77 @@ def test_no_response_retry_bumps_temperature(mock_completion, base_llm: LLM) -> # Grab kwargs from the second call _, second_kwargs = mock_completion.call_args_list[1] assert second_kwargs.get("temperature") == 1.0 + + +# ------------------------------------------------------------------ +# Async acompletion tests +# ------------------------------------------------------------------ + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_acompletion", + new_callable=AsyncMock, +) +async def test_async_no_response_retry_bumps_temperature( + mock_acompletion: AsyncMock, base_llm: LLM +) -> None: + """Async acompletion must apply the temperature bump on retry (B2 regression).""" + assert base_llm.temperature == 0.0 + + mock_acompletion.side_effect = [ + create_empty_choices_response("empty-1"), + create_mock_response("ok"), + ] + + resp = await base_llm.acompletion( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert mock_acompletion.call_count == 2 + _, second_kwargs = mock_acompletion.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0 + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_acompletion", + new_callable=AsyncMock, +) +async def test_async_no_response_retries_then_succeeds( + mock_acompletion: AsyncMock, base_llm: LLM +) -> None: + mock_acompletion.side_effect = [ + create_empty_choices_response("empty-1"), + create_mock_response("success"), + ] + + resp = await base_llm.acompletion( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert resp.message is not None + assert mock_acompletion.call_count == 2 + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_acompletion", + new_callable=AsyncMock, +) +async def test_async_no_response_exhausts_retries( + mock_acompletion: AsyncMock, base_llm: LLM +) -> None: + mock_acompletion.side_effect = [ + create_empty_choices_response("empty-1"), + create_empty_choices_response("empty-2"), + ] + + with pytest.raises(LLMNoResponseError): + await base_llm.acompletion( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert mock_acompletion.call_count == base_llm.num_retries From 46f392275a29074d9dc05aec3c2478fa8f39fa40 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 21 May 2026 21:10:09 +0000 Subject: [PATCH 2/6] Address review: defensive kwargs copy + aresponses tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add defensive kwargs = dict(kwargs) at the top of both _prepare_completion_params and _prepare_responses_params so callers are not affected by internal mutations (e.g. kwargs['tools'] = ...). - Add 3 aresponses regression tests covering temperature bump on retry, retry-then-succeed, and retry exhaustion — parallel to the existing acompletion tests. Co-authored-by: openhands --- openhands-sdk/openhands/sdk/llm/llm.py | 7 ++ tests/sdk/llm/test_llm_no_response_retry.py | 98 +++++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index c4645e9b1b..20d1123a01 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -785,6 +785,10 @@ def _prepare_completion_params( (formatted_messages, cc_tools, use_mock_tools, call_kwargs, telemetry_ctx) """ + # Defensive copy — this method mutates kwargs (e.g. kwargs["tools"]) + # and the caller should not observe those side-effects. + kwargs = dict(kwargs) + formatted_messages = self.format_messages_for_llm(messages) use_native_fc = self.native_tool_calling original_fncall_msgs = copy.deepcopy(formatted_messages) @@ -857,6 +861,9 @@ def _prepare_responses_params( (instructions, input_items, resp_tools, call_kwargs, telemetry_ctx) """ + # Defensive copy — select_responses_options may mutate kwargs. + kwargs = dict(kwargs) + instructions, input_items = self.format_messages_for_responses(messages) # Responses path always supports function tools diff --git a/tests/sdk/llm/test_llm_no_response_retry.py b/tests/sdk/llm/test_llm_no_response_retry.py index dba6cd71d1..4d6ccd06b0 100644 --- a/tests/sdk/llm/test_llm_no_response_retry.py +++ b/tests/sdk/llm/test_llm_no_response_retry.py @@ -1,7 +1,9 @@ from unittest.mock import AsyncMock, patch import pytest +from litellm.types.llms.openai import ResponsesAPIResponse from litellm.types.utils import Choices, Message as LiteLLMMessage, ModelResponse, Usage +from openai.types.responses import ResponseOutputMessage, ResponseOutputText from pydantic import SecretStr from openhands.sdk.llm import LLM, LLMResponse, Message, TextContent @@ -180,3 +182,99 @@ async def test_async_no_response_exhausts_retries( ) assert mock_acompletion.call_count == base_llm.num_retries + + +# ------------------------------------------------------------------ +# Async aresponses tests +# ------------------------------------------------------------------ + + +def create_mock_responses_api_response( + text: str = "ok", +) -> ResponsesAPIResponse: + return ResponsesAPIResponse( + id="resp-1", + created_at=1, + output=[ + ResponseOutputMessage( + id="msg-1", + type="message", + role="assistant", + status="completed", + content=[ + ResponseOutputText(type="output_text", text=text, annotations=[]) + ], + ) + ], + model="gpt-4o", + object="response", + ) + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_aresponses", + new_callable=AsyncMock, +) +async def test_async_aresponses_retry_bumps_temperature( + mock_aresponses: AsyncMock, base_llm: LLM +) -> None: + """aresponses must apply the temperature bump on retry (B2 regression).""" + assert base_llm.temperature == 0.0 + + mock_aresponses.side_effect = [ + LLMNoResponseError("empty response"), + create_mock_responses_api_response("ok"), + ] + + resp = await base_llm.aresponses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert mock_aresponses.call_count == 2 + _, second_kwargs = mock_aresponses.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0 + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_aresponses", + new_callable=AsyncMock, +) +async def test_async_aresponses_retries_then_succeeds( + mock_aresponses: AsyncMock, base_llm: LLM +) -> None: + mock_aresponses.side_effect = [ + LLMNoResponseError("empty response"), + create_mock_responses_api_response("success"), + ] + + resp = await base_llm.aresponses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert resp.message is not None + assert mock_aresponses.call_count == 2 + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_aresponses", + new_callable=AsyncMock, +) +async def test_async_aresponses_exhausts_retries( + mock_aresponses: AsyncMock, base_llm: LLM +) -> None: + mock_aresponses.side_effect = [ + LLMNoResponseError("empty-1"), + LLMNoResponseError("empty-2"), + ] + + with pytest.raises(LLMNoResponseError): + await base_llm.aresponses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert mock_aresponses.call_count == base_llm.num_retries From a7616581e188ad8c17e8d3ba1cf6a41d7c0950b1 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 21 May 2026 21:37:11 +0000 Subject: [PATCH 3/6] Deduplicate retry-handled methods in LLM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract shared helpers to eliminate code duplication across completion/acompletion/responses/aresponses: Layer 1 — _make_retry_decorator(): replaces 4 copies of the 7-line retry decorator configuration. Layer 2 — _build_completion_result() / _build_responses_result(): shared result-building logic (response → Message → LLMResponse), replaces 2×5 identical lines per pair. Layer 3 — responses stream processing: - _build_responses_call_kwargs(): shared litellm call kwargs dict - _process_stream_event(): event type matching + delta extraction - _finalize_stream_response(): post-stream validation + output patching Net result: -33 lines, and the sync/async responses _one_attempt bodies now differ only in the transport call (litellm_responses vs await litellm_aresponses) and iteration (for vs async for). Co-authored-by: openhands --- openhands-sdk/openhands/sdk/llm/llm.py | 351 +++++++++++-------------- 1 file changed, 159 insertions(+), 192 deletions(-) diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index 20d1123a01..085f856a14 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -766,6 +766,128 @@ def _current_metrics_snapshot(self) -> MetricsSnapshot: accumulated_token_usage=self.metrics.accumulated_token_usage, ) + def _make_retry_decorator( + self, + ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """Return a configured retry decorator using this LLM's retry settings.""" + return self.retry_decorator( + num_retries=self.num_retries, + retry_exceptions=LLM_RETRY_EXCEPTIONS, + retry_min_wait=self.retry_min_wait, + retry_max_wait=self.retry_max_wait, + retry_multiplier=self.retry_multiplier, + retry_listener=self._retry_listener_fn, + ) + + def _build_completion_result(self, resp: ModelResponse) -> LLMResponse: + """Convert a raw :class:`ModelResponse` into an :class:`LLMResponse`.""" + first_choice = resp["choices"][0] + message = Message.from_llm_chat_message(first_choice["message"]) + return LLMResponse( + message=message, + metrics=self._current_metrics_snapshot(), + raw_response=resp, + ) + + def _build_responses_result(self, resp: ResponsesAPIResponse) -> LLMResponse: + """Convert a raw :class:`ResponsesAPIResponse` into an :class:`LLMResponse`.""" + output_seq = cast(Sequence[Any], resp.output or []) + message = Message.from_llm_responses_output(output_seq) + return LLMResponse( + message=message, + metrics=self._current_metrics_snapshot(), + raw_response=resp, + ) + + def _build_responses_call_kwargs( + self, + input_items: list[dict[str, Any]], + instructions: str | None, + resp_tools: list[Any] | None, + final_kwargs: dict[str, Any], + ) -> dict[str, Any]: + """Build the shared kwargs dict for litellm_responses / litellm_aresponses.""" + typed_input: ResponseInputParam | str = ( + cast(ResponseInputParam, input_items) if input_items else "" + ) + api_key_value = self._get_litellm_api_key_value() + return { + "model": self.model, + "input": typed_input, + "instructions": instructions, + "tools": resp_tools, + "api_key": api_key_value, + "api_base": self.base_url, + "api_version": self.api_version, + "timeout": self.timeout, + "drop_params": self.drop_params, + "seed": self.seed, + **self._aws_kwargs(), + **final_kwargs, + } + + def _process_stream_event( + self, event: Any + ) -> tuple[Any | None, ModelResponseStream | None]: + """Extract output item and delta chunk from a Responses stream event. + + Returns: + (output_item, delta_chunk) — either or both may be ``None``. + """ + output_item: Any | None = None + delta_chunk: ModelResponseStream | None = None + + evt_type = getattr(event, "type", None) + if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: + item = getattr(event, "item", None) + if item is not None: + output_item = item + + if isinstance( + event, + ( + OutputTextDeltaEvent, + RefusalDeltaEvent, + ReasoningSummaryTextDeltaEvent, + ), + ): + delta = event.delta + if delta: + delta_chunk = ModelResponseStream( + choices=[StreamingChoices(delta=Delta(content=delta))] + ) + + return output_item, delta_chunk + + def _finalize_stream_response( + self, + completed_response: Any, + collected_output_items: list[Any], + ) -> ResponsesAPIResponse: + """Validate and patch the completed response from a Responses stream. + + Raises: + LLMNoResponseError: If the stream finished without a completed + response or with an unexpected event type. + """ + if completed_response is None: + raise LLMNoResponseError( + "Responses stream finished without a completed response" + ) + if not isinstance(completed_response, ResponseCompletedEvent): + raise LLMNoResponseError( + f"Unexpected completed event: {type(completed_response)}" + ) + + completed_resp = completed_response.response + # Patch empty output with items collected from stream + if not completed_resp.output and collected_output_items: + completed_resp.output = collected_output_items + + assert self._telemetry is not None + self._telemetry.on_response(completed_resp) + return completed_resp + def _prepare_completion_params( self, messages: list[Message], @@ -1000,14 +1122,7 @@ def completion( messages, tools, add_security_risk_prediction, kwargs ) - @self.retry_decorator( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, - ) + @self._make_retry_decorator() def _one_attempt(**retry_kwargs: Any) -> ModelResponse: assert self._telemetry is not None self._telemetry.on_request(telemetry_ctx=telemetry_ctx) @@ -1028,14 +1143,7 @@ def _one_attempt(**retry_kwargs: Any) -> ModelResponse: return resp try: - resp = _one_attempt() - first_choice = resp["choices"][0] - message = Message.from_llm_chat_message(first_choice["message"]) - return LLMResponse( - message=message, - metrics=self._current_metrics_snapshot(), - raw_response=resp, - ) + return self._build_completion_result(_one_attempt()) except Exception as e: return self._handle_error( e, @@ -1081,14 +1189,7 @@ async def acompletion( messages, tools, add_security_risk_prediction, kwargs ) - @self.retry_decorator( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, - ) + @self._make_retry_decorator() async def _one_attempt(**retry_kwargs: Any) -> ModelResponse: assert self._telemetry is not None self._telemetry.on_request(telemetry_ctx=telemetry_ctx) @@ -1109,14 +1210,7 @@ async def _one_attempt(**retry_kwargs: Any) -> ModelResponse: return resp try: - resp = await _one_attempt() - first_choice = resp["choices"][0] - message = Message.from_llm_chat_message(first_choice["message"]) - return LLMResponse( - message=message, - metrics=self._current_metrics_snapshot(), - raw_response=resp, - ) + return self._build_completion_result(await _one_attempt()) except Exception as e: # Fallback is synchronous; cast the token callback since the # fallback LLM's sync path accepts TokenCallbackType. @@ -1181,14 +1275,7 @@ def responses( messages, tools, include, store, add_security_risk_prediction, kwargs ) - @self.retry_decorator( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, - ) + @self._make_retry_decorator() def _one_attempt(**retry_kwargs: Any) -> ResponsesAPIResponse: assert self._telemetry is not None self._telemetry.on_request(telemetry_ctx=telemetry_ctx) @@ -1196,30 +1283,17 @@ def _one_attempt(**retry_kwargs: Any) -> ResponsesAPIResponse: with self._litellm_modify_params_ctx(self.modify_params): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=DeprecationWarning) - typed_input: ResponseInputParam | str = ( - cast(ResponseInputParam, input_items) if input_items else "" - ) - api_key_value = self._get_litellm_api_key_value() - - ret = litellm_responses( - model=self.model, - input=typed_input, - instructions=instructions, - tools=resp_tools, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - **{**self._aws_kwargs(), **final_kwargs}, + litellm_kwargs = self._build_responses_call_kwargs( + input_items, instructions, resp_tools, final_kwargs ) + ret = litellm_responses(**litellm_kwargs) + if isinstance(ret, ResponsesAPIResponse): if user_enable_streaming: logger.warning( - "Responses streaming was requested, but the provider " - "returned a non-streaming response; no on_token deltas " - "will be emitted." + "Responses streaming was requested, but the " + "provider returned a non-streaming response; " + "no on_token deltas will be emitted." ) self._telemetry.on_response(ret) return ret @@ -1232,7 +1306,6 @@ def _one_attempt(**retry_kwargs: Any) -> ResponsesAPIResponse: raise AssertionError( f"Expected Responses stream iterator, got {type(ret)}" ) - stream_callback = on_token if user_enable_streaming else None # Collect output items from streaming events. # Some endpoints (e.g., Codex subscription) send @@ -1244,64 +1317,22 @@ def _one_attempt(**retry_kwargs: Any) -> ResponsesAPIResponse: for event in ret: if event is None: continue - evt_type = getattr(event, "type", None) - if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: - item = getattr(event, "item", None) - if item is not None: - collected_output_items.append(item) - if stream_callback is None: - continue - if isinstance( - event, - ( - OutputTextDeltaEvent, - RefusalDeltaEvent, - ReasoningSummaryTextDeltaEvent, - ), - ): - delta = event.delta - if delta: - stream_callback( - ModelResponseStream( - choices=[ - StreamingChoices( - delta=Delta(content=delta) - ) - ] - ) - ) - - completed_event = ret.completed_response - if completed_event is None: - raise LLMNoResponseError( - "Responses stream finished without a completed response" - ) - if not isinstance(completed_event, ResponseCompletedEvent): - raise LLMNoResponseError( - f"Unexpected completed event: {type(completed_event)}" - ) - - completed_resp = completed_event.response - # Patch empty output with items collected from stream - if not completed_resp.output and collected_output_items: - completed_resp.output = collected_output_items - - self._telemetry.on_response(completed_resp) - return completed_resp + output_item, delta_chunk = self._process_stream_event(event) + if output_item is not None: + collected_output_items.append(output_item) + if stream_callback is not None and delta_chunk is not None: + stream_callback(delta_chunk) + + return self._finalize_stream_response( + ret.completed_response, collected_output_items + ) raise AssertionError( f"Expected ResponsesAPIResponse, got {type(ret)}" ) try: - resp: ResponsesAPIResponse = _one_attempt() - output_seq = cast(Sequence[Any], resp.output or []) - message = Message.from_llm_responses_output(output_seq) - return LLMResponse( - message=message, - metrics=self._current_metrics_snapshot(), - raw_response=resp, - ) + return self._build_responses_result(_one_attempt()) except Exception as e: return self._handle_error( e, @@ -1352,14 +1383,7 @@ async def aresponses( messages, tools, include, store, add_security_risk_prediction, kwargs ) - @self.retry_decorator( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, - ) + @self._make_retry_decorator() async def _one_attempt( **retry_kwargs: Any, ) -> ResponsesAPIResponse: @@ -1369,24 +1393,11 @@ async def _one_attempt( with self._litellm_modify_params_ctx(self.modify_params): with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=DeprecationWarning) - typed_input: ResponseInputParam | str = ( - cast(ResponseInputParam, input_items) if input_items else "" - ) - api_key_value = self._get_litellm_api_key_value() - - ret = await litellm_aresponses( - model=self.model, - input=typed_input, - instructions=instructions, - tools=resp_tools, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - **{**self._aws_kwargs(), **final_kwargs}, + litellm_kwargs = self._build_responses_call_kwargs( + input_items, instructions, resp_tools, final_kwargs ) + ret = await litellm_aresponses(**litellm_kwargs) + if isinstance(ret, ResponsesAPIResponse): if user_enable_streaming: logger.warning( @@ -1406,7 +1417,6 @@ async def _one_attempt( "Expected Responses async stream " f"iterator, got {type(ret)}" ) - stream_cb = on_token if user_enable_streaming else None # Collect output items from streaming events. # Some endpoints (e.g., Codex subscription) send @@ -1418,65 +1428,22 @@ async def _one_attempt( async for event in ret: if event is None: continue - evt_type = getattr(event, "type", None) - if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: - item = getattr(event, "item", None) - if item is not None: - collected_output_items.append(item) - if stream_cb is None: - continue - if isinstance( - event, - ( - OutputTextDeltaEvent, - RefusalDeltaEvent, - ReasoningSummaryTextDeltaEvent, - ), - ): - delta = event.delta - if delta: - await _invoke_token_callback( - stream_cb, - ModelResponseStream( - choices=[ - StreamingChoices( - delta=Delta(content=delta) - ) - ] - ), - ) - - completed_event = ret.completed_response - if completed_event is None: - raise LLMNoResponseError( - "Responses stream finished without a completed response" - ) - if not isinstance(completed_event, ResponseCompletedEvent): - raise LLMNoResponseError( - f"Unexpected completed event: {type(completed_event)}" - ) - - completed_resp = completed_event.response - # Patch empty output with items collected from stream - if not completed_resp.output and collected_output_items: - completed_resp.output = collected_output_items - - self._telemetry.on_response(completed_resp) - return completed_resp + output_item, delta_chunk = self._process_stream_event(event) + if output_item is not None: + collected_output_items.append(output_item) + if stream_cb is not None and delta_chunk is not None: + await _invoke_token_callback(stream_cb, delta_chunk) + + return self._finalize_stream_response( + ret.completed_response, collected_output_items + ) raise AssertionError( f"Expected ResponsesAPIResponse, got {type(ret)}" ) try: - completed: ResponsesAPIResponse = await _one_attempt() - output_seq = cast(Sequence[Any], completed.output or []) - message = Message.from_llm_responses_output(output_seq) - return LLMResponse( - message=message, - metrics=self._current_metrics_snapshot(), - raw_response=completed, - ) + return self._build_responses_result(await _one_attempt()) except Exception as e: _fb_token = cast("TokenCallbackType | None", on_token) return await self._ahandle_error( From 273242869c72e7329707d8255265594a4cd97c23 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 21 May 2026 21:53:30 +0000 Subject: [PATCH 4/6] Add streaming-path retry test for aresponses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Test that when a streaming response completes without a ResponseCompletedEvent (i.e. _finalize_stream_response raises LLMNoResponseError), the retry mechanism still bumps temperature from 0→1.0. Uses a minimal _FakeAsyncStreamIterator that inherits from ResponsesAPIStreamingIterator to pass isinstance checks. Co-authored-by: openhands --- tests/sdk/llm/test_llm_no_response_retry.py | 75 +++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/tests/sdk/llm/test_llm_no_response_retry.py b/tests/sdk/llm/test_llm_no_response_retry.py index 4d6ccd06b0..fc4613c45d 100644 --- a/tests/sdk/llm/test_llm_no_response_retry.py +++ b/tests/sdk/llm/test_llm_no_response_retry.py @@ -1,6 +1,7 @@ from unittest.mock import AsyncMock, patch import pytest +from litellm.responses.streaming_iterator import ResponsesAPIStreamingIterator from litellm.types.llms.openai import ResponsesAPIResponse from litellm.types.utils import Choices, Message as LiteLLMMessage, ModelResponse, Usage from openai.types.responses import ResponseOutputMessage, ResponseOutputText @@ -278,3 +279,77 @@ async def test_async_aresponses_exhausts_retries( ) assert mock_aresponses.call_count == base_llm.num_retries + + +# ------------------------------------------------------------------ +# Streaming-path retry tests (stream completes without ResponseCompletedEvent) +# ------------------------------------------------------------------ + + +class _FakeAsyncStreamIterator(ResponsesAPIStreamingIterator): + """Minimal async stream iterator for testing stream-path failures. + + Inherits from ``ResponsesAPIStreamingIterator`` so it passes the + ``isinstance`` check inside ``aresponses._one_attempt``, but skips + the heavyweight parent ``__init__``. + """ + + def __init__( + self, + events: list, + completed_response=None, + ) -> None: + # Intentionally skip parent __init__; we only need iteration + # and the completed_response attribute. + self._events = list(events) + self.completed_response = completed_response + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._events: + raise StopAsyncIteration + return self._events.pop(0) + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_aresponses", + new_callable=AsyncMock, +) +async def test_async_aresponses_stream_path_retry_bumps_temperature( + mock_aresponses: AsyncMock, +) -> None: + """When a streaming response has no completed event, _finalize_stream_response + raises LLMNoResponseError inside the retry boundary. On retry the temperature + should be bumped from 0→1.0, just like the non-streaming path. + """ + streaming_llm = LLM( + usage_id="test-stream", + model="gpt-4o", + api_key=SecretStr("test_key"), + num_retries=2, + retry_min_wait=1, + retry_max_wait=2, + temperature=0.0, + stream=True, + ) + + # First call: return an iterator that ends without a completed event + # → _finalize_stream_response raises LLMNoResponseError + # Second call: return a valid non-streaming response (fast path) + mock_aresponses.side_effect = [ + _FakeAsyncStreamIterator(events=[], completed_response=None), + create_mock_responses_api_response("ok"), + ] + + resp = await streaming_llm.aresponses( + messages=[Message(role="user", content=[TextContent(text="hi")])], + on_token=lambda _chunk: None, + ) + + assert isinstance(resp, LLMResponse) + assert mock_aresponses.call_count == 2 + _, second_kwargs = mock_aresponses.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0 From 7377e1508f5ae9edbcab4a273f8bdb6df026b13d Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Mon, 25 May 2026 00:34:30 +0200 Subject: [PATCH 5/6] Address review: drop validate tuple, restore logs/comments, skip unused deltas, add sync tests - _validate_chat_response returns just resp (raw_resp is consumed internally by Telemetry.on_response); both call sites updated - Restore the mock-tool logger.debug and the informative comments lost when code moved into _prepare_completion_params / _prepare_responses_params - _process_stream_event gains emit_deltas so the ModelResponseStream chunk is not built when there is no stream callback to receive it - Add sync responses retry + sync streaming retry regression tests --- openhands-sdk/openhands/sdk/llm/llm.py | 54 +++++++--- tests/sdk/llm/test_llm_no_response_retry.py | 107 +++++++++++++++++++- 2 files changed, 148 insertions(+), 13 deletions(-) diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index 085f856a14..df857d49d1 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -827,23 +827,29 @@ def _build_responses_call_kwargs( } def _process_stream_event( - self, event: Any + self, event: Any, *, emit_deltas: bool = True ) -> tuple[Any | None, ModelResponseStream | None]: """Extract output item and delta chunk from a Responses stream event. + Args: + event: A single Responses streaming event. + emit_deltas: When ``False`` the delta chunk is never built — skip + the allocation when there is no stream callback to receive it. + Returns: (output_item, delta_chunk) — either or both may be ``None``. """ output_item: Any | None = None delta_chunk: ModelResponseStream | None = None + # Collect finished output items evt_type = getattr(event, "type", None) if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: item = getattr(event, "item", None) if item is not None: output_item = item - if isinstance( + if emit_deltas and isinstance( event, ( OutputTextDeltaEvent, @@ -911,10 +917,14 @@ def _prepare_completion_params( # and the caller should not observe those side-effects. kwargs = dict(kwargs) + # 1) serialize messages formatted_messages = self.format_messages_for_llm(messages) + + # 2) choose function-calling strategy use_native_fc = self.native_tool_calling original_fncall_msgs = copy.deepcopy(formatted_messages) + # Convert Tool objects to ChatCompletionToolParam once here cc_tools: list[ChatCompletionToolParam] = [] if tools: cc_tools = [ @@ -926,6 +936,10 @@ def _prepare_completion_params( use_mock_tools = self.should_mock_tool_calls(cc_tools) if use_mock_tools: + logger.debug( + "LLM.completion: mocking function-calling via prompt " + f"for model {self.model}" + ) formatted_messages, kwargs = self.pre_request_prompt_mock( formatted_messages, cc_tools or [], @@ -933,10 +947,14 @@ def _prepare_completion_params( include_security_params=add_security_risk_prediction, ) + # 3) normalize provider params + # Only pass tools when native FC is active kwargs["tools"] = cc_tools if (bool(cc_tools) and use_native_fc) else None has_tools_flag = bool(cc_tools) and use_native_fc + # Behavior-preserving: delegate to select_chat_options call_kwargs = select_chat_options(self, kwargs, has_tools=has_tools_flag) + # 4) request context for telemetry (always include context_window for metrics) # Always pass context_window so metrics are tracked even when # logging is disabled. assert self._telemetry is not None @@ -946,7 +964,7 @@ def _prepare_completion_params( if self._telemetry.log_enabled: telemetry_ctx.update( { - "messages": formatted_messages[:], + "messages": formatted_messages[:], # already simple dicts "tools": tools, "kwargs": {k: v for k, v in call_kwargs.items()}, } @@ -986,9 +1004,11 @@ def _prepare_responses_params( # Defensive copy — select_responses_options may mutate kwargs. kwargs = dict(kwargs) + # Build instructions + input list using dedicated Responses formatter instructions, input_items = self.format_messages_for_responses(messages) - # Responses path always supports function tools + # Convert Tool objects to Responses ToolParam + # (Responses path always supports function tools) resp_tools = ( [ t.to_responses_tool( @@ -1000,10 +1020,12 @@ def _prepare_responses_params( else None ) + # Normalize/override Responses kwargs consistently call_kwargs = select_responses_options( self, kwargs, include=include, store=store ) + # Request context for telemetry (always include context_window for metrics) # Always pass context_window so metrics are tracked even when # logging is disabled. assert self._telemetry is not None @@ -1031,11 +1053,11 @@ def _validate_chat_response( formatted_messages: list[dict[str, Any]], cc_tools: list[ChatCompletionToolParam], add_security_risk_prediction: bool, - ) -> tuple[ModelResponse, ModelResponse | None]: + ) -> ModelResponse: """Post-process a chat completion response inside the retry boundary. - Returns ``(resp, raw_resp)`` where *raw_resp* is non-``None`` only - when mock-tool post-processing was applied. + The raw (pre-mock) response is consumed internally by + ``Telemetry.on_response`` and is not returned to the caller. Raises: LLMNoResponseError: If the response has no choices @@ -1052,14 +1074,18 @@ def _validate_chat_response( include_security_params=add_security_risk_prediction, ) + # 6) telemetry assert self._telemetry is not None self._telemetry.on_response(resp, raw_resp=raw_resp) + # Ensure at least one choice. + # Gemini sometimes returns empty choices; we raise LLMNoResponseError here + # inside the retry boundary so it is retried. if not resp.get("choices") or len(resp["choices"]) < 1: raise LLMNoResponseError( "Response choices is less than 1. Response: " + str(resp) ) - return resp, raw_resp + return resp # ========================================================================= # Chat Completion API @@ -1133,7 +1159,7 @@ def _one_attempt(**retry_kwargs: Any) -> ModelResponse: enable_streaming=enable_streaming, on_token=on_token, ) - resp, _ = self._validate_chat_response( + resp = self._validate_chat_response( resp, use_mock_tools=use_mock_tools, formatted_messages=formatted_messages, @@ -1200,7 +1226,7 @@ async def _one_attempt(**retry_kwargs: Any) -> ModelResponse: enable_streaming=enable_streaming, on_token=on_token, ) - resp, _ = self._validate_chat_response( + resp = self._validate_chat_response( resp, use_mock_tools=use_mock_tools, formatted_messages=formatted_messages, @@ -1317,7 +1343,9 @@ def _one_attempt(**retry_kwargs: Any) -> ResponsesAPIResponse: for event in ret: if event is None: continue - output_item, delta_chunk = self._process_stream_event(event) + output_item, delta_chunk = self._process_stream_event( + event, emit_deltas=stream_callback is not None + ) if output_item is not None: collected_output_items.append(output_item) if stream_callback is not None and delta_chunk is not None: @@ -1428,7 +1456,9 @@ async def _one_attempt( async for event in ret: if event is None: continue - output_item, delta_chunk = self._process_stream_event(event) + output_item, delta_chunk = self._process_stream_event( + event, emit_deltas=stream_cb is not None + ) if output_item is not None: collected_output_items.append(output_item) if stream_cb is not None and delta_chunk is not None: diff --git a/tests/sdk/llm/test_llm_no_response_retry.py b/tests/sdk/llm/test_llm_no_response_retry.py index fc4613c45d..4f39a59f69 100644 --- a/tests/sdk/llm/test_llm_no_response_retry.py +++ b/tests/sdk/llm/test_llm_no_response_retry.py @@ -1,7 +1,10 @@ from unittest.mock import AsyncMock, patch import pytest -from litellm.responses.streaming_iterator import ResponsesAPIStreamingIterator +from litellm.responses.streaming_iterator import ( + ResponsesAPIStreamingIterator, + SyncResponsesAPIStreamingIterator, +) from litellm.types.llms.openai import ResponsesAPIResponse from litellm.types.utils import Choices, Message as LiteLLMMessage, ModelResponse, Usage from openai.types.responses import ResponseOutputMessage, ResponseOutputText @@ -281,11 +284,113 @@ async def test_async_aresponses_exhausts_retries( assert mock_aresponses.call_count == base_llm.num_retries +# ------------------------------------------------------------------ +# Sync responses tests (exercise the shared helper extraction) +# ------------------------------------------------------------------ + + +@patch("openhands.sdk.llm.llm.litellm_responses") +def test_responses_retry_bumps_temperature(mock_responses, base_llm: LLM) -> None: + """Sync responses must apply the temperature bump on retry after the + _prepare_responses_params / _build_responses_call_kwargs extraction.""" + assert base_llm.temperature == 0.0 + + mock_responses.side_effect = [ + LLMNoResponseError("empty response"), + create_mock_responses_api_response("ok"), + ] + + resp = base_llm.responses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert mock_responses.call_count == 2 + _, second_kwargs = mock_responses.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0 + + +@patch("openhands.sdk.llm.llm.litellm_responses") +def test_responses_retries_then_succeeds(mock_responses, base_llm: LLM) -> None: + mock_responses.side_effect = [ + LLMNoResponseError("empty response"), + create_mock_responses_api_response("success"), + ] + + resp = base_llm.responses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert resp.message is not None + assert mock_responses.call_count == 2 + + # ------------------------------------------------------------------ # Streaming-path retry tests (stream completes without ResponseCompletedEvent) # ------------------------------------------------------------------ +class _FakeSyncStreamIterator(SyncResponsesAPIStreamingIterator): + """Minimal sync stream iterator for testing stream-path failures. + + Mirrors :class:`_FakeAsyncStreamIterator` for the synchronous + ``responses`` path: inherits from ``SyncResponsesAPIStreamingIterator`` + so it passes the ``isinstance`` check inside ``responses._one_attempt``, + but skips the heavyweight parent ``__init__``. + """ + + def __init__( + self, + events: list, + completed_response=None, + ) -> None: + # Intentionally skip parent __init__; we only need iteration + # and the completed_response attribute. + self._events = list(events) + self.completed_response = completed_response + + def __iter__(self): + return self + + def __next__(self): + if not self._events: + raise StopIteration + return self._events.pop(0) + + +@patch("openhands.sdk.llm.llm.litellm_responses") +def test_responses_stream_path_retry_bumps_temperature(mock_responses) -> None: + """Sync streaming counterpart of the aresponses stream-path test: an + iterator that ends without a completed event raises LLMNoResponseError + inside the retry boundary, and the retry bumps temperature 0→1.0.""" + streaming_llm = LLM( + usage_id="test-stream-sync", + model="gpt-4o", + api_key=SecretStr("test_key"), + num_retries=2, + retry_min_wait=1, + retry_max_wait=2, + temperature=0.0, + stream=True, + ) + + mock_responses.side_effect = [ + _FakeSyncStreamIterator(events=[], completed_response=None), + create_mock_responses_api_response("ok"), + ] + + resp = streaming_llm.responses( + messages=[Message(role="user", content=[TextContent(text="hi")])], + on_token=lambda _chunk: None, + ) + + assert isinstance(resp, LLMResponse) + assert mock_responses.call_count == 2 + _, second_kwargs = mock_responses.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0 + + class _FakeAsyncStreamIterator(ResponsesAPIStreamingIterator): """Minimal async stream iterator for testing stream-path failures. From 8df29dcf7af4a20b164aa4116a3f1ac04ac648b9 Mon Sep 17 00:00:00 2001 From: VascoSch92 Date: Mon, 25 May 2026 20:05:50 +0200 Subject: [PATCH 6/6] Use retry_min_wait=0 / retry_max_wait=0 in streaming retry tests Avoids the per-retry sleep in the two stream-path tests without changing what the temperature-bump assertions verify. --- tests/sdk/llm/test_llm_no_response_retry.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/sdk/llm/test_llm_no_response_retry.py b/tests/sdk/llm/test_llm_no_response_retry.py index 4f39a59f69..96050ad007 100644 --- a/tests/sdk/llm/test_llm_no_response_retry.py +++ b/tests/sdk/llm/test_llm_no_response_retry.py @@ -369,8 +369,8 @@ def test_responses_stream_path_retry_bumps_temperature(mock_responses) -> None: model="gpt-4o", api_key=SecretStr("test_key"), num_retries=2, - retry_min_wait=1, - retry_max_wait=2, + retry_min_wait=0, + retry_max_wait=0, temperature=0.0, stream=True, ) @@ -435,8 +435,8 @@ async def test_async_aresponses_stream_path_retry_bumps_temperature( model="gpt-4o", api_key=SecretStr("test_key"), num_retries=2, - retry_min_wait=1, - retry_max_wait=2, + retry_min_wait=0, + retry_max_wait=0, temperature=0.0, stream=True, )