From 3c0dc5b8e0ecce6c7de77caf2225c7c956ecfb9d Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 30 May 2026 01:29:08 +0000 Subject: [PATCH] fix(harness-llm): surface OpenAI streaming usage before Finish OpenAI Chat Completions streaming ships the token-usage payload in a separate SSE chunk (`choices: []`) that arrives *after* the `finish_reason` chunk. The agent loop breaks out of the stream the moment it sees `Finish`, so the trailing `Usage` chunk was never consumed and usage accounting read zero on the default provider. Buffer the terminal `Finish` in `StreamAccumulator` and release it either when the trailing usage-only chunk is ingested (emitting `Usage` first) or when the stream closes. This matches the ordering the other three providers already produce. Adds a regression test and updates the existing accumulator tests to flush the buffered Finish on close. Closes #48 --- crates/harness-llm/src/openai.rs | 111 ++++++++++++++++++++++++++----- 1 file changed, 95 insertions(+), 16 deletions(-) diff --git a/crates/harness-llm/src/openai.rs b/crates/harness-llm/src/openai.rs index 93e6a25..2062eb5 100644 --- a/crates/harness-llm/src/openai.rs +++ b/crates/harness-llm/src/openai.rs @@ -303,10 +303,13 @@ impl LlmProvider for OpenAiProvider { } } - // Some gateways close the body without sending [DONE] — emit a - // Finish from whatever state we've accumulated. - if !acc.finished { - yield acc.finalise(); + // Release the buffered terminal `Finish` (held back so a + // trailing usage-only chunk is surfaced first). This also + // covers gateways that close the body without ever sending a + // `finish_reason` — `flush` synthesises one from accumulated + // state in that case. + if let Some(finish) = acc.flush() { + yield finish; } }; @@ -776,6 +779,14 @@ struct StreamAccumulator { tool_calls: Vec, finish_reason: Option, finished: bool, + /// The terminal `Finish` chunk, held back until the stream closes + /// (or a trailing usage-only chunk is ingested). 
OpenAI ships the + /// `usage` payload in a separate SSE chunk (`choices: []`) that + /// arrives *after* the `finish_reason` chunk; buffering `Finish` + /// here lets us emit `Usage` first, matching the other providers + /// and keeping the agent loop from breaking out before usage is + /// surfaced. See issue #48. + pending_finish: Option<LlmChunk>, /// Concatenated `reasoning_content` fragments from the model. /// Empty when the endpoint doesn't emit reasoning. Stamped onto /// the final assistant message so subsequent turns can replay @@ -796,9 +807,15 @@ impl StreamAccumulator { fn ingest(&mut self, chunk: StreamChunk) -> Result<Vec<LlmChunk>> { let mut out = Vec::new(); // Final usage chunks ship on their own, with `choices: []`. - // Surface them to the agent loop before we early-return. + // Surface them to the agent loop before we release any buffered + // `Finish` — OpenAI sends this usage-only chunk *after* the + // `finish_reason` chunk, so flushing the held `Finish` here + // guarantees `Usage` is emitted first. if let Some(usage) = chunk.usage { out.push(LlmChunk::Usage(usage.into_core())); + if let Some(finish) = self.pending_finish.take() { + out.push(finish); + } } let Some(choice) = chunk.choices.into_iter().next() else { return Ok(out); @@ -854,12 +871,29 @@ impl StreamAccumulator { if let Some(fr) = choice.finish_reason { self.finish_reason = Some(fr); - out.push(self.finalise()); + // Hold the terminal `Finish` back rather than emitting it + // inline; it is released either when a trailing usage-only + // chunk arrives (above) or when the stream closes (`flush`). + self.pending_finish = Some(self.finalise()); } Ok(out) } + /// Emit any chunk still buffered when the stream closes: a held + /// `Finish` (the usual case — no trailing usage chunk arrived), or a + /// freshly synthesised one if the gateway closed the body without + /// ever sending a `finish_reason`.
+ fn flush(&mut self) -> Option<LlmChunk> { + if let Some(finish) = self.pending_finish.take() { + Some(finish) + } else if !self.finished { + Some(self.finalise()) + } else { + None + } + } + fn finalise(&mut self) -> LlmChunk { self.finished = true; @@ -944,15 +978,18 @@ mod tests { "choices": [{ "delta": {}, "finish_reason": "stop" }] }))) .unwrap(); + // The terminal Finish is buffered and released on stream close. + assert!(out3.is_empty()); + let finish = acc.flush(); assert!(matches!(out1.as_slice(), [LlmChunk::ContentDelta(s)] if s == "Hel")); assert!(matches!(out2.as_slice(), [LlmChunk::ContentDelta(s)] if s == "lo")); - match &out3[..] { - [LlmChunk::Finish { + match finish { + Some(LlmChunk::Finish { message, finish_reason, response_id: _, - }] => { + }) => { assert!(matches!(finish_reason, FinishReason::Stop)); match message { Message::Assistant { @@ -993,12 +1030,14 @@ mod tests { "choices": [{ "delta": {}, "finish_reason": "stop" }] }))) .unwrap(); + assert!(out4.is_empty()); + let finish = acc.flush(); assert!(matches!(out1.as_slice(), [LlmChunk::ContentDelta(s)] if s == "Hel")); assert!(matches!(out2.as_slice(), [LlmChunk::ContentDelta(s)] if s == "lo")); assert!(matches!(out3.as_slice(), [LlmChunk::ContentDelta(s)] if s == " world")); - match &out4[..] { - [LlmChunk::Finish { message, .. }] => match message { + match finish { + Some(LlmChunk::Finish { message, .. }) => match message { Message::Assistant { content, .. } => { assert_eq!(content.as_deref(), Some("Hello world")); } @@ -1039,13 +1078,15 @@ mod tests { "choices": [{ "delta": {}, "finish_reason": "tool_calls" }] }))) .unwrap(); + assert!(out.is_empty()); + let finish = acc.flush(); - match &out[..] { - [LlmChunk::Finish { + match finish { + Some(LlmChunk::Finish { message, finish_reason, response_id: _, - }] => { + }) => { assert!(matches!(finish_reason, FinishReason::ToolCalls)); match message { Message::Assistant { tool_calls, ..
} => { @@ -1276,8 +1317,9 @@ mod tests { "choices": [{ "delta": {}, "finish_reason": "tool_calls" }] }))) .unwrap(); - match &final_out[..] { - [LlmChunk::Finish { message, .. }] => match message { + assert!(final_out.is_empty()); + match acc.flush() { + Some(LlmChunk::Finish { message, .. }) => match message { Message::Assistant { tool_calls, .. } => { assert_eq!(tool_calls[0].name, "fs.read"); } @@ -1286,4 +1328,41 @@ mod tests { other => panic!("unexpected: {other:?}"), } } + + #[test] + fn usage_emitted_before_finish_when_usage_trails() { + // OpenAI ships the `finish_reason` chunk first, then a separate + // usage-only chunk (`choices: []`). The accumulator must surface + // `Usage` ahead of the terminal `Finish` so the agent loop, which + // breaks on `Finish`, doesn't drop the usage payload. (Issue #48.) + let mut acc = StreamAccumulator::default(); + + let finish_chunk = acc + .ingest(parse_chunk(json!({ + "choices": [{ "delta": { "content": "hi" }, "finish_reason": "stop" }] + }))) + .unwrap(); + // Content delta is emitted; Finish is buffered, not yet released. + assert!(matches!(finish_chunk.as_slice(), [LlmChunk::ContentDelta(s)] if s == "hi")); + + let usage_chunk = acc + .ingest(parse_chunk(json!({ + "choices": [], + "usage": { "prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15 } + }))) + .unwrap(); + + // Usage comes first, then the released Finish — in that order. + match usage_chunk.as_slice() { + [LlmChunk::Usage(usage), LlmChunk::Finish { finish_reason, .. }] => { + assert_eq!(usage.prompt_tokens, Some(10)); + assert_eq!(usage.completion_tokens, Some(5)); + assert!(matches!(finish_reason, FinishReason::Stop)); + } + other => panic!("unexpected: {other:?}"), + } + + // Nothing left to flush — the Finish was already released. + assert!(acc.flush().is_none()); + } }