TRACY-28 Support streaming for Anthropic and Gemini#223
TRACY-28 Support streaming for Anthropic and Gemini#223georgiizorabov wants to merge 7 commits intomainfrom
Conversation
…removal and simplify `registerResponse` logic
…les, finish reasons, and usage attributes; updated validation logic accordingly
…ed streaming examples for OpenAI, Anthropic, and Gemini APIs
9723701 to
20caf2d
Compare
…r maintaining consistency
Vladislav0Art
left a comment
There was a problem hiding this comment.
Could you write the description of the streaming workflow in handleStreaming functions? It's multistep everywhere and requires quite complex parsing.
For documentation purposes, it's valuable to know what's going on in the streaming API of each provider.
| override fun isStreamingRequest(request: TracyHttpRequest): Boolean { | ||
| val body = request.body.asJson()?.jsonObject ?: return false | ||
| return body["stream"]?.jsonPrimitive?.booleanOrNull == true | ||
| } | ||
|
|
||
| override fun handleStreaming(span: Span, url: TracyHttpUrl, events: String): Unit = runCatching { |
There was a problem hiding this comment.
It would be nice to link the docs page about Anthropic streaming API.
| val event = runCatching { | ||
| Json.parseToJsonElement(data).jsonObject | ||
| }.getOrNull() ?: continue |
There was a problem hiding this comment.
nitpick: log a warning that an event was unparsable?
| message["usage"]?.jsonObject?.get("input_tokens")?.jsonPrimitive?.intOrNull?.let { | ||
| span.setAttribute(GEN_AI_USAGE_INPUT_TOKENS, it) | ||
| } |
There was a problem hiding this comment.
is this the only field of the usage object?
nitpick: I'd place the usage object into val usage before accessing its fields. Otherwise, right now, it looks clumsy.
| val index = event["index"]?.jsonPrimitive?.intOrNull ?: continue | ||
| val block = event["content_block"]?.jsonObject ?: continue | ||
| when (block["type"]?.jsonPrimitive?.content) { | ||
| "text" -> textBlocks[index] = StringBuilder() |
There was a problem hiding this comment.
note: it's kinda redundant to set it here it we do so anyway in "content_block_delta", but I think it's fine.
| for (index in (textBlocks.keys + toolCallBlocks.keys).sorted()) { | ||
| textBlocks[index]?.let { | ||
| span.setAttribute("gen_ai.completion.$index.type", "text") | ||
| span.setAttribute("gen_ai.completion.$index.content", it.toString().orRedactedOutput()) | ||
| } | ||
| toolCallBlocks[index]?.let { tc -> | ||
| span.setAttribute("gen_ai.completion.$index.type", "tool_use") | ||
| tc.id?.let { span.setAttribute("gen_ai.completion.$index.tool.call.id", it) } | ||
| tc.type?.let { span.setAttribute("gen_ai.completion.$index.tool.call.type", it) } | ||
| tc.name?.let { span.setAttribute("gen_ai.completion.$index.tool.name", it.orRedactedOutput()) } | ||
| if (tc.arguments.isNotEmpty()) { | ||
| span.setAttribute("gen_ai.completion.$index.tool.arguments", tc.arguments.toString().orRedactedOutput()) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Is this necessary to iterate over the blocks this way?
The span is something that doesn't expose any information on whether it has an order of the populated attributes. So, I guess it's no different from doing it in two loops:
for ((index, block) in textBlocks) {
span.setAttribute("gen_ai.completion.$index.type", "text")
span.setAttribute("gen_ai.completion.$index.content", block.toString().orRedactedOutput())
}
for ((index, block) in toolCallBlocks) {
span.setAttribute("gen_ai.completion.$index.type", "tool_use")
block.id?.let { span.setAttribute("gen_ai.completion.$index.tool.call.id", it) }
block.type?.let { span.setAttribute("gen_ai.completion.$index.tool.call.type", it) }
block.name?.let { span.setAttribute("gen_ai.completion.$index.tool.name", it.orRedactedOutput()) }
if (block.arguments.isNotEmpty()) {
span.setAttribute("gen_ai.completion.$index.tool.arguments", block.arguments.toString().orRedactedOutput())
}
}It visually separates the assignment and is simply less noisy. DWYT?
| span.setAttribute("gen_ai.completion.$index.content", content.toString().orRedacted(kind)) | ||
| } | ||
| for ((index, role) in roles) { | ||
| span.setAttribute("gen_ai.completion.$index.role", role) | ||
| } | ||
| for ((index, reason) in finishReasons) { | ||
| span.setAttribute("gen_ai.completion.$index.finish_reason", reason) |
There was a problem hiding this comment.
These 3 properties relate to each other. Is it possible to unite them into a single data class and have a single map of this data class instead of three for each property?
| for ((choiceIndex, tcMap) in toolCalls) { | ||
| for ((tcIndex, acc) in tcMap) { |
| val tcIndex = toolCallCounters.getOrPut(outputIndex) { 0 } | ||
| toolCallCounters[outputIndex] = tcIndex + 1 |
There was a problem hiding this comment.
To make sure:
Here and in other places: Are you sure you need to set tcIndex and not tcIndex + 1 (i.e., the newly assigned value of toolCallCounters[outputIndex]) below?
| } | ||
|
|
||
| "function_call" -> { | ||
| val tcIndex = toolCallCounters.getOrPut(outputIndex) { 0 } |
There was a problem hiding this comment.
tcIndex -> toolCallIndex
| for (line in events.lineSequence()) { | ||
| if (!line.startsWith("data:")) continue | ||
| val data = line.removePrefix("data:").trim() |
There was a problem hiding this comment.
Some implementations check for the event.type in when-cases. Why exactly does this implementation differ?
|
@georgiizorabov Could you please attach screenshots how tracing looks in langfuse with streaming enabled? For easy check you could checkout to tracing branch in |
Motivation and Context
Closes
TRACY-28Type of the changes
General checklist
mainas the base branchDevelopment checklist
Other information
Added streaming examples, improved
AGENTS.md, fixed OpenAI streaming