From 75b895d3d4a00e73de93489105bc4c253035dc49 Mon Sep 17 00:00:00 2001
From: C1-BA-B1-F3
Date: Fri, 26 Jun 2026 07:20:59 +0800
Subject: [PATCH] fix: merge duplicate-index entries in streaming delta accumulator

When the first streamed chunk contains multiple tool_calls entries with
the same `index` (e.g. id/name + start of arguments, both at index 0),
the accumulator stored them as separate list entries. Subsequent chunks
only merged into the first one at that position, leaving the duplicate
stranded with incomplete arguments.

Fix: add `_normalize_indexed_list()` that merges list entries sharing the
same integer `index` key before first storage. A list is only normalised
when every entry is a dict with an integer `index`; any other list is
stored exactly as received. Applied to both `_deltas.py` (chat
completions) and `_assistants.py` (assistants API).

Fixes #3201
---
Reviewer revision: `_normalize_indexed_list` now validates the whole list
before merging anything, and a regression test covers the mixed-list case.
The blob ids on the `index` lines below are carried over from the previous
revision of this patch and were not recomputed.

 src/openai/lib/streaming/_assistants.py |  35 +++++-
 src/openai/lib/streaming/_deltas.py     |  35 +++++-
 tests/test_accumulate_delta.py          | 153 ++++++++++++++++++++++++
 3 files changed, 221 insertions(+), 2 deletions(-)
 create mode 100644 tests/test_accumulate_delta.py

diff --git a/src/openai/lib/streaming/_assistants.py b/src/openai/lib/streaming/_assistants.py
index 6efb3ca3f1..385575a387 100644
--- a/src/openai/lib/streaming/_assistants.py
+++ b/src/openai/lib/streaming/_assistants.py
@@ -977,10 +977,43 @@ def accumulate_event(
     return current_message_snapshot, new_content
 
 
+def _normalize_indexed_list(items: list[object]) -> list[object]:
+    """Merge list entries that share the same `index` key.
+
+    Some providers send multiple delta entries with the same index in a single
+    chunk (e.g. first tool_call chunk contains both the id/name AND the start
+    of arguments, both at index 0). Without merging, the second entry is
+    stranded and never accumulated into.
+
+    The list is returned untouched unless every entry is a dict carrying an
+    integer `index`. That check runs before any merging because merging
+    mutates the first entry in place; bailing out half-way would hand back a
+    list whose first entry already contains data from a later duplicate.
+    """
+    entries: list[dict[object, object]] = []
+    for item in items:
+        if not is_dict(item) or not isinstance(item.get("index"), int):
+            return items  # not a purely indexed list → nothing to normalise
+        entries.append(item)
+
+    by_index: dict[object, dict[object, object]] = {}
+    for entry in entries:
+        idx = entry["index"]
+        if idx in by_index:
+            by_index[idx] = accumulate_delta(by_index[idx], entry)
+        else:
+            by_index[idx] = entry
+    # dicts keep insertion order, so entries come back in first-seen order
+    return list(by_index.values())
+
+
 def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]:
     for key, delta_value in delta.items():
         if key not in acc:
-            acc[key] = delta_value
+            if is_list(delta_value) and delta_value:
+                acc[key] = _normalize_indexed_list(delta_value)
+            else:
+                acc[key] = delta_value
             continue
 
         acc_value = acc[key]
diff --git a/src/openai/lib/streaming/_deltas.py b/src/openai/lib/streaming/_deltas.py
index a5e1317612..cec24a7f42 100644
--- a/src/openai/lib/streaming/_deltas.py
+++ b/src/openai/lib/streaming/_deltas.py
@@ -3,10 +3,43 @@
 from ..._utils import is_dict, is_list
 
 
