diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index bf11af7ae1..df857d49d1 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -753,52 +753,169 @@ async def _ahandle_error( raise mapped from error raise - def completion( + # ========================================================================= + # Shared helpers for completion / acompletion / responses / aresponses + # ========================================================================= + + def _current_metrics_snapshot(self) -> MetricsSnapshot: + """Snapshot current LLM metrics for an :class:`LLMResponse`.""" + return MetricsSnapshot( + model_name=self.metrics.model_name, + accumulated_cost=self.metrics.accumulated_cost, + max_budget_per_task=self.metrics.max_budget_per_task, + accumulated_token_usage=self.metrics.accumulated_token_usage, + ) + + def _make_retry_decorator( self, - messages: list[Message], - tools: Sequence[ToolDefinition] | None = None, - _return_metrics: bool = False, - add_security_risk_prediction: bool = False, - on_token: TokenCallbackType | None = None, - **kwargs, - ) -> LLMResponse: - """Generate a completion from the language model. + ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """Return a configured retry decorator using this LLM's retry settings.""" + return self.retry_decorator( + num_retries=self.num_retries, + retry_exceptions=LLM_RETRY_EXCEPTIONS, + retry_min_wait=self.retry_min_wait, + retry_max_wait=self.retry_max_wait, + retry_multiplier=self.retry_multiplier, + retry_listener=self._retry_listener_fn, + ) - This is the method for getting responses from the model via Completion API. - It handles message formatting, tool calling, and response processing. + def _build_completion_result(self, resp: ModelResponse) -> LLMResponse: + """Convert a raw :class:`ModelResponse` into an :class:`LLMResponse`.""" + first_choice = resp["choices"][0] + message = Message.from_llm_chat_message(first_choice["message"]) + return LLMResponse( + message=message, + metrics=self._current_metrics_snapshot(), + raw_response=resp, + ) + + def _build_responses_result(self, resp: ResponsesAPIResponse) -> LLMResponse: + """Convert a raw :class:`ResponsesAPIResponse` into an :class:`LLMResponse`.""" + output_seq = cast(Sequence[Any], resp.output or []) + message = Message.from_llm_responses_output(output_seq) + return LLMResponse( + message=message, + metrics=self._current_metrics_snapshot(), + raw_response=resp, + ) + + def _build_responses_call_kwargs( + self, + input_items: list[dict[str, Any]], + instructions: str | None, + resp_tools: list[Any] | None, + final_kwargs: dict[str, Any], + ) -> dict[str, Any]: + """Build the shared kwargs dict for litellm_responses / litellm_aresponses.""" + typed_input: ResponseInputParam | str = ( + cast(ResponseInputParam, input_items) if input_items else "" + ) + api_key_value = self._get_litellm_api_key_value() + return { + "model": self.model, + "input": typed_input, + "instructions": instructions, + "tools": resp_tools, + "api_key": api_key_value, + "api_base": self.base_url, + "api_version": self.api_version, + "timeout": self.timeout, + "drop_params": self.drop_params, + "seed": self.seed, + **self._aws_kwargs(), + **final_kwargs, + } + + def _process_stream_event( + self, event: Any, *, emit_deltas: bool = True + ) -> tuple[Any | None, ModelResponseStream | None]: + """Extract output item and delta chunk from a Responses stream event. Args: - messages: List of conversation messages. - tools: Optional list of tools available to the model. - _return_metrics: Whether to return usage metrics. - add_security_risk_prediction: Add security_risk field to tool schemas. - on_token: Optional callback for streaming tokens. - **kwargs: Additional arguments passed to the LLM API. + event: A single Responses streaming event. + emit_deltas: When ``False`` the delta chunk is never built — skip + the allocation when there is no stream callback to receive it. Returns: - LLMResponse containing the model's response and metadata. + (output_item, delta_chunk) — either or both may be ``None``. + """ + output_item: Any | None = None + delta_chunk: ModelResponseStream | None = None + + # Collect finished output items + evt_type = getattr(event, "type", None) + if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: + item = getattr(event, "item", None) + if item is not None: + output_item = item + + if emit_deltas and isinstance( + event, + ( + OutputTextDeltaEvent, + RefusalDeltaEvent, + ReasoningSummaryTextDeltaEvent, + ), + ): + delta = event.delta + if delta: + delta_chunk = ModelResponseStream( + choices=[StreamingChoices(delta=Delta(content=delta))] + ) - Note: - Summary field is always added to tool schemas for transparency and - explainability of agent actions. + return output_item, delta_chunk + + def _finalize_stream_response( + self, + completed_response: Any, + collected_output_items: list[Any], + ) -> ResponsesAPIResponse: + """Validate and patch the completed response from a Responses stream. Raises: - ValueError: If streaming is requested (not supported). + LLMNoResponseError: If the stream finished without a completed + response or with an unexpected event type. + """ + if completed_response is None: + raise LLMNoResponseError( + "Responses stream finished without a completed response" + ) + if not isinstance(completed_response, ResponseCompletedEvent): + raise LLMNoResponseError( + f"Unexpected completed event: {type(completed_response)}" + ) - Example: - ```python - from openhands.sdk.llm import Message, TextContent + completed_resp = completed_response.response + # Patch empty output with items collected from stream + if not completed_resp.output and collected_output_items: + completed_resp.output = collected_output_items - messages = [Message(role="user", content=[TextContent(text="Hello")])] - response = llm.completion(messages) - print(response.content) - ``` + assert self._telemetry is not None + self._telemetry.on_response(completed_resp) + return completed_resp + + def _prepare_completion_params( + self, + messages: list[Message], + tools: Sequence[ToolDefinition] | None, + add_security_risk_prediction: bool, + kwargs: dict[str, Any], + ) -> tuple[ + list[dict[str, Any]], + list[ChatCompletionToolParam], + bool, + dict[str, Any], + dict[str, Any], + ]: + """Shared setup for :meth:`completion` and :meth:`acompletion`. + + Returns: + (formatted_messages, cc_tools, use_mock_tools, call_kwargs, + telemetry_ctx) """ - enable_streaming = bool(kwargs.get("stream", False)) or self.stream - if enable_streaming: - if on_token is None: - raise ValueError("Streaming requires an on_token callback") - kwargs["stream"] = True + # Defensive copy — this method mutates kwargs (e.g. kwargs["tools"]) + # and the caller should not observe those side-effects. + kwargs = dict(kwargs) # 1) serialize messages formatted_messages = self.format_messages_for_llm(messages) @@ -838,8 +955,9 @@ def completion( call_kwargs = select_chat_options(self, kwargs, has_tools=has_tools_flag) # 4) request context for telemetry (always include context_window for metrics) + # Always pass context_window so metrics are tracked even when + # logging is disabled. assert self._telemetry is not None - # Always pass context_window so metrics are tracked even when logging disabled telemetry_ctx: dict[str, Any] = { "context_window": self.effective_max_input_tokens or 0 } @@ -854,19 +972,186 @@ def completion( if tools and not use_native_fc: telemetry_ctx["raw_messages"] = original_fncall_msgs - # 5) do the call with retries - @self.retry_decorator( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, + return ( + formatted_messages, + cc_tools, + use_mock_tools, + call_kwargs, + telemetry_ctx, + ) + + def _prepare_responses_params( + self, + messages: list[Message], + tools: Sequence[ToolDefinition] | None, + include: list[str] | None, + store: bool | None, + add_security_risk_prediction: bool, + kwargs: dict[str, Any], + ) -> tuple[ + str | None, + list[dict[str, Any]], + list[Any] | None, + dict[str, Any], + dict[str, Any], + ]: + """Shared setup for :meth:`responses` and :meth:`aresponses`. + + Returns: + (instructions, input_items, resp_tools, call_kwargs, + telemetry_ctx) + """ + # Defensive copy — select_responses_options may mutate kwargs. + kwargs = dict(kwargs) + + # Build instructions + input list using dedicated Responses formatter + instructions, input_items = self.format_messages_for_responses(messages) + + # Convert Tool objects to Responses ToolParam + # (Responses path always supports function tools) + resp_tools = ( + [ + t.to_responses_tool( + add_security_risk_prediction=add_security_risk_prediction, + ) + for t in tools + ] + if tools + else None + ) + + # Normalize/override Responses kwargs consistently + call_kwargs = select_responses_options( + self, kwargs, include=include, store=store + ) + + # Request context for telemetry (always include context_window for metrics) + # Always pass context_window so metrics are tracked even when + # logging is disabled. + assert self._telemetry is not None + telemetry_ctx: dict[str, Any] = { + "context_window": self.effective_max_input_tokens or 0 + } + if self._telemetry.log_enabled: + telemetry_ctx.update( + { + "llm_path": "responses", + "instructions": instructions, + "input": input_items[:], + "tools": tools, + "kwargs": {k: v for k, v in call_kwargs.items()}, + } + ) + + return instructions, input_items, resp_tools, call_kwargs, telemetry_ctx + + def _validate_chat_response( + self, + resp: ModelResponse, + *, + use_mock_tools: bool, + formatted_messages: list[dict[str, Any]], + cc_tools: list[ChatCompletionToolParam], + add_security_risk_prediction: bool, + ) -> ModelResponse: + """Post-process a chat completion response inside the retry boundary. + + The raw (pre-mock) response is consumed internally by + ``Telemetry.on_response`` and is not returned to the caller. + + Raises: + LLMNoResponseError: If the response has no choices + (Gemini sometimes returns empty choices; raising here + inside the retry boundary ensures it is retried). + """ + raw_resp: ModelResponse | None = None + if use_mock_tools: + raw_resp = copy.deepcopy(resp) + resp = self.post_response_prompt_mock( + resp, + nonfncall_msgs=formatted_messages, + tools=cc_tools, + include_security_params=add_security_risk_prediction, + ) + + # 6) telemetry + assert self._telemetry is not None + self._telemetry.on_response(resp, raw_resp=raw_resp) + + # Ensure at least one choice. + # Gemini sometimes returns empty choices; we raise LLMNoResponseError here + # inside the retry boundary so it is retried. + if not resp.get("choices") or len(resp["choices"]) < 1: + raise LLMNoResponseError( + "Response choices is less than 1. Response: " + str(resp) + ) + return resp + + # ========================================================================= + # Chat Completion API + # ========================================================================= + + def completion( + self, + messages: list[Message], + tools: Sequence[ToolDefinition] | None = None, + _return_metrics: bool = False, + add_security_risk_prediction: bool = False, + on_token: TokenCallbackType | None = None, + **kwargs, + ) -> LLMResponse: + """Generate a completion from the language model. + + This is the method for getting responses from the model via Completion API. + It handles message formatting, tool calling, and response processing. + + Args: + messages: List of conversation messages. + tools: Optional list of tools available to the model. + _return_metrics: Whether to return usage metrics. + add_security_risk_prediction: Add security_risk field to tool schemas. + on_token: Optional callback for streaming tokens. + **kwargs: Additional arguments passed to the LLM API. + + Returns: + LLMResponse containing the model's response and metadata. + + Note: + Summary field is always added to tool schemas for transparency and + explainability of agent actions. + + Raises: + ValueError: If streaming is requested (not supported). + + Example: + ```python + from openhands.sdk.llm import Message, TextContent + + messages = [Message(role="user", content=[TextContent(text="Hello")])] + response = llm.completion(messages) + print(response.content) + ``` + """ + enable_streaming = bool(kwargs.get("stream", False)) or self.stream + if enable_streaming: + if on_token is None: + raise ValueError("Streaming requires an on_token callback") + kwargs["stream"] = True + + ( + formatted_messages, + cc_tools, + use_mock_tools, + call_kwargs, + telemetry_ctx, + ) = self._prepare_completion_params( + messages, tools, add_security_risk_prediction, kwargs ) - def _one_attempt(**retry_kwargs) -> ModelResponse: + + @self._make_retry_decorator() + def _one_attempt(**retry_kwargs: Any) -> ModelResponse: assert self._telemetry is not None self._telemetry.on_request(telemetry_ctx=telemetry_ctx) - # Merge retry-modified kwargs (like temperature) with call_kwargs final_kwargs = {**call_kwargs, **retry_kwargs} resp = self._transport_call( messages=formatted_messages, @@ -874,47 +1159,17 @@ def _one_attempt(**retry_kwargs) -> ModelResponse: enable_streaming=enable_streaming, on_token=on_token, ) - raw_resp: ModelResponse | None = None - if use_mock_tools: - raw_resp = copy.deepcopy(resp) - resp = self.post_response_prompt_mock( - resp, - nonfncall_msgs=formatted_messages, - tools=cc_tools, - include_security_params=add_security_risk_prediction, - ) - # 6) telemetry - self._telemetry.on_response(resp, raw_resp=raw_resp) - - # Ensure at least one choice. - # Gemini sometimes returns empty choices; we raise LLMNoResponseError here - # inside the retry boundary so it is retried. - if not resp.get("choices") or len(resp["choices"]) < 1: - raise LLMNoResponseError( - "Response choices is less than 1. Response: " + str(resp) - ) - + resp = self._validate_chat_response( + resp, + use_mock_tools=use_mock_tools, + formatted_messages=formatted_messages, + cc_tools=cc_tools, + add_security_risk_prediction=add_security_risk_prediction, + ) return resp try: - resp = _one_attempt() - - # Convert the first choice to an OpenHands Message - first_choice = resp["choices"][0] - message = Message.from_llm_chat_message(first_choice["message"]) - - # Get current metrics snapshot - metrics_snapshot = MetricsSnapshot( - model_name=self.metrics.model_name, - accumulated_cost=self.metrics.accumulated_cost, - max_budget_per_task=self.metrics.max_budget_per_task, - accumulated_token_usage=self.metrics.accumulated_token_usage, - ) - - # Create and return LLMResponse - return LLMResponse( - message=message, metrics=metrics_snapshot, raw_response=resp - ) + return self._build_completion_result(_one_attempt()) except Exception as e: return self._handle_error( e, @@ -950,94 +1205,38 @@ async def acompletion( raise ValueError("Streaming requires an on_token callback") kwargs["stream"] = True - formatted_messages = self.format_messages_for_llm(messages) - - use_native_fc = self.native_tool_calling - original_fncall_msgs = copy.deepcopy(formatted_messages) - - cc_tools: list[ChatCompletionToolParam] = [] - if tools: - cc_tools = [ - t.to_openai_tool( - add_security_risk_prediction=add_security_risk_prediction, - ) - for t in tools - ] + ( + formatted_messages, + cc_tools, + use_mock_tools, + call_kwargs, + telemetry_ctx, + ) = self._prepare_completion_params( + messages, tools, add_security_risk_prediction, kwargs + ) - use_mock_tools = self.should_mock_tool_calls(cc_tools) - if use_mock_tools: - formatted_messages, kwargs = self.pre_request_prompt_mock( - formatted_messages, - cc_tools or [], - kwargs, - include_security_params=add_security_risk_prediction, + @self._make_retry_decorator() + async def _one_attempt(**retry_kwargs: Any) -> ModelResponse: + assert self._telemetry is not None + self._telemetry.on_request(telemetry_ctx=telemetry_ctx) + final_kwargs = {**call_kwargs, **retry_kwargs} + resp = await self._atransport_call( + messages=formatted_messages, + **final_kwargs, + enable_streaming=enable_streaming, + on_token=on_token, ) - - kwargs["tools"] = cc_tools if (bool(cc_tools) and use_native_fc) else None - has_tools_flag = bool(cc_tools) and use_native_fc - call_kwargs = select_chat_options(self, kwargs, has_tools=has_tools_flag) - - assert self._telemetry is not None - telemetry_ctx: dict[str, Any] = { - "context_window": self.effective_max_input_tokens or 0 - } - if self._telemetry.log_enabled: - telemetry_ctx.update( - { - "messages": formatted_messages[:], - "tools": tools, - "kwargs": {k: v for k, v in call_kwargs.items()}, - } + resp = self._validate_chat_response( + resp, + use_mock_tools=use_mock_tools, + formatted_messages=formatted_messages, + cc_tools=cc_tools, + add_security_risk_prediction=add_security_risk_prediction, ) - if tools and not use_native_fc: - telemetry_ctx["raw_messages"] = original_fncall_msgs + return resp try: - resp: ModelResponse | None = None - async for attempt in self.async_retry( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, - ): - with attempt: - assert self._telemetry is not None - self._telemetry.on_request(telemetry_ctx=telemetry_ctx) - resp = await self._atransport_call( - messages=formatted_messages, - **call_kwargs, - enable_streaming=enable_streaming, - on_token=on_token, - ) - raw_resp: ModelResponse | None = None - if use_mock_tools: - raw_resp = copy.deepcopy(resp) - resp = self.post_response_prompt_mock( - resp, - nonfncall_msgs=formatted_messages, - tools=cc_tools, - include_security_params=add_security_risk_prediction, - ) - self._telemetry.on_response(resp, raw_resp=raw_resp) - if not resp.get("choices") or len(resp["choices"]) < 1: - raise LLMNoResponseError( - "Response choices is less than 1. Response: " + str(resp) - ) - - assert resp is not None - first_choice = resp["choices"][0] - message = Message.from_llm_chat_message(first_choice["message"]) - metrics_snapshot = MetricsSnapshot( - model_name=self.metrics.model_name, - accumulated_cost=self.metrics.accumulated_cost, - max_budget_per_task=self.metrics.max_budget_per_task, - accumulated_token_usage=self.metrics.accumulated_token_usage, - ) - return LLMResponse( - message=message, metrics=metrics_snapshot, raw_response=resp - ) + return self._build_completion_result(await _one_attempt()) except Exception as e: # Fallback is synchronous; cast the token callback since the # fallback LLM's sync path accepts TokenCallbackType. @@ -1087,180 +1286,81 @@ def responses( """ user_enable_streaming = bool(kwargs.get("stream", False)) or self.stream if user_enable_streaming: + # We allow on_token to be None for subscription mode if on_token is None and not self.is_subscription: - # We allow on_token to be None for subscription mode raise ValueError("Streaming requires an on_token callback") kwargs["stream"] = True - # Build instructions + input list using dedicated Responses formatter - instructions, input_items = self.format_messages_for_responses(messages) - - # Convert Tool objects to Responses ToolParam - # (Responses path always supports function tools) - resp_tools = ( - [ - t.to_responses_tool( - add_security_risk_prediction=add_security_risk_prediction, - ) - for t in tools - ] - if tools - else None - ) - - # Normalize/override Responses kwargs consistently - call_kwargs = select_responses_options( - self, kwargs, include=include, store=store + ( + instructions, + input_items, + resp_tools, + call_kwargs, + telemetry_ctx, + ) = self._prepare_responses_params( + messages, tools, include, store, add_security_risk_prediction, kwargs ) - # Request context for telemetry (always include context_window for metrics) - assert self._telemetry is not None - # Always pass context_window so metrics are tracked even when logging disabled - telemetry_ctx: dict[str, Any] = { - "context_window": self.effective_max_input_tokens or 0 - } - if self._telemetry.log_enabled: - telemetry_ctx.update( - { - "llm_path": "responses", - "instructions": instructions, - "input": input_items[:], - "tools": tools, - "kwargs": {k: v for k, v in call_kwargs.items()}, - } - ) - - # Perform call with retries - @self.retry_decorator( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, - ) - def _one_attempt(**retry_kwargs) -> ResponsesAPIResponse: + @self._make_retry_decorator() + def _one_attempt(**retry_kwargs: Any) -> ResponsesAPIResponse: assert self._telemetry is not None self._telemetry.on_request(telemetry_ctx=telemetry_ctx) final_kwargs = {**call_kwargs, **retry_kwargs} - with ( - self._litellm_modify_params_ctx(self.modify_params), - warnings.catch_warnings(), - ): - warnings.filterwarnings("ignore", category=DeprecationWarning) - typed_input: ResponseInputParam | str = ( - cast(ResponseInputParam, input_items) if input_items else "" - ) - api_key_value = self._get_litellm_api_key_value() - - ret = litellm_responses( - model=self.model, - input=typed_input, - instructions=instructions, - tools=resp_tools, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - **{**self._aws_kwargs(), **final_kwargs}, - ) - if isinstance(ret, ResponsesAPIResponse): - if user_enable_streaming: - logger.warning( - "Responses streaming was requested, but the provider " - "returned a non-streaming response; no on_token deltas " - "will be emitted." - ) - self._telemetry.on_response(ret) - return ret - - # When stream=True, LiteLLM returns a streaming iterator rather than - # a single ResponsesAPIResponse. Drain the iterator and use the - # completed response. - if final_kwargs.get("stream", False): - if not isinstance(ret, SyncResponsesAPIStreamingIterator): - raise AssertionError( - f"Expected Responses stream iterator, got {type(ret)}" - ) + with self._litellm_modify_params_ctx(self.modify_params): + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=DeprecationWarning) + litellm_kwargs = self._build_responses_call_kwargs( + input_items, instructions, resp_tools, final_kwargs + ) + ret = litellm_responses(**litellm_kwargs) + + if isinstance(ret, ResponsesAPIResponse): + if user_enable_streaming: + logger.warning( + "Responses streaming was requested, but the " + "provider returned a non-streaming response; " + "no on_token deltas will be emitted." + ) + self._telemetry.on_response(ret) + return ret + + # When stream=True, LiteLLM returns a streaming + # iterator rather than a single ResponsesAPIResponse. + # Drain the iterator and use the completed response. + if final_kwargs.get("stream", False): + if not isinstance(ret, SyncResponsesAPIStreamingIterator): + raise AssertionError( + f"Expected Responses stream iterator, got {type(ret)}" + ) + stream_callback = on_token if user_enable_streaming else None + # Collect output items from streaming events. + # Some endpoints (e.g., Codex subscription) send + # output items as separate events but the final + # response.completed event has output=[]. We + # accumulate them here and patch the completed + # response if needed. + collected_output_items: list[Any] = [] + for event in ret: + if event is None: + continue + output_item, delta_chunk = self._process_stream_event( + event, emit_deltas=stream_callback is not None + ) + if output_item is not None: + collected_output_items.append(output_item) + if stream_callback is not None and delta_chunk is not None: + stream_callback(delta_chunk) - stream_callback = on_token if user_enable_streaming else None - # Collect output items from streaming events. - # Some endpoints (e.g., Codex subscription) send output - # items as separate events but the final response.completed - # event has output=[]. We accumulate them here and patch - # the completed response if needed. - collected_output_items: list[Any] = [] - for event in ret: - if event is None: - continue - # Collect finished output items - evt_type = getattr(event, "type", None) - if evt_type == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE: - item = getattr(event, "item", None) - if item is not None: - collected_output_items.append(item) - if stream_callback is None: - continue - if isinstance( - event, - ( - OutputTextDeltaEvent, - RefusalDeltaEvent, - ReasoningSummaryTextDeltaEvent, - ), - ): - delta = event.delta - if delta: - stream_callback( - ModelResponseStream( - choices=[ - StreamingChoices(delta=Delta(content=delta)) - ] - ) - ) - - completed_event = ret.completed_response - if completed_event is None: - raise LLMNoResponseError( - "Responses stream finished without a completed response" - ) - if not isinstance(completed_event, ResponseCompletedEvent): - raise LLMNoResponseError( - f"Unexpected completed event: {type(completed_event)}" + return self._finalize_stream_response( + ret.completed_response, collected_output_items ) - completed_resp = completed_event.response - - # Patch empty output with items collected from stream - if not completed_resp.output and collected_output_items: - completed_resp.output = collected_output_items - - self._telemetry.on_response(completed_resp) - return completed_resp - - raise AssertionError(f"Expected ResponsesAPIResponse, got {type(ret)}") + raise AssertionError( + f"Expected ResponsesAPIResponse, got {type(ret)}" + ) try: - resp: ResponsesAPIResponse = _one_attempt() - - # Parse output -> Message (typed) - # Cast to a typed sequence - # accepted by from_llm_responses_output - output_seq = cast(Sequence[Any], resp.output or []) - message = Message.from_llm_responses_output(output_seq) - - metrics_snapshot = MetricsSnapshot( - model_name=self.metrics.model_name, - accumulated_cost=self.metrics.accumulated_cost, - max_budget_per_task=self.metrics.max_budget_per_task, - accumulated_token_usage=self.metrics.accumulated_token_usage, - ) - - return LLMResponse( - message=message, metrics=metrics_snapshot, raw_response=resp - ) + return self._build_responses_result(_one_attempt()) except Exception as e: return self._handle_error( e, @@ -1296,168 +1396,84 @@ async def aresponses( """ user_enable_streaming = bool(kwargs.get("stream", False)) or self.stream if user_enable_streaming: + # We allow on_token to be None for subscription mode if on_token is None and not self.is_subscription: raise ValueError("Streaming requires an on_token callback") kwargs["stream"] = True - instructions, input_items = self.format_messages_for_responses(messages) - - resp_tools = ( - [ - t.to_responses_tool( - add_security_risk_prediction=add_security_risk_prediction, - ) - for t in tools - ] - if tools - else None - ) - - call_kwargs = select_responses_options( - self, kwargs, include=include, store=store + ( + instructions, + input_items, + resp_tools, + call_kwargs, + telemetry_ctx, + ) = self._prepare_responses_params( + messages, tools, include, store, add_security_risk_prediction, kwargs ) - assert self._telemetry is not None - telemetry_ctx: dict[str, Any] = { - "context_window": self.effective_max_input_tokens or 0 - } - if self._telemetry.log_enabled: - telemetry_ctx.update( - { - "llm_path": "responses", - "instructions": instructions, - "input": input_items[:], - "tools": tools, - "kwargs": {k: v for k, v in call_kwargs.items()}, - } - ) - - try: - completed: ResponsesAPIResponse | None = None - async for attempt in self.async_retry( - num_retries=self.num_retries, - retry_exceptions=LLM_RETRY_EXCEPTIONS, - retry_min_wait=self.retry_min_wait, - retry_max_wait=self.retry_max_wait, - retry_multiplier=self.retry_multiplier, - retry_listener=self._retry_listener_fn, - ): - with attempt: - assert self._telemetry is not None - self._telemetry.on_request(telemetry_ctx=telemetry_ctx) - final_kwargs = {**call_kwargs} - with ( - self._litellm_modify_params_ctx(self.modify_params), - warnings.catch_warnings(), - ): - warnings.filterwarnings("ignore", category=DeprecationWarning) - typed_input: ResponseInputParam | str = ( - cast(ResponseInputParam, input_items) if input_items else "" - ) - api_key_value = self._get_litellm_api_key_value() - - ret = await litellm_aresponses( - model=self.model, - input=typed_input, - instructions=instructions, - tools=resp_tools, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - **{**self._aws_kwargs(), **final_kwargs}, - ) - if isinstance(ret, ResponsesAPIResponse): - if user_enable_streaming: - logger.warning( - "Responses streaming was requested, " - "but the provider returned a " - "non-streaming response; no on_token " - "deltas will be emitted." - ) - self._telemetry.on_response(ret) - completed = ret - elif final_kwargs.get("stream", False): - if not isinstance(ret, ResponsesAPIStreamingIterator): - raise AssertionError( - "Expected Responses async stream " - f"iterator, got {type(ret)}" - ) - - stream_cb = on_token if user_enable_streaming else None - collected_output_items: list[Any] = [] - async for event in ret: - if event is None: - continue - evt_type = getattr(event, "type", None) - if ( - evt_type - == ResponsesAPIStreamEvents.OUTPUT_ITEM_DONE - ): - item = getattr(event, "item", None) - if item is not None: - collected_output_items.append(item) - if stream_cb is None: - continue - if not isinstance( - event, - ( - OutputTextDeltaEvent, - RefusalDeltaEvent, - ReasoningSummaryTextDeltaEvent, - ), - ): - continue - if not event.delta: - continue - await _invoke_token_callback( - stream_cb, - ModelResponseStream( - choices=[ - StreamingChoices( - delta=Delta(content=event.delta) - ) - ] - ), - ) - - completed_event = ret.completed_response - if completed_event is None: - raise LLMNoResponseError( - "Responses stream finished without " - "a completed response" - ) - if not isinstance(completed_event, ResponseCompletedEvent): - raise LLMNoResponseError( - "Unexpected completed event: " - f"{type(completed_event)}" - ) - - completed_resp = completed_event.response - if not completed_resp.output and collected_output_items: - completed_resp.output = collected_output_items - - self._telemetry.on_response(completed_resp) - completed = completed_resp - else: + @self._make_retry_decorator() + async def _one_attempt( + **retry_kwargs: Any, + ) -> ResponsesAPIResponse: + assert self._telemetry is not None + self._telemetry.on_request(telemetry_ctx=telemetry_ctx) + final_kwargs = {**call_kwargs, **retry_kwargs} + with self._litellm_modify_params_ctx(self.modify_params): + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=DeprecationWarning) + litellm_kwargs = self._build_responses_call_kwargs( + input_items, instructions, resp_tools, final_kwargs + ) + ret = await litellm_aresponses(**litellm_kwargs) + + if isinstance(ret, ResponsesAPIResponse): + if user_enable_streaming: + logger.warning( + "Responses streaming was requested, but the " + "provider returned a non-streaming response; " + "no on_token deltas will be emitted." + ) + self._telemetry.on_response(ret) + return ret + + # When stream=True, LiteLLM returns a streaming + # iterator rather than a single ResponsesAPIResponse. + # Drain the iterator and use the completed response. + if final_kwargs.get("stream", False): + if not isinstance(ret, ResponsesAPIStreamingIterator): raise AssertionError( - f"Expected ResponsesAPIResponse, got {type(ret)}" + "Expected Responses async stream " + f"iterator, got {type(ret)}" + ) + stream_cb = on_token if user_enable_streaming else None + # Collect output items from streaming events. + # Some endpoints (e.g., Codex subscription) send + # output items as separate events but the final + # response.completed event has output=[]. We + # accumulate them here and patch the completed + # response if needed. + collected_output_items: list[Any] = [] + async for event in ret: + if event is None: + continue + output_item, delta_chunk = self._process_stream_event( + event, emit_deltas=stream_cb is not None ) + if output_item is not None: + collected_output_items.append(output_item) + if stream_cb is not None and delta_chunk is not None: + await _invoke_token_callback(stream_cb, delta_chunk) - assert completed is not None - output_seq = cast(Sequence[Any], completed.output or []) - message = Message.from_llm_responses_output(output_seq) - metrics_snapshot = MetricsSnapshot( - model_name=self.metrics.model_name, - accumulated_cost=self.metrics.accumulated_cost, - max_budget_per_task=self.metrics.max_budget_per_task, - accumulated_token_usage=self.metrics.accumulated_token_usage, - ) - return LLMResponse( - message=message, metrics=metrics_snapshot, raw_response=completed - ) + return self._finalize_stream_response( + ret.completed_response, collected_output_items + ) + + raise AssertionError( + f"Expected ResponsesAPIResponse, got {type(ret)}" + ) + + try: + return self._build_responses_result(await _one_attempt()) except Exception as e: _fb_token = cast("TokenCallbackType | None", on_token) return await self._ahandle_error( @@ -1517,65 +1533,63 @@ def _transport_call( **kwargs, ) -> ModelResponse: # litellm.modify_params is GLOBAL; guard it for thread-safety - with ( - self._litellm_modify_params_ctx(self.modify_params), - warnings.catch_warnings(), - ): - warnings.filterwarnings( - "ignore", category=DeprecationWarning, module="httpx.*" - ) - warnings.filterwarnings( - "ignore", - message=r".*content=.*upload.*", - category=DeprecationWarning, - ) - warnings.filterwarnings( - "ignore", - message=r"There is no current event loop", - category=DeprecationWarning, - ) - warnings.filterwarnings( - "ignore", - category=UserWarning, - ) - warnings.filterwarnings( - "ignore", - category=DeprecationWarning, - message="Accessing the 'model_fields' attribute.*", - ) - api_key_value = self._get_litellm_api_key_value() - - # When streaming, request usage in the final chunk so that - # detailed token breakdowns (prompt_tokens_details with - # cached_tokens, etc.) are not silently discarded by - # litellm's streaming handler. - if enable_streaming: - kwargs.setdefault("stream_options", {"include_usage": True}) - - # Some providers need renames handled in _normalize_call_kwargs. - ret = litellm_completion( - model=self.model, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - messages=messages, - **{**self._aws_kwargs(), **kwargs}, - ) - if enable_streaming and on_token is not None: - assert isinstance(ret, CustomStreamWrapper) - chunks = [] - for chunk in ret: - on_token(chunk) - chunks.append(chunk) - ret = litellm.stream_chunk_builder(chunks, messages=messages) - - assert isinstance(ret, ModelResponse), ( - f"Expected ModelResponse, got {type(ret)}" - ) - return ret + with self._litellm_modify_params_ctx(self.modify_params): + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", category=DeprecationWarning, module="httpx.*" + ) + warnings.filterwarnings( + "ignore", + message=r".*content=.*upload.*", + category=DeprecationWarning, + ) + warnings.filterwarnings( + "ignore", + message=r"There is no current event loop", + category=DeprecationWarning, + ) + warnings.filterwarnings( + "ignore", + category=UserWarning, + ) + warnings.filterwarnings( + "ignore", + category=DeprecationWarning, + message="Accessing the 'model_fields' attribute.*", + ) + api_key_value = self._get_litellm_api_key_value() + + # When streaming, request usage in the final chunk so that + # detailed token breakdowns (prompt_tokens_details with + # cached_tokens, etc.) are not silently discarded by + # litellm's streaming handler. + if enable_streaming: + kwargs.setdefault("stream_options", {"include_usage": True}) + + # Some providers need renames handled in _normalize_call_kwargs. + ret = litellm_completion( + model=self.model, + api_key=api_key_value, + api_base=self.base_url, + api_version=self.api_version, + timeout=self.timeout, + drop_params=self.drop_params, + seed=self.seed, + messages=messages, + **{**self._aws_kwargs(), **kwargs}, + ) + if enable_streaming and on_token is not None: + assert isinstance(ret, CustomStreamWrapper) + chunks = [] + for chunk in ret: + on_token(chunk) + chunks.append(chunk) + ret = litellm.stream_chunk_builder(chunks, messages=messages) + + assert isinstance(ret, ModelResponse), ( + f"Expected ModelResponse, got {type(ret)}" + ) + return ret async def _atransport_call( self, @@ -1586,57 +1600,55 @@ async def _atransport_call( **kwargs, ) -> ModelResponse: """Async variant of :meth:`_transport_call`.""" - with ( - self._litellm_modify_params_ctx(self.modify_params), - warnings.catch_warnings(), - ): - warnings.filterwarnings( - "ignore", category=DeprecationWarning, module="httpx.*" - ) - warnings.filterwarnings( - "ignore", - message=r".*content=.*upload.*", - category=DeprecationWarning, - ) - warnings.filterwarnings( - "ignore", - message=r"There is no current event loop", - category=DeprecationWarning, - ) - warnings.filterwarnings("ignore", category=UserWarning) - warnings.filterwarnings( - "ignore", - category=DeprecationWarning, - message="Accessing the 'model_fields' attribute.*", - ) - api_key_value = self._get_litellm_api_key_value() - - if enable_streaming: - kwargs.setdefault("stream_options", {"include_usage": True}) - - ret = await litellm_acompletion( - model=self.model, - api_key=api_key_value, - api_base=self.base_url, - api_version=self.api_version, - timeout=self.timeout, - drop_params=self.drop_params, - seed=self.seed, - messages=messages, - **{**self._aws_kwargs(), **kwargs}, - ) - if enable_streaming and on_token is not None: - assert isinstance(ret, CustomStreamWrapper) - chunks = [] - async for chunk in ret: - await _invoke_token_callback(on_token, chunk) - chunks.append(chunk) - ret = litellm.stream_chunk_builder(chunks, messages=messages) - - assert isinstance(ret, ModelResponse), ( - f"Expected ModelResponse, got {type(ret)}" - ) - return ret + with self._litellm_modify_params_ctx(self.modify_params): + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", category=DeprecationWarning, module="httpx.*" + ) + warnings.filterwarnings( + "ignore", + message=r".*content=.*upload.*", + category=DeprecationWarning, + ) + warnings.filterwarnings( + "ignore", + message=r"There is no current event loop", + category=DeprecationWarning, + ) + warnings.filterwarnings("ignore", category=UserWarning) + warnings.filterwarnings( + "ignore", + category=DeprecationWarning, + message="Accessing the 'model_fields' attribute.*", + ) + api_key_value = self._get_litellm_api_key_value() + + if enable_streaming: + kwargs.setdefault("stream_options", {"include_usage": True}) + + ret = await litellm_acompletion( + model=self.model, + api_key=api_key_value, + api_base=self.base_url, + api_version=self.api_version, + timeout=self.timeout, + drop_params=self.drop_params, + seed=self.seed, + messages=messages, + **{**self._aws_kwargs(), **kwargs}, + ) + if enable_streaming and on_token is not None: + assert isinstance(ret, CustomStreamWrapper) + chunks = [] + async for chunk in ret: + await _invoke_token_callback(on_token, chunk) + chunks.append(chunk) + ret = litellm.stream_chunk_builder(chunks, messages=messages) + + assert isinstance(ret, ModelResponse), ( + f"Expected ModelResponse, got {type(ret)}" + ) + return ret @contextmanager def _litellm_modify_params_ctx(self, flag: bool): diff --git a/openhands-sdk/openhands/sdk/llm/utils/retry_mixin.py b/openhands-sdk/openhands/sdk/llm/utils/retry_mixin.py index 768664703e..db39ddd387 100644 --- a/openhands-sdk/openhands/sdk/llm/utils/retry_mixin.py +++ b/openhands-sdk/openhands/sdk/llm/utils/retry_mixin.py @@ -2,7 +2,6 @@ from typing import Any, cast from tenacity import ( - AsyncRetrying, RetryCallState, retry, retry_if_exception_type, @@ -94,37 +93,6 @@ def retry_decorator( ) return retry_decorator - def async_retry( - self, - num_retries: int = 5, - retry_exceptions: tuple[type[BaseException], ...] = (LLMNoResponseError,), - retry_min_wait: int = 8, - retry_max_wait: int = 64, - retry_multiplier: float = 2.0, - retry_listener: RetryListener | None = None, - ) -> AsyncRetrying: - """Return an ``AsyncRetrying`` instance for use in ``async for`` blocks. - - Usage:: - - async for attempt in self.async_retry(...): - with attempt: - result = await _do_work() - """ - before_sleep = self._build_before_sleep(num_retries, retry_listener) - - return AsyncRetrying( - before_sleep=before_sleep, - stop=stop_after_attempt(num_retries), - reraise=True, - retry=retry_if_exception_type(retry_exceptions), - wait=wait_exponential( - multiplier=retry_multiplier, - min=retry_min_wait, - max=retry_max_wait, - ), - ) - def log_retry_attempt(self, retry_state: RetryCallState) -> None: """Log retry attempts.""" diff --git a/tests/sdk/llm/test_llm_no_response_retry.py b/tests/sdk/llm/test_llm_no_response_retry.py index c8f5809554..96050ad007 100644 --- a/tests/sdk/llm/test_llm_no_response_retry.py +++ b/tests/sdk/llm/test_llm_no_response_retry.py @@ -1,7 +1,13 @@ -from unittest.mock import patch +from unittest.mock import AsyncMock, patch import pytest +from litellm.responses.streaming_iterator import ( + ResponsesAPIStreamingIterator, + SyncResponsesAPIStreamingIterator, +) +from litellm.types.llms.openai import ResponsesAPIResponse from litellm.types.utils import Choices, Message as LiteLLMMessage, ModelResponse, Usage +from openai.types.responses import ResponseOutputMessage, ResponseOutputText from pydantic import SecretStr from openhands.sdk.llm import LLM, LLMResponse, Message, TextContent @@ -106,3 +112,349 @@ def test_no_response_retry_bumps_temperature(mock_completion, base_llm: LLM) -> # Grab kwargs from the second call _, second_kwargs = mock_completion.call_args_list[1] assert second_kwargs.get("temperature") == 1.0 + + +# ------------------------------------------------------------------ +# Async acompletion tests +# ------------------------------------------------------------------ + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_acompletion", + new_callable=AsyncMock, +) +async def test_async_no_response_retry_bumps_temperature( + mock_acompletion: AsyncMock, base_llm: LLM +) -> None: + """Async acompletion must apply the temperature bump on retry (B2 regression).""" + assert base_llm.temperature == 0.0 + + mock_acompletion.side_effect = [ + create_empty_choices_response("empty-1"), + create_mock_response("ok"), + ] + + resp = await base_llm.acompletion( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert mock_acompletion.call_count == 2 + _, second_kwargs = mock_acompletion.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0 + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_acompletion", + new_callable=AsyncMock, +) +async def test_async_no_response_retries_then_succeeds( + mock_acompletion: AsyncMock, base_llm: LLM +) -> None: + mock_acompletion.side_effect = [ + create_empty_choices_response("empty-1"), + create_mock_response("success"), + ] + + resp = await base_llm.acompletion( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert resp.message is not None + assert mock_acompletion.call_count == 2 + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_acompletion", + new_callable=AsyncMock, +) +async def test_async_no_response_exhausts_retries( + mock_acompletion: AsyncMock, base_llm: LLM +) -> None: + mock_acompletion.side_effect = [ + create_empty_choices_response("empty-1"), + create_empty_choices_response("empty-2"), + ] + + with pytest.raises(LLMNoResponseError): + await base_llm.acompletion( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert mock_acompletion.call_count == base_llm.num_retries + + +# ------------------------------------------------------------------ +# Async aresponses tests +# ------------------------------------------------------------------ + + +def create_mock_responses_api_response( + text: str = "ok", +) -> ResponsesAPIResponse: + return ResponsesAPIResponse( + id="resp-1", + created_at=1, + output=[ + ResponseOutputMessage( + id="msg-1", + type="message", + role="assistant", + status="completed", + content=[ + ResponseOutputText(type="output_text", text=text, annotations=[]) + ], + ) + ], + model="gpt-4o", + object="response", + ) + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_aresponses", + new_callable=AsyncMock, +) +async def test_async_aresponses_retry_bumps_temperature( + mock_aresponses: AsyncMock, base_llm: LLM +) -> None: + """aresponses must apply the temperature bump on retry (B2 regression).""" + assert base_llm.temperature == 0.0 + + mock_aresponses.side_effect = [ + LLMNoResponseError("empty response"), + create_mock_responses_api_response("ok"), + ] + + resp = await base_llm.aresponses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert mock_aresponses.call_count == 2 + _, second_kwargs = mock_aresponses.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0 + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_aresponses", + new_callable=AsyncMock, +) +async def test_async_aresponses_retries_then_succeeds( + mock_aresponses: AsyncMock, base_llm: LLM +) -> None: + mock_aresponses.side_effect = [ + LLMNoResponseError("empty response"), + create_mock_responses_api_response("success"), + ] + + resp = await base_llm.aresponses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert resp.message is not None + assert mock_aresponses.call_count == 2 + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_aresponses", + new_callable=AsyncMock, +) +async def test_async_aresponses_exhausts_retries( + mock_aresponses: AsyncMock, base_llm: LLM +) -> None: + mock_aresponses.side_effect = [ + LLMNoResponseError("empty-1"), + LLMNoResponseError("empty-2"), + ] + + with pytest.raises(LLMNoResponseError): + await base_llm.aresponses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert mock_aresponses.call_count == base_llm.num_retries + + +# ------------------------------------------------------------------ +# Sync responses tests (exercise the shared helper extraction) +# ------------------------------------------------------------------ + + +@patch("openhands.sdk.llm.llm.litellm_responses") +def test_responses_retry_bumps_temperature(mock_responses, base_llm: LLM) -> None: + """Sync responses must apply the temperature bump on retry after the + _prepare_responses_params / _build_responses_call_kwargs extraction.""" + assert base_llm.temperature == 0.0 + + mock_responses.side_effect = [ + LLMNoResponseError("empty response"), + create_mock_responses_api_response("ok"), + ] + + resp = base_llm.responses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert mock_responses.call_count == 2 + _, second_kwargs = mock_responses.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0 + + +@patch("openhands.sdk.llm.llm.litellm_responses") +def test_responses_retries_then_succeeds(mock_responses, base_llm: LLM) -> None: + mock_responses.side_effect = [ + LLMNoResponseError("empty response"), + create_mock_responses_api_response("success"), + ] + + resp = base_llm.responses( + messages=[Message(role="user", content=[TextContent(text="hi")])] + ) + + assert isinstance(resp, LLMResponse) + assert resp.message is not None + assert mock_responses.call_count == 2 + + +# ------------------------------------------------------------------ +# Streaming-path retry tests (stream completes without ResponseCompletedEvent) +# ------------------------------------------------------------------ + + +class _FakeSyncStreamIterator(SyncResponsesAPIStreamingIterator): + """Minimal sync stream iterator for testing stream-path failures. + + Mirrors :class:`_FakeAsyncStreamIterator` for the synchronous + ``responses`` path: inherits from ``SyncResponsesAPIStreamingIterator`` + so it passes the ``isinstance`` check inside ``responses._one_attempt``, + but skips the heavyweight parent ``__init__``. + """ + + def __init__( + self, + events: list, + completed_response=None, + ) -> None: + # Intentionally skip parent __init__; we only need iteration + # and the completed_response attribute. + self._events = list(events) + self.completed_response = completed_response + + def __iter__(self): + return self + + def __next__(self): + if not self._events: + raise StopIteration + return self._events.pop(0) + + +@patch("openhands.sdk.llm.llm.litellm_responses") +def test_responses_stream_path_retry_bumps_temperature(mock_responses) -> None: + """Sync streaming counterpart of the aresponses stream-path test: an + iterator that ends without a completed event raises LLMNoResponseError + inside the retry boundary, and the retry bumps temperature 0→1.0.""" + streaming_llm = LLM( + usage_id="test-stream-sync", + model="gpt-4o", + api_key=SecretStr("test_key"), + num_retries=2, + retry_min_wait=0, + retry_max_wait=0, + temperature=0.0, + stream=True, + ) + + mock_responses.side_effect = [ + _FakeSyncStreamIterator(events=[], completed_response=None), + create_mock_responses_api_response("ok"), + ] + + resp = streaming_llm.responses( + messages=[Message(role="user", content=[TextContent(text="hi")])], + on_token=lambda _chunk: None, + ) + + assert isinstance(resp, LLMResponse) + assert mock_responses.call_count == 2 + _, second_kwargs = mock_responses.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0 + + +class _FakeAsyncStreamIterator(ResponsesAPIStreamingIterator): + """Minimal async stream iterator for testing stream-path failures. + + Inherits from ``ResponsesAPIStreamingIterator`` so it passes the + ``isinstance`` check inside ``aresponses._one_attempt``, but skips + the heavyweight parent ``__init__``. + """ + + def __init__( + self, + events: list, + completed_response=None, + ) -> None: + # Intentionally skip parent __init__; we only need iteration + # and the completed_response attribute. + self._events = list(events) + self.completed_response = completed_response + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._events: + raise StopAsyncIteration + return self._events.pop(0) + + +@pytest.mark.asyncio +@patch( + "openhands.sdk.llm.llm.litellm_aresponses", + new_callable=AsyncMock, +) +async def test_async_aresponses_stream_path_retry_bumps_temperature( + mock_aresponses: AsyncMock, +) -> None: + """When a streaming response has no completed event, _finalize_stream_response + raises LLMNoResponseError inside the retry boundary. On retry the temperature + should be bumped from 0→1.0, just like the non-streaming path. + """ + streaming_llm = LLM( + usage_id="test-stream", + model="gpt-4o", + api_key=SecretStr("test_key"), + num_retries=2, + retry_min_wait=0, + retry_max_wait=0, + temperature=0.0, + stream=True, + ) + + # First call: return an iterator that ends without a completed event + # → _finalize_stream_response raises LLMNoResponseError + # Second call: return a valid non-streaming response (fast path) + mock_aresponses.side_effect = [ + _FakeAsyncStreamIterator(events=[], completed_response=None), + create_mock_responses_api_response("ok"), + ] + + resp = await streaming_llm.aresponses( + messages=[Message(role="user", content=[TextContent(text="hi")])], + on_token=lambda _chunk: None, + ) + + assert isinstance(resp, LLMResponse) + assert mock_aresponses.call_count == 2 + _, second_kwargs = mock_aresponses.call_args_list[1] + assert second_kwargs.get("temperature") == 1.0