TRACY-111: Parse providers' server-sent events and feed them to tracing adapters#227
TRACY-111: Parse providers' server-sent events and feed them to tracing adapters#227Vladislav0Art wants to merge 67 commits intomainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates Tracy’s tracing pipeline to correctly support Server-Sent Events (SSE) streaming responses by parsing event-stream chunks incrementally and feeding events into provider adapters, replacing the previous “concatenate all events into one string” approach.
Changes:
- Introduces an SSE parser (
SseParser) + tests, and routes streaming handling through per-event callbacks (SseEvent). - Refactors tracing adapter APIs (
LLMTracingAdapter,EndpointApiHandler) to support per-event streaming handling and span naming without request input. - Updates OkHttp/Ktor instrumentation to detect SSE via
text/event-streamand trace events as they arrive; updates OpenAI streaming handlers accordingly.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/clients/OpenAIClient.kt | Updates imports after interceptor package move. |
| tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateOpenAIApiEndpointHandler.kt | Migrates OpenAI image streaming handling to per-SSE-event processing. |
| tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateEditOpenAIApiEndpointHandler.kt | Migrates OpenAI image edit streaming handling to per-SSE-event processing. |
| tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ResponsesOpenAIApiEndpointHandler.kt | Reworks Responses API streaming to handle SSE events and parse final “completed” payload. |
| tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/OpenAIApiUtils.kt | Changes common response attribute helper to accept JsonObject directly. |
| tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ChatCompletionsOpenAIApiEndpointHandler.kt | Reworks chat completion streaming to accumulate deltas per SSE event. |
| tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/OpenAILLMTracingAdapter.kt | Adapts OpenAI adapter to new span naming + per-event streaming API. |
| tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorProtocolAdapters.kt | Adjusts response view to carry TracyHttpResponseBody variants. |
| tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorHttpClient.kt | Detects SSE by content type and streams events into adapter via SseParser. |
| tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/clients/GeminiClient.kt | Updates imports after interceptor package move. |
| tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiImagenHandler.kt | Updates handler API to handleStreamingEvent (unsupported). |
| tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiContentGenHandler.kt | Updates handler API to handleStreamingEvent (unsupported). |
| tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/GeminiLLMTracingAdapter.kt | Adapts Gemini adapter to new span naming + per-event streaming API. |
| tracing/core/src/jvmTest/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParserTest.kt | Adds test coverage for SSE parsing, including OpenAI examples. |
| tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt | Updates OkHttp interceptor to detect SSE responses and parse events incrementally. |
| tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/Interceptors.kt | Moves interceptor utilities into core.interceptors package. |
| tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/protocol/TracyHttpResponse.kt | Adds EventStream and Empty response body variants. |
| tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParser.kt | Adds spec-oriented SSE parser and SseEvent model. |
| tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/handlers/sse/SseHandling.kt | Adds standardized SSE handling failure Result + exception type. |
| tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/handlers/EndpointApiHandler.kt | Replaces handleStreaming with per-event handleStreamingEvent. |
| tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt | Refactors adapter API for span naming and per-event SSE streaming registration. |
| tracing/anthropic/src/jvmTest/kotlin/org/jetbrains/ai/tracy/anthropic/AnthropicTracingTest.kt | Updates imports after interceptor package move. |
| tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/clients/AnthropicAIClient.kt | Updates imports after interceptor package move. |
| tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/adapters/AnthropicLLMTracingAdapter.kt | Adapts Anthropic adapter to new span naming + per-event streaming API (unsupported). |
| examples/src/main/kotlin/org/jetbrains/ai/tracy/examples/clients/OkHttpClientAutotracingExample.kt | Updates example imports after interceptor package move. |
Comments suppressed due to low confidence (1)
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt:301
originalBodyWithTracedSSE.close()only ends the span and never closes the wrappedoriginalBody/ source. This can leak the underlying connection/resources and also leavesSseParsercleanup dependent on the source being closed elsewhere. Ensure the wrapper’sclose()closes the underlying body/source (and the parser) before ending the span (ideally by keeping a singleBufferedSourceinstance and closing that, or delegating tooriginalBody.close()).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt
Outdated
Show resolved
Hide resolved
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt
Outdated
Show resolved
Hide resolved
tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorHttpClient.kt
Outdated
Show resolved
Hide resolved
tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorHttpClient.kt
Outdated
Show resolved
Hide resolved
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParser.kt
Outdated
Show resolved
Hide resolved
tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorHttpClient.kt
Show resolved
Hide resolved
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
let's change it to
body["temperature"]?.jsonPrimitive?.doubleOrNull?.let { span.setAttribute(GEN_AI_REQUEST_TEMPERATURE, it) }
and in similar places too?
There was a problem hiding this comment.
addressed in 8e44084.
NOTE: I modified the assignments present ONLY in this file, not everywhere.
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt
Outdated
Show resolved
Hide resolved
|
Also, please fix this: Agents.md |
| event: SseEvent, | ||
| index: Long | ||
| ): Result<Unit> { | ||
| return sseHandlingFailure("Unsupported") |
There was a problem hiding this comment.
will this throw the warning on every streaming event?
There was a problem hiding this comment.
Yes, it will. Should these unimplemented adapters be silenced instead?
There was a problem hiding this comment.
I think either once per trace if possible or mb silence, yes. WDYT?
@Vladislav0Art, @slawa4s, @agbragin
There was a problem hiding this comment.
I’d prefer logging it once per trace rather than silencing it
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt
Outdated
Show resolved
Hide resolved
2f1b8a8 to
6939d0b
Compare
addressed in 57c04b5. @georgiizorabov hi! I addressed your comments. Please, have a look. |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 26 out of 26 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (2)
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt:300
withTracedSSE()overridesResponseBody.close()but never closes the underlyingoriginalBody/source. This can leak connections if the caller closes the response early (and it also preventsSseCapturingSource.close()/parser.close()from running). Ensureclose()closes the wrapped source/body (e.g., calloriginalBody.close()orsource().close()in afinallyblock) before ending the span.
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt:385- SSE bytes are decoded to UTF-8 per
read()call viareadUtf8(bytesRead). If a multi-byte UTF-8 code point is split across reads, this will decode with replacement characters and corrupt the stream fed intoSseParser(potentially breaking JSON parsing for non-ASCII content). Consider using an incremental UTF-8 decoder with carry-over bytes between reads, or feed raw bytes into the parser and decode there with a streaming decoder.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParser.kt
Show resolved
Hide resolved
| sseParser.feed( | ||
| input = buffer.decodeToString(0, bytesRead) | ||
| ) |
There was a problem hiding this comment.
buffer.decodeToString(0, bytesRead) decodes each read chunk independently. If a multi-byte UTF-8 sequence is split across reads, decoding will insert replacement characters and corrupt the SSE text fed into SseParser. Consider decoding with a streaming UTF-8 decoder (keeping trailing partial bytes between iterations) before calling sseParser.feed(...).
There was a problem hiding this comment.
@copilot apply changes based on this feedback
There was a problem hiding this comment.
Fixed in fd2636b. Replaced buffer.decodeToString(0, bytesRead) with a stateful CharsetDecoder (Charsets.UTF_8.newDecoder()) that uses ByteBuffer.compact() to carry over undecoded partial multi-byte sequences between reads. The byteBuffer has DEFAULT_BUFFER_SIZE + 3 capacity to absorb up to 3 leftover bytes from an incomplete 4-byte UTF-8 sequence, and charBuffer is sized identically (worst-case all-ASCII input produces as many chars as bytes).
|
Addressed |
| } | ||
|
|
||
| override fun getSpanName(request: TracyHttpRequest) = "Anthropic-generation" | ||
| override fun getSpanName() = "Anthropic-generation" |
There was a problem hiding this comment.
Could you, please, align span naming with GenAI conventions
Span names now follow {gen_ai.operation.name} {gen_ai.request.model}:
https://opentelemetry.io/docs/specs/semconv/gen-ai/anthropic/
There was a problem hiding this comment.
Addressed in 9fe5749
I didn't modify this "Anthropic-generation" name here because I have no information about the model and operation (both of which are only known at the stage of parsing the response body).
Notes:
- Now, it looks like:
See the trace.
- Pointed to naming problem in: TRACY-114 Audit attribute names used in Tracy to comply with GenAI Spec
| event: SseEvent, | ||
| index: Long | ||
| ): Result<Unit> { | ||
| return sseHandlingFailure("Unsupported") |
There was a problem hiding this comment.
I’d prefer logging it once per trace rather than silencing it
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 28 out of 28 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (6)
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt:333
asResponseView()dereferencesresponse.bodywithout a safe call (response.body.contentType()), butbodyis nullable in OkHttp. This is a compile-time error and should be changed toresponse.body?.contentType()with appropriate handling when the body is absent.
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt:298- The wrapped
ResponseBodyoverridesclose()but doesn't close the underlyingoriginalBody(nor theSseCapturingSource/ parser). This can leak the connection and also preventsSseParser.close()from flushing the final buffered event when the consumer closes the response without fully reading it. Delegate close tooriginalBody.close()(or the wrappedBufferedSource) and ensurespan.end()happens exactly once (e.g., in afinally).
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt:384 readUtf8(bytesRead)decodes each read chunk independently. If a multi-byte UTF-8 sequence is split acrossSource.read()boundaries, this can introduce replacement characters / corrupted text before feeding it toSseParser. Consider using a statefulCharsetDecoder(similar to the Ktor implementation) or buffering trailing bytes between reads so SSE parsing remains correct for arbitrary chunking.
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt:66sseHandlingUnsupportedWarningPrintedis mutable adapter-wide state that is reset inregisterRequest. If a singleLLMTracingAdapterinstance is used by multiple concurrent requests (typical for a shared HTTP client), this becomes racy and can suppress/duplicate warnings across unrelated spans. Store the "warning already printed" flag on theSpan(e.g., a boolean attribute) or in a per-span structure instead of a shared field.
abstract class LLMTracingAdapter(private val genAISystem: String) {
private var sseHandlingUnsupportedWarningPrinted = false
fun registerRequest(span: Span, request: TracyHttpRequest): Unit = runCatching {
// new request -> new trace
sseHandlingUnsupportedWarningPrinted = false
// Pre-allocate in case the span reaches the limit
span.setAttribute(DROPPED_ATTRIBUTES_COUNT_ATTRIBUTE_KEY, 0L)
getRequestBodyAttributes(span, request)
span.setAttribute("gen_ai.api_base", "${request.url.scheme}://${request.url.host}")
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt:14
JsonPrimitiveandKotlinLoggingimports appear to be unused in this file (JsonPrimitive only appears in KDoc). If the build enables "warnings as errors" / linting, this can fail CI; consider removing unused imports.
tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt:254response.bodyis nullable (seeval originalBody = originalResponse.body ?: ...below), but here it's dereferenced without a safe call. This will not compile and can also crash at runtime if a response has no body. Useresponse.body?.contentType()(and treat null as non-streaming).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorHttpClient.kt
Outdated
Show resolved
Hide resolved
| span: Span, | ||
| event: SseEvent, | ||
| index: Long, | ||
| ): Result<Unit> = runCatching { |
There was a problem hiding this comment.
Is there any point in outer catching?
There was a problem hiding this comment.
In case of any exceptions that I don't catch explicitly. For instance, .jsonArray throws if the JsonElement is not an array; Below, I don't treat this error explicitly.
Safety-wise, it's reasonable to have it.
| ): Result<Unit> = runCatching { | ||
| val data = runCatching { | ||
| Json.parseToJsonElement(event.data).jsonObject | ||
| }.getOrNull() ?: return@runCatching sseHandlingFailure("Cannot parse event data as JSON") |
There was a problem hiding this comment.
Please use direct return instead of return@runCatching. Also, sseHandlingFailure("Cannot parse event data as JSON") swallows the original exception. Use getOrElse { } (it is Throwable ) to preserve the
JsonDecodingException details.
| // peek at the bytes just written to sink | ||
| val text = sink.peek().apply { | ||
| skip(sink.size - bytesRead) | ||
| }.readUtf8(bytesRead) |
There was a problem hiding this comment.
readUtf8 is stateless. If it reads (multiple) one long UTF character (like emoji) it could be splitted across bytes, and symbol would be wrong. Please, use stateful decoder, like you did in Ktor
.../openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/OpenAILLMTracingAdapter.kt
Show resolved
Hide resolved
| * @param genAISystem The name of the GenAI system (e.g., "openai", "anthropic", "gemini") | ||
| */ | ||
| abstract class LLMTracingAdapter(private val genAISystem: String) { | ||
| private var sseHandlingUnsupportedWarningPrinted = false |
There was a problem hiding this comment.
Mutable state is shared across concurrent (possible) requests. SSE event index is stored in the span itself, so concurrent SSE streams on different spans are fine, but the sseHandlingUnsupportedWarningPrinted flag is adapter-level.
| byteBuffer.flip() | ||
| charBuffer.clear() | ||
|
|
||
| val endOfInput = originalBody.isClosedForRead |
There was a problem hiding this comment.
endOfInput is read from isClosedForRead inside the loop, but channel state can change between the loop condition check and this check. Mb pass false in utf8Decoder.decode and do final decode + flush?
There was a problem hiding this comment.
Addressed in both Ktor and OkHttp interceptors: 5687846
| // NOTE: we must first peek and only then await. | ||
| // otherwise there are cases when an empty body gets peeked | ||
| val peeked = response.rawContent.readBuffer.peek() | ||
| response.rawContent.awaitContent(Int.MAX_VALUE) |
There was a problem hiding this comment.
Let's document this ordering. It's non-obvious and relies on undocumented Ktor internals, so it'lt silently break if swapped.
There was a problem hiding this comment.
How exactly do you mean to document? I generated a descriptive comment in f0befc2
|
|
…E events & create `SseCapturingSource` once
NOTES:
1. `UTF8Decoder` is needed to correctly parse raw bytes into UTF-8 chunks.
2. `OkHttpResponseBody.source` MUST return the same instance on multiple invocations;
before, it was creating `SseCapturingSource` anew on each call.
|
Addressed in 0c9fe5e. |
|
@georgiizorabov Gosha, please also have a look, as the PR is quite big, so that not only Slawa reviews it. |
| tracingChannel.close(e) | ||
| } | ||
| } finally { | ||
| span.end() |
There was a problem hiding this comment.
wdyt about reordering these end() and close() calls? Im not sure if it really can create some problem (like close triggers adding an attribute to a closed span) but just in case + for consistency with okhttp?
There was a problem hiding this comment.
In this case, there is no effect regardless of the order. Still addressed in a010de8.
georgiizorabov
left a comment
There was a problem hiding this comment.
LGTM after fixing the comments
Motivation and Context
Closes
https://youtrack.jetbrains.com/issue/TRACY-111LLMTracingAdapter:getSpanNameno longer acceptsTracyHttpRequestinstance (it never used it)isStreamingRequestis removed: Ktor/OkHttp interceptors now decide whether the current response is of a streaming type by looking at its content type (should betext/event-stream)handleStreamingis removed: it handled a fully concatenated string of all events received from the server; removed in favor of per-event handling viaregisterResponseStreamEvent.registerResponseStreamEventis added: onepublicversion called by interceptors (implemented as factory method calling the same-named protected method); the other oneprotected, intended to be implemented by provider-specific adapters.EndpointApiHandler:handleStreamingreplaced withhandleStreamingEvent, which does per-event handling (see the rationale above)SseParserthat accepts UTF-8 decoded stream chunks and builds SSE events from them (implementation is SSE spec-compliant) + Wrote tests for the parser.SseParseris used in conjunction with a statefulUTF8Decoder(a wrapper over standardjava.nio.charset.CharsetDecoder) so that raw stream bytes are correctly converted to UTF8 strings and then passed to theSseParser; it is needed for correct handling of multi-byte UTF-8 sequences split across read boundaries.OpenAIApiUtils.setCommonResponseAttributes: now directly accepts response body asJsonObject(previously, acceptedTracyHttpResponse)SseParser.dispatchEvent()to resetretryValueon early return (emptydataBuffer), preventing aretry:field in a data-less event block from carrying over to the next event.Examples of traces with streaming requests:
Affects: #223
TODOs
TODO: test on a real server-sent-events stream (openai, anthropic, gemini - all event types)TODO: should be protected/internal? (aboutpopulateUnmappedAttributes)Test implementation on streaming tests: OpenAI, Gemini + manually investigate the traces structure.Manually send traces to Langfuse with SSE: see how it is rendered.ResponsesOpenAIApiEndpointHandlerTest.'test OpenAI responses API streaming'fails → fixedBreaking Changes
LLMTracingAdapter.isStreamingRequestremoved; streaming is now detected automatically from thetext/event-streamcontent type.LLMTracingAdapter.handleStreaming/EndpointApiHandler.handleStreamingremoved; replaced by per-eventregisterResponseStreamEvent/handleStreamingEvent.getSpanNameno longer receives aTracyHttpRequestparameter.