+def _normalize_indexed_list(items: list[object]) -> list[object]:
+    """Merge list entries that share the same `index` key.
+
+    Some providers send multiple delta entries with the same index in a single
+    chunk (e.g. first tool_call chunk contains both the id/name AND the start
+    of arguments, both at index 0). Without merging, the second entry is
+    stranded and never accumulated into.
+
+    The list is returned untouched unless every entry is a dict carrying an
+    integer `index`. That check runs before any merging because merging
+    mutates the first entry in place; bailing out half-way would hand back a
+    list whose first entry already contains data from a later duplicate.
+    """
+    entries: list[dict[object, object]] = []
+    for item in items:
+        if not is_dict(item) or not isinstance(item.get("index"), int):
+            return items  # not a purely indexed list → nothing to normalise
+        entries.append(item)
+
+    by_index: dict[object, dict[object, object]] = {}
+    for entry in entries:
+        idx = entry["index"]
+        if idx in by_index:
+            by_index[idx] = accumulate_delta(by_index[idx], entry)
+        else:
+            by_index[idx] = entry
+    # dicts keep insertion order, so entries come back in first-seen order
+    return list(by_index.values())
+
+
 def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]:
     for key, delta_value in delta.items():
         if key not in acc:
-            acc[key] = delta_value
+            if is_list(delta_value) and delta_value:
+                acc[key] = _normalize_indexed_list(delta_value)
+            else:
+                acc[key] = delta_value
             continue
 
         acc_value = acc[key]
