Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 95 additions & 16 deletions crates/harness-llm/src/openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,13 @@ impl LlmProvider for OpenAiProvider {
}
}

// Some gateways close the body without sending [DONE] — emit a
// Finish from whatever state we've accumulated.
if !acc.finished {
yield acc.finalise();
// Release the buffered terminal `Finish` (held back so a
// trailing usage-only chunk is surfaced first). This also
// covers gateways that close the body without ever sending a
// `finish_reason` — `flush` synthesises one from accumulated
// state in that case.
if let Some(finish) = acc.flush() {
yield finish;
}
};

Expand Down Expand Up @@ -776,6 +779,14 @@ struct StreamAccumulator {
tool_calls: Vec<ToolCallBuilder>,
finish_reason: Option<String>,
finished: bool,
/// The terminal `Finish` chunk, held back until the stream closes
/// (or a trailing usage-only chunk is ingested). OpenAI ships the
/// `usage` payload in a separate SSE chunk (`choices: []`) that
/// arrives *after* the `finish_reason` chunk; buffering `Finish`
/// here lets us emit `Usage` first, matching the other providers
/// and keeping the agent loop from breaking out before usage is
/// surfaced. See issue #48.
pending_finish: Option<LlmChunk>,
/// Concatenated `reasoning_content` fragments from the model.
/// Empty when the endpoint doesn't emit reasoning. Stamped onto
/// the final assistant message so subsequent turns can replay
Expand All @@ -796,9 +807,15 @@ impl StreamAccumulator {
fn ingest(&mut self, chunk: StreamChunk) -> Result<Vec<LlmChunk>> {
let mut out = Vec::new();
// Final usage chunks ship on their own, with `choices: []`.
// Surface them to the agent loop before we early-return.
// Surface them to the agent loop before we release any buffered
// `Finish` — OpenAI sends this usage-only chunk *after* the
// `finish_reason` chunk, so flushing the held `Finish` here
// guarantees `Usage` is emitted first.
if let Some(usage) = chunk.usage {
out.push(LlmChunk::Usage(usage.into_core()));
if let Some(finish) = self.pending_finish.take() {
out.push(finish);
}
}
let Some(choice) = chunk.choices.into_iter().next() else {
return Ok(out);
Expand Down Expand Up @@ -854,12 +871,29 @@ impl StreamAccumulator {

if let Some(fr) = choice.finish_reason {
self.finish_reason = Some(fr);
out.push(self.finalise());
// Hold the terminal `Finish` back rather than emitting it
// inline; it is released either when a trailing usage-only
// chunk arrives (above) or when the stream closes (`flush`).
self.pending_finish = Some(self.finalise());
}

Ok(out)
}

/// Emit any chunk still buffered when the stream closes: a held
/// `Finish` (the usual case — no trailing usage chunk arrived), or a
/// freshly synthesised one if the gateway closed the body without
/// ever sending a `finish_reason`.
fn flush(&mut self) -> Option<LlmChunk> {
if let Some(finish) = self.pending_finish.take() {
Some(finish)
} else if !self.finished {
Some(self.finalise())
} else {
None
}
}

fn finalise(&mut self) -> LlmChunk {
self.finished = true;

Expand Down Expand Up @@ -944,15 +978,18 @@ mod tests {
"choices": [{ "delta": {}, "finish_reason": "stop" }]
})))
.unwrap();
// The terminal Finish is buffered and released on stream close.
assert!(out3.is_empty());
let finish = acc.flush();

assert!(matches!(out1.as_slice(), [LlmChunk::ContentDelta(s)] if s == "Hel"));
assert!(matches!(out2.as_slice(), [LlmChunk::ContentDelta(s)] if s == "lo"));
match &out3[..] {
[LlmChunk::Finish {
match finish {
Some(LlmChunk::Finish {
message,
finish_reason,
response_id: _,
}] => {
}) => {
assert!(matches!(finish_reason, FinishReason::Stop));
match message {
Message::Assistant {
Expand Down Expand Up @@ -993,12 +1030,14 @@ mod tests {
"choices": [{ "delta": {}, "finish_reason": "stop" }]
})))
.unwrap();
assert!(out4.is_empty());
let finish = acc.flush();

assert!(matches!(out1.as_slice(), [LlmChunk::ContentDelta(s)] if s == "Hel"));
assert!(matches!(out2.as_slice(), [LlmChunk::ContentDelta(s)] if s == "lo"));
assert!(matches!(out3.as_slice(), [LlmChunk::ContentDelta(s)] if s == " world"));
match &out4[..] {
[LlmChunk::Finish { message, .. }] => match message {
match finish {
Some(LlmChunk::Finish { message, .. }) => match message {
Message::Assistant { content, .. } => {
assert_eq!(content.as_deref(), Some("Hello world"));
}
Expand Down Expand Up @@ -1039,13 +1078,15 @@ mod tests {
"choices": [{ "delta": {}, "finish_reason": "tool_calls" }]
})))
.unwrap();
assert!(out.is_empty());
let finish = acc.flush();

match &out[..] {
[LlmChunk::Finish {
match finish {
Some(LlmChunk::Finish {
message,
finish_reason,
response_id: _,
}] => {
}) => {
assert!(matches!(finish_reason, FinishReason::ToolCalls));
match message {
Message::Assistant { tool_calls, .. } => {
Expand Down Expand Up @@ -1276,8 +1317,9 @@ mod tests {
"choices": [{ "delta": {}, "finish_reason": "tool_calls" }]
})))
.unwrap();
match &final_out[..] {
[LlmChunk::Finish { message, .. }] => match message {
assert!(final_out.is_empty());
match acc.flush() {
Some(LlmChunk::Finish { message, .. }) => match message {
Message::Assistant { tool_calls, .. } => {
assert_eq!(tool_calls[0].name, "fs.read");
}
Expand All @@ -1286,4 +1328,41 @@ mod tests {
other => panic!("unexpected: {other:?}"),
}
}

#[test]
fn usage_emitted_before_finish_when_usage_trails() {
// OpenAI ships the `finish_reason` chunk first, then a separate
// usage-only chunk (`choices: []`). The accumulator must surface
// `Usage` ahead of the terminal `Finish` so the agent loop, which
// breaks on `Finish`, doesn't drop the usage payload. (Issue #48.)
let mut acc = StreamAccumulator::default();

let finish_chunk = acc
.ingest(parse_chunk(json!({
"choices": [{ "delta": { "content": "hi" }, "finish_reason": "stop" }]
})))
.unwrap();
// Content delta is emitted; Finish is buffered, not yet released.
assert!(matches!(finish_chunk.as_slice(), [LlmChunk::ContentDelta(s)] if s == "hi"));

let usage_chunk = acc
.ingest(parse_chunk(json!({
"choices": [],
"usage": { "prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15 }
})))
.unwrap();

// Usage comes first, then the released Finish — in that order.
match usage_chunk.as_slice() {
[LlmChunk::Usage(usage), LlmChunk::Finish { finish_reason, .. }] => {
assert_eq!(usage.prompt_tokens, Some(10));
assert_eq!(usage.completion_tokens, Some(5));
assert!(matches!(finish_reason, FinishReason::Stop));
}
other => panic!("unexpected: {other:?}"),
}

// Nothing left to flush — the Finish was already released.
assert!(acc.flush().is_none());
}
}
Loading