diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 45c13cc11d..eb065a8d82 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -57,10 +57,12 @@ def __stream__(self) -> Iterator[_T]: response = self.response process_data = self._client._process_response_data iterator = self._iter_events() + done_seen = False try: for sse in iterator: if sse.data.startswith("[DONE]"): + done_seen = True break # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data @@ -106,7 +108,19 @@ def __stream__(self) -> Iterator[_T]: response=response, ) finally: - # Ensure the response is closed even if the consumer doesn't read all data + # Drain remaining events from the iterator so that the underlying + # response.iter_bytes() is fully consumed, including the HTTP/1.1 + # chunked transfer-encoding terminator (0\r\n\r\n). Without this, + # h11's their_state won't advance to DONE, causing httpcore to + # destroy the connection (TCP FIN) instead of returning it to the pool. + # See: https://github.com/openai/openai-python/issues/3440 + # + # Only drain when [DONE] was seen. On error paths (APIError, + # parse failures) we close immediately to avoid hanging on a + # stalled stream. + if done_seen: + for _ in iterator: + pass response.close() def __enter__(self) -> Self: @@ -167,10 +181,12 @@ async def __stream__(self) -> AsyncIterator[_T]: response = self.response process_data = self._client._process_response_data iterator = self._iter_events() + done_seen = False try: async for sse in iterator: if sse.data.startswith("[DONE]"): + done_seen = True break # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data @@ -216,7 +232,15 @@ async def __stream__(self) -> AsyncIterator[_T]: response=response, ) finally: - # Ensure the response is closed even if the consumer doesn't read all data + # Drain remaining events so the chunked terminator is consumed before close. + # See: https://github.com/openai/openai-python/issues/3440 + # + # Only drain when [DONE] was seen. On error paths (APIError, + # parse failures) we close immediately to avoid hanging on a + # stalled stream. + if done_seen: + async for _ in iterator: + pass await response.aclose() async def __aenter__(self) -> Self: diff --git a/src/openai/lib/_parsing/_responses.py b/src/openai/lib/_parsing/_responses.py index 232718cef6..ad2297e249 100644 --- a/src/openai/lib/_parsing/_responses.py +++ b/src/openai/lib/_parsing/_responses.py @@ -58,7 +58,7 @@ def parse_response( ) -> ParsedResponse[TextFormatT]: output_list: List[ParsedResponseOutputItem[TextFormatT]] = [] - for output in response.output: + for output in (response.output or []): if output.type == "message": content_list: List[ParsedContent[TextFormatT]] = [] for item in output.content: diff --git a/tests/lib/test_parsing_responses.py b/tests/lib/test_parsing_responses.py new file mode 100644 index 0000000000..aab8d66567 --- /dev/null +++ b/tests/lib/test_parsing_responses.py @@ -0,0 +1,87 @@ +"""Tests for parse_response handling of null/None output fields.""" + +from __future__ import annotations + +from openai._models import construct_type_unchecked +from openai._types import Omit +from openai.lib._parsing._responses import parse_response +from openai.types.responses import Response, ParsedResponse + + +def _make_response(output=None, **kwargs): + """Helper to construct a Response with a given output field.""" + base = { + "id": "resp_test123", + "created_at": 1234567890.0, + "model": "gpt-4o", + "object": "response", + "status": "completed", + "output": output, + "parallel_tool_calls": True, + "tool_choice": "auto", + "tools": [], + "temperature": 1.0, + "top_p": 1.0, + } + base.update(kwargs) + return construct_type_unchecked(type_=Response, value=base) + + +def test_parse_response_with_none_output(): + """Test that parse_response handles null output without crashing.""" + response = _make_response(output=None) + assert response.output is None + + result = parse_response( + text_format=None, + input_tools=None, + response=response, + ) + + assert isinstance(result, ParsedResponse) + assert result.output == [] + + +def test_parse_response_with_empty_list_output(): + """Test that parse_response handles empty list output correctly.""" + response = _make_response(output=[]) + assert response.output == [] + + result = parse_response( + text_format=None, + input_tools=None, + response=response, + ) + + assert isinstance(result, ParsedResponse) + assert result.output == [] + + +def test_parse_response_with_message_output(): + """Test that parse_response still works correctly with actual output items.""" + output_data = [ + { + "id": "msg_test123", + "type": "message", + "status": "completed", + "role": "assistant", + "content": [ + { + "type": "output_text", + "text": "Hello, world!", + "annotations": [], + } + ], + } + ] + response = _make_response(output=output_data) + + result = parse_response( + text_format=Omit(), + input_tools=None, + response=response, + ) + + assert isinstance(result, ParsedResponse) + assert len(result.output) == 1 + assert result.output[0].type == "message" \ No newline at end of file diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 04f8e51abd..dcdcf8962a 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -246,3 +246,72 @@ def make_event_iterator( return AsyncStream( cast_to=object, client=async_client, response=httpx.Response(200, content=to_aiter(content)) )._iter_events() + + +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_stream_drains_before_close(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: + """Regression test for https://github.com/openai/openai-python/issues/3440 + + After [DONE], the response stream should be fully drained (including the + chunked transfer-encoding terminator) before close() is called. + """ + chunks = [ + b'data: {"choices":[{"delta":{"content":"hi"}}]}\n\n', + b'data: [DONE]\n\n', + ] + + if sync: + response = httpx.Response(200, content=iter(chunks)) + stream = Stream(cast_to=object, client=client, response=response) + for _ in stream: + pass + assert response.is_stream_consumed + else: + response = httpx.Response(200, content=to_aiter(iter(chunks))) + stream = AsyncStream(cast_to=object, client=async_client, response=response) + async for _ in stream: + pass + assert response.is_stream_consumed + + +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_stream_no_drain_on_error(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: + """When an error occurs before [DONE], the drain loop should be skipped. + + This prevents hanging on stalled streams during error paths. + """ + stall_reached = False + dummy_request = httpx.Request("POST", "https://example.com/v1/chat/completions") + + def body() -> Iterator[bytes]: + nonlocal stall_reached + yield b'data: {"error":{"message":"bad request"}}\n\n' + # If drain loop runs, it will hang here + stall_reached = True + while True: + yield b"" + + if sync: + response = httpx.Response(200, content=body(), request=dummy_request) + stream = Stream(cast_to=object, client=client, response=response) + with pytest.raises(Exception, match="bad request"): + for _ in stream: + pass + assert not stall_reached + else: + + async def abody() -> AsyncIterator[bytes]: + nonlocal stall_reached + yield b'data: {"error":{"message":"bad request"}}\n\n' + stall_reached = True + while True: + yield b"" + + response = httpx.Response(200, content=abody(), request=dummy_request) + stream = AsyncStream(cast_to=object, client=async_client, response=response) + with pytest.raises(Exception, match="bad request"): + async for _ in stream: + pass + assert not stall_reached