diff --git a/tests/test_accumulate_delta.py b/tests/test_accumulate_delta.py
new file mode 100644
index 0000000000..b6112a962a
--- /dev/null
+++ b/tests/test_accumulate_delta.py
@@ -0,0 +1,153 @@
+"""Tests for accumulate_delta handling of duplicate-index entries.
+
+Issue #3201: when the first streamed chunk contains multiple tool_calls entries
+with the same `index`, the accumulator stores them as separate list entries.
+Subsequent chunks only merge into the first one, leaving the second stranded.
+"""
+
+from __future__ import annotations
+
+from openai.lib.streaming._deltas import accumulate_delta
+
+
+def test_single_index_in_first_chunk():
+    """Normal case: one entry per index in the first chunk."""
+    acc: dict[object, object] = {}
+    # First chunk: one tool_call at index 0
+    acc = accumulate_delta(acc, {
+        "tool_calls": [
+            {"index": 0, "id": "call_1", "function": {"name": "list_files"}, "type": "function"},
+        ]
+    })
+    # Second chunk: arguments for index 0
+    acc = accumulate_delta(acc, {
+        "tool_calls": [
+            {"index": 0, "function": {"arguments": ' {"path": "."}'}},
+        ]
+    })
+
+    tool_calls = acc["tool_calls"]
+    assert isinstance(tool_calls, list)
+    assert len(tool_calls) == 1
+    assert tool_calls[0]["id"] == "call_1"
+    assert tool_calls[0]["function"]["name"] == "list_files"
+    assert tool_calls[0]["function"]["arguments"] == ' {"path": "."}'
+
+
+def test_duplicate_index_in_first_chunk():
+    """Issue #3201: first chunk has two entries with the same index."""
+    acc: dict[object, object] = {}
+    # First chunk: two entries both at index 0 (id+name, then start of arguments)
+    acc = accumulate_delta(acc, {
+        "tool_calls": [
+            {"index": 0, "id": "call_1", "function": {"name": "list_files"}, "type": "function"},
+            {"index": 0, "function": {"arguments": ' {"'}},
+        ]
+    })
+    # Second chunk: rest of arguments for index 0
+    acc = accumulate_delta(acc, {
+        "tool_calls": [
+            {"index": 0, "function": {"arguments": 'path": "."}'}},
+        ]
+    })
+
+    tool_calls = acc["tool_calls"]
+    assert isinstance(tool_calls, list)
+    assert len(tool_calls) == 1, f"Expected 1 tool_call, got {len(tool_calls)}: {tool_calls}"
+    assert tool_calls[0]["id"] == "call_1"
+    assert tool_calls[0]["function"]["name"] == "list_files"
+    assert tool_calls[0]["function"]["arguments"] == ' {"path": "."}'
+
+
+def test_duplicate_index_preserves_type_and_id():
+    """Ensure merged entry keeps id and type from the first occurrence."""
+    acc: dict[object, object] = {}
+    acc = accumulate_delta(acc, {
+        "tool_calls": [
+            {"index": 0, "id": "call_abc", "type": "function", "function": {"name": "search"}},
+            {"index": 0, "function": {"arguments": '{"q":'}},
+        ]
+    })
+
+    tool_calls = acc["tool_calls"]
+    assert len(tool_calls) == 1
+    assert tool_calls[0]["id"] == "call_abc"
+    assert tool_calls[0]["type"] == "function"
+    assert tool_calls[0]["function"]["name"] == "search"
+    assert tool_calls[0]["function"]["arguments"] == '{"q":'
+
+
+def test_multiple_distinct_indices_in_first_chunk():
+    """Multiple entries with different indices should stay separate."""
+    acc: dict[object, object] = {}
+    acc = accumulate_delta(acc, {
+        "tool_calls": [
+            {"index": 0, "id": "call_1", "function": {"name": "fn_a"}, "type": "function"},
+            {"index": 1, "id": "call_2", "function": {"name": "fn_b"}, "type": "function"},
+        ]
+    })
+
+    tool_calls = acc["tool_calls"]
+    assert isinstance(tool_calls, list)
+    assert len(tool_calls) == 2
+    assert tool_calls[0]["id"] == "call_1"
+    assert tool_calls[1]["id"] == "call_2"
+
+
+def test_multiple_indices_with_duplicates_in_first_chunk():
+    """Mix of duplicate and distinct indices."""
+    acc: dict[object, object] = {}
+    acc = accumulate_delta(acc, {
+        "tool_calls": [
+            {"index": 0, "id": "call_1", "function": {"name": "fn_a"}, "type": "function"},
+            {"index": 0, "function": {"arguments": '{"a":'}},
+            {"index": 1, "id": "call_2", "function": {"name": "fn_b"}, "type": "function"},
+            {"index": 1, "function": {"arguments": '{"b":'}},
+        ]
+    })
+    # More args for both
+    acc = accumulate_delta(acc, {
+        "tool_calls": [
+            {"index": 0, "function": {"arguments": '"x"}'}},
+            {"index": 1, "function": {"arguments": '"y"}'}},
+        ]
+    })
+
+    tool_calls = acc["tool_calls"]
+    assert isinstance(tool_calls, list)
+    assert len(tool_calls) == 2
+    assert tool_calls[0]["id"] == "call_1"
+    assert tool_calls[0]["function"]["arguments"] == '{"a":"x"}'
+    assert tool_calls[1]["id"] == "call_2"
+    assert tool_calls[1]["function"]["arguments"] == '{"b":"y"}'
+
+
+def test_non_indexed_lists_unchanged():
+    """Lists without integer `index` fields should pass through normally."""
+    acc: dict[object, object] = {}
+    acc = accumulate_delta(acc, {
+        "content": ["hello", "world"]
+    })
+    acc = accumulate_delta(acc, {
+        "content": ["!"]
+    })
+
+    assert acc["content"] == ["hello", "world", "!"]
+
+
+def test_partially_indexed_list_is_not_half_merged():
+    """A list that is not purely indexed must be stored exactly as received."""
+    acc: dict[object, object] = {}
+    acc = accumulate_delta(acc, {
+        "tool_calls": [
+            {"index": 0, "function": {"arguments": "a"}},
+            {"index": 0, "function": {"arguments": "b"}},
+            {"function": {"arguments": "c"}},
+        ]
+    })
+
+    assert acc["tool_calls"] == [
+        {"index": 0, "function": {"arguments": "a"}},
+        {"index": 0, "function": {"arguments": "b"}},
+        {"function": {"arguments": "c"}},
+    ]