From d91d0d75c2846f56c73fcec66923ea5f26974991 Mon Sep 17 00:00:00 2001 From: C1-BA-B1-F3 Date: Fri, 26 Jun 2026 03:15:19 +0800 Subject: [PATCH 1/3] fix: handle null response.output in parse_response When a backend sends 'response.output: null' in a response.completed event (e.g., chatgpt.com Codex backend), parse_response would crash with TypeError: 'NoneType' object is not iterable. This one-line fix coerces None to an empty list, allowing the stream to complete gracefully. Consumers that track output_item.done events can still backfill from their collected items. Fixes #3325 --- src/openai/lib/_parsing/_responses.py | 2 +- tests/lib/test_parsing_responses.py | 87 +++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 tests/lib/test_parsing_responses.py diff --git a/src/openai/lib/_parsing/_responses.py b/src/openai/lib/_parsing/_responses.py index 232718cef6..ad2297e249 100644 --- a/src/openai/lib/_parsing/_responses.py +++ b/src/openai/lib/_parsing/_responses.py @@ -58,7 +58,7 @@ def parse_response( ) -> ParsedResponse[TextFormatT]: output_list: List[ParsedResponseOutputItem[TextFormatT]] = [] - for output in response.output: + for output in (response.output or []): if output.type == "message": content_list: List[ParsedContent[TextFormatT]] = [] for item in output.content: diff --git a/tests/lib/test_parsing_responses.py b/tests/lib/test_parsing_responses.py new file mode 100644 index 0000000000..aab8d66567 --- /dev/null +++ b/tests/lib/test_parsing_responses.py @@ -0,0 +1,87 @@ +"""Tests for parse_response handling of null/None output fields.""" + +from __future__ import annotations + +from openai._models import construct_type_unchecked +from openai._types import Omit +from openai.lib._parsing._responses import parse_response +from openai.types.responses import Response, ParsedResponse + + +def _make_response(output=None, **kwargs): + """Helper to construct a Response with a given output field.""" + base = { + "id": "resp_test123", + "created_at": 1234567890.0, + "model": "gpt-4o", + "object": "response", + "status": "completed", + "output": output, + "parallel_tool_calls": True, + "tool_choice": "auto", + "tools": [], + "temperature": 1.0, + "top_p": 1.0, + } + base.update(kwargs) + return construct_type_unchecked(type_=Response, value=base) + + +def test_parse_response_with_none_output(): + """Test that parse_response handles null output without crashing.""" + response = _make_response(output=None) + assert response.output is None + + result = parse_response( + text_format=None, + input_tools=None, + response=response, + ) + + assert isinstance(result, ParsedResponse) + assert result.output == [] + + +def test_parse_response_with_empty_list_output(): + """Test that parse_response handles empty list output correctly.""" + response = _make_response(output=[]) + assert response.output == [] + + result = parse_response( + text_format=None, + input_tools=None, + response=response, + ) + + assert isinstance(result, ParsedResponse) + assert result.output == [] + + +def test_parse_response_with_message_output(): + """Test that parse_response still works correctly with actual output items.""" + output_data = [ + { + "id": "msg_test123", + "type": "message", + "status": "completed", + "role": "assistant", + "content": [ + { + "type": "output_text", + "text": "Hello, world!", + "annotations": [], + } + ], + } + ] + response = _make_response(output=output_data) + + result = parse_response( + text_format=Omit(), + input_tools=None, + response=response, + ) + + assert isinstance(result, ParsedResponse) + assert len(result.output) == 1 + assert result.output[0].type == "message" \ No newline at end of file From 7dc030f8de39bf1b874ebaeb22f85d4ca59294e8 Mon Sep 17 00:00:00 2001 From: C1-BA-B1-F3 Date: Fri, 26 Jun 2026 03:29:38 +0800 Subject: [PATCH 2/3] fix: drain remaining SSE events after [DONE] to prevent TCP FIN After receiving the [DONE] SSE event, Stream.__stream__ and AsyncStream.__stream__ now drain remaining events from the iterator before calling response.close(). This ensures the HTTP/1.1 chunked transfer-encoding terminator (0\r\n\r\n) is consumed, allowing h11's their_state to advance to DONE so httpcore returns the connection to the pool gracefully instead of destroying it with TCP FIN. Regression from 6132922c which removed the drain. Fixes #3440 --- src/openai/_streaming.py | 14 ++++++++++++-- tests/test_streaming.py | 27 +++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 45c13cc11d..2251792ae7 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -106,7 +106,14 @@ def __stream__(self) -> Iterator[_T]: response=response, ) finally: - # Ensure the response is closed even if the consumer doesn't read all data + # Drain remaining events from the iterator so that the underlying + # response.iter_bytes() is fully consumed, including the HTTP/1.1 + # chunked transfer-encoding terminator (0\r\n\r\n). Without this, + # h11's their_state won't advance to DONE, causing httpcore to + # destroy the connection (TCP FIN) instead of returning it to the pool. + # See: https://github.com/openai/openai-python/issues/3440 + for _ in iterator: + pass response.close() def __enter__(self) -> Self: @@ -216,7 +223,10 @@ async def __stream__(self) -> AsyncIterator[_T]: response=response, ) finally: - # Ensure the response is closed even if the consumer doesn't read all data + # Drain remaining events so the chunked terminator is consumed before close. + # See: https://github.com/openai/openai-python/issues/3440 + async for _ in iterator: + pass await response.aclose() async def __aenter__(self) -> Self: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 04f8e51abd..c4f3399627 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -246,3 +246,30 @@ def make_event_iterator( return AsyncStream( cast_to=object, client=async_client, response=httpx.Response(200, content=to_aiter(content)) )._iter_events() + + +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_stream_drains_before_close(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: + """Regression test for https://github.com/openai/openai-python/issues/3440 + + After [DONE], the response stream should be fully drained (including the + chunked transfer-encoding terminator) before close() is called. + """ + chunks = [ + b'data: {"choices":[{"delta":{"content":"hi"}}]}\n\n', + b'data: [DONE]\n\n', + ] + + if sync: + response = httpx.Response(200, content=iter(chunks)) + stream = Stream(cast_to=object, client=client, response=response) + for _ in stream: + pass + assert response.is_stream_consumed + else: + response = httpx.Response(200, content=to_aiter(iter(chunks))) + stream = AsyncStream(cast_to=object, client=async_client, response=response) + async for _ in stream: + pass + assert response.is_stream_consumed From 3cf58feceea69dfaff4b26fd226e0c70efb163f1 Mon Sep 17 00:00:00 2001 From: C1-BA-B1-F3 Date: Fri, 26 Jun 2026 06:31:24 +0800 Subject: [PATCH 3/3] fix: gate drain loop on done_seen flag Address PR review feedback: only drain remaining SSE events after [DONE] is received. Error paths (APIError, parse failures) now close the response immediately without waiting for a stalled stream. - Added done_seen flag to Stream.__stream__ and AsyncStream.__stream__ - Set done_seen = True only when [DONE] is actually received - Drain loop now gated behind if done_seen - Added test_stream_no_drain_on_error regression test --- src/openai/_streaming.py | 22 +++++++++++++++++---- tests/test_streaming.py | 42 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 2251792ae7..eb065a8d82 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -57,10 +57,12 @@ def __stream__(self) -> Iterator[_T]: response = self.response process_data = self._client._process_response_data iterator = self._iter_events() + done_seen = False try: for sse in iterator: if sse.data.startswith("[DONE]"): + done_seen = True break # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data @@ -112,8 +114,13 @@ def __stream__(self) -> Iterator[_T]: # h11's their_state won't advance to DONE, causing httpcore to # destroy the connection (TCP FIN) instead of returning it to the pool. # See: https://github.com/openai/openai-python/issues/3440 - for _ in iterator: - pass + # + # Only drain when [DONE] was seen. On error paths (APIError, + # parse failures) we close immediately to avoid hanging on a + # stalled stream. + if done_seen: + for _ in iterator: + pass response.close() def __enter__(self) -> Self: @@ -174,10 +181,12 @@ async def __stream__(self) -> AsyncIterator[_T]: response = self.response process_data = self._client._process_response_data iterator = self._iter_events() + done_seen = False try: async for sse in iterator: if sse.data.startswith("[DONE]"): + done_seen = True break # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data @@ -225,8 +234,13 @@ async def __stream__(self) -> AsyncIterator[_T]: finally: # Drain remaining events so the chunked terminator is consumed before close. # See: https://github.com/openai/openai-python/issues/3440 - async for _ in iterator: - pass + # + # Only drain when [DONE] was seen. On error paths (APIError, + # parse failures) we close immediately to avoid hanging on a + # stalled stream. + if done_seen: + async for _ in iterator: + pass await response.aclose() async def __aenter__(self) -> Self: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index c4f3399627..dcdcf8962a 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -273,3 +273,45 @@ async def test_stream_drains_before_close(sync: bool, client: OpenAI, async_clie async for _ in stream: pass assert response.is_stream_consumed + + +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_stream_no_drain_on_error(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: + """When an error occurs before [DONE], the drain loop should be skipped. + + This prevents hanging on stalled streams during error paths. + """ + stall_reached = False + dummy_request = httpx.Request("POST", "https://example.com/v1/chat/completions") + + def body() -> Iterator[bytes]: + nonlocal stall_reached + yield b'data: {"error":{"message":"bad request"}}\n\n' + # If drain loop runs, it will hang here + stall_reached = True + while True: + yield b"" + + if sync: + response = httpx.Response(200, content=body(), request=dummy_request) + stream = Stream(cast_to=object, client=client, response=response) + with pytest.raises(Exception, match="bad request"): + for _ in stream: + pass + assert not stall_reached + else: + + async def abody() -> AsyncIterator[bytes]: + nonlocal stall_reached + yield b'data: {"error":{"message":"bad request"}}\n\n' + stall_reached = True + while True: + yield b"" + + response = httpx.Response(200, content=abody(), request=dummy_request) + stream = AsyncStream(cast_to=object, client=async_client, response=response) + with pytest.raises(Exception, match="bad request"): + async for _ in stream: + pass + assert not stall_reached