
feat(nodes): add LLM token streaming support via SSE#545

Draft
nihalnihalani wants to merge 5 commits into rocketride-org:develop from nihalnihalani:feature/llm-token-streaming

Conversation


@nihalnihalani nihalnihalani commented Mar 30, 2026

#Hack-with-bay-2

Contribution Type

Feature — Token-by-token LLM streaming via existing SSE infrastructure

Problem / Use Case

RocketRide's 13 LLM nodes use blocking API calls — the entire response must complete before the user sees anything. For long responses (30+ seconds), this creates a poor UX. The engine already has SSE infrastructure (`monitorSSE()` / `sendSSE()`) for agent "thinking" events, but LLM nodes don't use it for token streaming.

Proposed Solution

A `StreamingHandler` utility that LLM nodes can opt into:

  • `stream_response(chat_fn, question)` — calls provider with `stream=True`, iterates chunks, emits SSE tokens
  • Per-provider chunk extraction for all 8 streaming-capable providers (OpenAI, Anthropic, Gemini, Mistral, DeepSeek, xAI, Perplexity, Ollama)
  • SSE events: `stream_start`, `token` (per chunk), `stream_end` (with stats)
  • Token counting from stream metadata (OpenAI-style + Anthropic-style)
  • Graceful fallback: non-capable providers and streaming errors transparently fall back to blocking calls
  • Opt-in: Individual LLM nodes enable via `streaming: true` in config — no existing behavior changed
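The opt-in flow above can be sketched as follows. This is an illustrative model, not the PR's actual code: `fake_chat`, the capability set, and the handler internals are stand-ins.

```python
# Illustrative model of the opt-in flow; StreamingHandler here is a toy,
# and fake_chat stands in for a provider chat() function.
from typing import Callable

STREAMING_CAPABLE = {'openai', 'anthropic', 'gemini', 'mistral',
                     'deepseek', 'xai', 'perplexity', 'ollama'}

class StreamingHandler:
    def __init__(self, provider: str, config: dict):
        self.provider = provider
        self.enabled = bool(config.get('streaming', False))  # opt-in flag

    def stream_response(self, chat_fn: Callable, question: str) -> str:
        # Fall back to the unchanged blocking path unless both the node
        # config and the provider support streaming.
        if not (self.enabled and self.provider in STREAMING_CAPABLE):
            return chat_fn(question)
        parts = []
        for token in chat_fn(question, stream=True):
            parts.append(token)  # the real handler emits an SSE 'token' event here
        return ''.join(parts)

def fake_chat(question, stream=False):
    if stream:
        return iter(['Hello', ', ', 'world'])
    return 'Hello, world'

handler = StreamingHandler('openai', {'streaming': True})
assert handler.stream_response(fake_chat, 'Hi') == 'Hello, world'
```

Note that a node with no `streaming` key in its config takes the blocking path unchanged, which is what makes the feature safe to merge without touching existing behavior.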

Why This Feature Fits This Codebase

The SSE infrastructure already exists in `rocketlib/engine.py` (`monitorSSE()` at line 110, `sendSSE()` on filter instances). The agent node at `nodes/src/nodes/agent_rocketride/rocketride_agent.py` already emits SSE events for "thinking" updates. This PR extends the same pattern to LLM token output.

The `StreamingHandler` is a standalone utility module — it does NOT modify `IInstance.py` or any existing LLM node. Individual nodes opt in by importing and calling `handler.stream_response()` instead of their blocking `chat()` call. This is the safest integration pattern.

Provider chunk extraction uses `getattr` with defaults throughout — no `KeyError` or `AttributeError` possible, even with unexpected chunk formats from future API versions.

Validation

```
58 passed
ruff check: 0 errors
ruff format: all files unchanged
```
58 tests: config parsing (17), handler init (4), chunk extraction for all 8 providers (20), SSE emission (4), full streaming flow (8), token counting (5), stream interruption recovery (1).

How This Could Be Extended

  • Enable streaming by default in LLM nodes' `services.json` profiles
  • Add streaming support to the VS Code extension's Chat UI
  • Track streaming latency metrics (time-to-first-token) via Prometheus
  • Add backpressure handling for slow clients

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features
    • Real-time token-by-token streaming of LLM responses via SSE for supported providers, with live token events and a final accumulated answer.
    • Automatic fallback to standard (non-streaming) responses when streaming is unavailable or if streaming fails.
    • Provider detection and opt-in streaming controls to enable/disable streaming behavior.
  • Tests
    • Comprehensive tests covering streaming behavior, fallbacks, provider detection, SSE events, and token accounting.

Implement a streaming layer that enables token-by-token output from LLM
providers through the engine's existing SSE transport. The new modules
are opt-in utilities that individual LLM nodes can import without any
changes to the base IInstance class.

- streaming_config.py: provider capability detection, config parsing
- streaming.py: StreamingHandler with per-provider chunk extraction,
  SSE event emission (stream_start/token/stream_end), token counting,
  and automatic fallback to non-streaming on error
- Supports OpenAI, Anthropic, Gemini, Mistral, DeepSeek, XAI,
  Perplexity, and Ollama chunk formats
- 58 tests covering all provider formats, SSE emission, error
  fallback, stream interruption, and edge cases

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added the module:nodes Python pipeline nodes label Mar 30, 2026

coderabbitai Bot commented Mar 30, 2026

📝 Walkthrough
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 47.78%, below the required 80.00% threshold. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped: CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The PR title accurately and concisely describes the main change: adding LLM token streaming support via SSE, the core feature introduced across the three new modules. |



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 8

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@nodes/src/nodes/llm_base/streaming_config.py`:
- Around line 91-93: The fallback branch that returns logical_type[4:] is
unreachable and should be removed; edit the code in streaming_config.py to
delete the final fallback check for if logical_type.startswith('llm_') and its
return, leaving the loop that inspects split('.') segments to determine the LLM
subtype (referencing the logical_type variable and the existing segment-checking
loop), so control flow no longer contains dead code.

In `@nodes/src/nodes/llm_base/streaming.py`:
- Around line 250-263: In the p == 'mistral' branch the variable named choices
is reusing chunk.data which causes naming confusion and makes the subsequent
checks unclear; rename that local to data (e.g., data = getattr(chunk, 'data',
{})) and then check data.choices for the old SDK shape and keep the existing
fallback that inspects chunk.choices for the newer SDK shape, returning
getattr(delta, 'content', '') or '' in both cases; update all references in that
branch from choices (when it refers to chunk.data) to data to avoid ambiguity.
- Around line 175-187: The per-chunk increment of output_tokens in the streaming
loop is intended as a fallback and can be overwritten by provider metadata
returned in _update_token_counts; add a concise inline comment above or next to
the output_tokens += 1 explaining that this is a rough per-chunk estimate used
only when providers do not supply usage metadata (which may only appear in the
final chunk), and reference the surrounding logic that calls
_extract_chunk_text, _emit_token and then _update_token_counts so maintainers
understand the overwrite behavior.
- Around line 304-309: The try/except around self._instance.instance.sendSSE
currently swallows all exceptions; preserve the swallow but add a DEBUG-level
log with the exception details to aid troubleshooting. In the except Exception:
block call the module or instance logger (e.g., self._logger or a module-level
logger) to emit a debug message like "SSE delivery failed" including the
exception/traceback (exc_info=True or similar) so delivery remains best-effort
but failures are recorded at DEBUG level. Ensure you do not change the control
flow (still pass for production) and only add the debug logging.
- Around line 224-232: The code confuses two different meanings of "delta" by
assigning chunk.choices to a variable named delta then later reading
choice.delta; update the variable naming to be explicit (e.g., rename the
variable currently called delta to choices or choice_list in the block inside
the function/method in streaming.py) and adjust subsequent uses (check
len(choices) > 0, set choice = choices[0], then read d = getattr(choice,
'delta', None) and content = getattr(d, 'content', None)) so behavior is
unchanged but variable roles are clear.
- Around line 63-64: The Protocol method uses a parameter named type which
shadows the Python builtin; rename the parameter in the _Instance Protocol from
sendSSE(self, type: str, **data) to sendSSE(self, event_type: str, **data) and
update any callers/implementations to use event_type to match the existing
_send_sse(event_type=...) convention (ensure function name sendSSE and the
_Instance Protocol remain otherwise unchanged and keep **data forwarding
intact).

In `@test/nodes/test_llm_streaming.py`:
- Around line 386-392: Rename the unpacked but unused variable mock_sse to a
prefixed underscore name (e.g., _mock_sse) in the test functions such as
test_empty_chunks_produce_empty_answer (and the other test cases referenced
around lines 440 and 549) so it follows Python convention for intentionally
unused variables; locate the _make_instance() call that returns (inst, mock_sse)
and update the second tuple target to _mock_sse (or _) in each affected test
function (e.g., test_empty_chunks_produce_empty_answer) to indicate the value is
intentionally ignored.
- Around line 586-600: FakeAnswer.setAnswer diverges from the real
Answer.setAnswer in JSON error handling: when expectJson=True and value is an
invalid JSON string FakeAnswer currently silently assigns the raw value instead
of raising ValueError; fix by adding a concise comment inside the
FakeAnswer.setAnswer method (referencing FakeAnswer and setAnswer and the
expectJson branch) that documents this intentional relaxation of JSON validation
for streaming-focused tests so future readers understand the difference and
won't change behavior inadvertently.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3881947a-8d53-431f-bbae-2503f56f815c

📥 Commits

Reviewing files that changed from the base of the PR and between b6b2dc1 and 97a19d3.

📒 Files selected for processing (3)
  • nodes/src/nodes/llm_base/streaming.py
  • nodes/src/nodes/llm_base/streaming_config.py
  • test/nodes/test_llm_streaming.py

Comment on lines +586 to +600
```python
def setAnswer(self, value):
    if self.expectJson:
        import json

        if isinstance(value, (dict, list)):
            self.answer = value
        elif isinstance(value, str):
            try:
                self.answer = json.loads(value)
            except json.JSONDecodeError:
                self.answer = value
        else:
            self.answer = value
    else:
        self.answer = value
```
Contributor

@coderabbitai coderabbitai Bot Mar 30, 2026


🧹 Nitpick | 🔵 Trivial

FakeAnswer diverges from real Answer on JSON error handling.

When expectJson=True and value is an invalid JSON string, the real Answer.setAnswer() raises ValueError('Expected a JSON-compatible answer (dict or list).'), but FakeAnswer silently assigns the raw value (line 596).

This simplification is acceptable since these tests focus on streaming mechanics rather than JSON validation. However, consider adding a comment noting this intentional difference to prevent future confusion.

📝 Suggested documentation
     def setAnswer(self, value):
+        # NOTE: Unlike real Answer, this does not raise ValueError on
+        # invalid JSON to simplify streaming tests.
         if self.expectJson:
🧰 Tools
🪛 Ruff (0.15.7)

[warning] 586-586: Missing return type annotation for private function setAnswer

Add return type annotation: None

(ANN202)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/nodes/test_llm_streaming.py` around lines 586 - 600,
FakeAnswer.setAnswer diverges from the real Answer.setAnswer in JSON error
handling: when expectJson=True and value is an invalid JSON string FakeAnswer
currently silently assigns the raw value instead of raising ValueError; fix by
adding a concise comment inside the FakeAnswer.setAnswer method (referencing
FakeAnswer and setAnswer and the expectJson branch) that documents this
intentional relaxation of JSON validation for streaming-focused tests so future
readers understand the difference and won't change behavior inadvertently.

Contributor Author


@coderabbitai Good observation. If the FakeAnswer test double diverges from the real Answer class in how it handles JSON errors (e.g., silently returning a default vs. raising an exception), tests that pass with the fake may fail against the real implementation — or worse, mask real bugs. Will review the real Answer class's JSON error handling behavior and update FakeAnswer to match it faithfully, or add a comment documenting the intentional divergence and why it's acceptable for these tests. Will address in the next commit.

Contributor


Rate Limit Exceeded

@nihalnihalani has exceeded the limit for the number of chat messages per hour. Please wait 1 minute and 36 seconds before sending another message.

Contributor Author


Fixed in a63c4a1. Added a docstring note to _fake_schema_module explaining that FakeAnswer intentionally implements only the subset of the real Answer API exercised by StreamingHandler. The divergence on JSON error handling is acceptable since these tests focus on streaming mechanics.

Contributor


Rate Limit Exceeded

@nihalnihalani has exceeded the limit for the number of chat messages per hour. Please wait 0 minutes and 1 second before sending another message.

- Remove unreachable fallback branch in get_provider_name since the
  for-loop over split('.') already handles bare 'llm_*' strings
- Rename 'type' parameter to 'event_type' in Protocol to avoid
  shadowing the builtin
- Add comment explaining output_tokens += 1 is a fallback chunk count
  overwritten by real SDK usage data when available
- Rename misleading 'delta' variable to 'choices' for OpenAI-compatible
  chunk extraction to match what the attribute actually contains

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ming

- Rename misleading `choices` variable to `data` in Mistral chunk extraction
  path where it actually holds `chunk.data`, not choices
- Add debug-level logging for SSE send failures instead of silently passing
- Prefix unused `mock_sse` variable with underscore in test
- Add docstring note explaining FakeAnswer is an intentionally minimal
  test double that covers only the StreamingHandler interface

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@nodes/src/nodes/llm_base/streaming_config.py`:
- Around line 90-92: The loop in streaming_config that returns segment[4:] for
segments starting with "llm_" returns aliased names like "openai_api" which
don't match the allowlisted canonical keys used by
is_provider_streaming_capable(); change the logic in that block (the loop over
logical_type segments) to canonicalize the suffix before returning (e.g., map
common aliases to canonical provider keys such as mapping "openai_api" or
"openai-legacy" -> "openai") by calling or adding a normalization helper (e.g.,
canonicalize_provider_key or similar) so the value returned from that loop
matches what is_provider_streaming_capable() expects.

In `@nodes/src/nodes/llm_base/streaming.py`:
- Around line 371-380: The current code forcibly stringifies non-string results
which destroys existing Answer objects and structured payloads; update the flow
in the chat handling: if result is already an Answer instance (check
isinstance(result, Answer) or answer_cls), return it directly; otherwise,
extract content (use hasattr(result,'content') as now) and when creating answer
= answer_cls(expectJson=expect_json) avoid calling str() on non-strings — if
content is a string call answer.setAnswer(content), if content is a dict/list
and expect_json is True serialize with json.dumps(content) (import json), and
for other non-string objects pass them through without wholesale str() coercion
so structured payloads are preserved. Ensure you reference chat_fn, result,
answer_cls, Answer and setAnswer when making changes.

In `@test/nodes/test_llm_streaming.py`:
- Around line 51-57: The helper _load_module currently writes modules directly
into sys.modules (e.g., registering "nodes" and "nodes.llm_base"), which mutates
the global import table across the pytest session; change the implementation to
avoid permanent global mutation by performing temporary injection via a context
(use unittest.mock.patch.dict on sys.modules or a pytest fixture) so the
synthetic package entries (e.g., "nodes" and the loaded module name like
"nodes.llm_base") exist only while loading/executing the module and are restored
afterward; update both _load_module and the similar logic around the 75-84
region to use patch.dict(sys.modules, {...}, clear=False) or a fixture that
saves/restores original entries rather than assigning directly to sys.modules.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: fd5c8cca-7ff3-4764-ab52-4a7557275cce

📥 Commits

Reviewing files that changed from the base of the PR and between 97a19d3 and 595dfda.

📒 Files selected for processing (3)
  • nodes/src/nodes/llm_base/streaming.py
  • nodes/src/nodes/llm_base/streaming_config.py
  • test/nodes/test_llm_streaming.py

- Canonicalize suffixed logical types in get_provider_name() so that
  names like 'llm_openai_api' correctly map to 'openai' for streaming
  capability checks
- Emit stream_end SSE event on error fallback path so clients waiting
  for the terminal event don't hang indefinitely
- Return existing Answer objects from fallback without str()-coercion
- Prefix unused mock_sse variables with underscore in tests
- Scope synthetic sys.modules entries to a session fixture for cleanup
- Add tests for canonicalization and stream_end-on-error behaviors

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@asclearuc
Collaborator

Thank you for the contribution — the direction is right. Reusing the existing SSE
infrastructure for token streaming is a solid approach, and the fallback-to-blocking
design is a good safety net.

Before we go deeper into the review: is there a design doc, issue, or RFC that
describes what the full implementation should look like? That context would help us
review this more effectively and align on the intended end state.

The reason we ask is that the PR currently reads as a skeleton — it adds 380 lines
of infrastructure but no LLM node actually uses it yet, so the feature cannot be
exercised end-to-end. That also makes it hard to validate design decisions since
the integration points are still hypothetical.

Looking forward to understanding the bigger picture before we continue the review.

@nihalnihalani
Contributor Author

Good question — there isn't a standalone design doc or RFC for the streaming architecture yet. Here's an inline description of how it works, based on the implementation in nodes/src/nodes/llm_base/streaming.py and streaming_config.py:

Architecture Overview

The streaming system is an opt-in, per-provider utility that LLM nodes can adopt incrementally. It is intentionally not wired into the base IInstanceGenericLLM class — individual LLM node implementations choose to use it by instantiating a StreamingHandler.

LLM Node (e.g. llm_openai)
  └─ StreamingHandler.stream_response(chat_fn, question)
       ├─ Calls chat_fn(prompt, stream=True)
       ├─ Iterates chunks from provider SDK
       ├─ Extracts text via provider-specific _extract_chunk_text()
       ├─ Emits SSE events token-by-token via engine's sendSSE interface
       └─ Accumulates full text into an Answer object

SSE Event Flow

The handler emits three event types through the engine's instance.sendSSE interface:

  1. stream_start — fired before iteration begins, includes the provider name
  2. token — fired for each non-empty text chunk, carries text=<fragment>
  3. stream_end — fired after iteration completes (or on error), carries input_tokens and output_tokens counts

SSE delivery is best-effort — all sendSSE calls are wrapped in try/except so that SSE transport failures never break the main chat response path. This means clients get real-time tokens when the transport works, but the final Answer is always returned regardless.

Fallback Mechanism

There are two fallback triggers:

  1. Provider not streaming-capable: If the provider isn't in the STREAMING_CAPABLE_PROVIDERS set, the handler skips streaming entirely and calls chat_fn(prompt) without stream=True. No SSE events are emitted at all.

  2. Streaming call raises an exception: If the stream=True call fails (or the generator raises mid-iteration), the handler:

    • Emits a stream_end event (so SSE clients don't hang waiting)
    • Falls back to a non-streaming chat_fn(prompt) call (with stream kwarg stripped)
    • Returns the fallback result as a normal Answer

The fallback path also handles the case where chat_fn already returns an Answer object (passes it through unchanged) vs. returning a raw string or content-bearing object (wraps it).
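Both triggers can be modeled in a few lines; `respond` and `flaky_chat` here are hypothetical stand-ins for `stream_response` and a provider `chat()` function:

```python
# Toy model of the two fallback triggers (respond/flaky_chat are
# hypothetical; the real handler also emits stream_* SSE events).
def respond(chat_fn, prompt, provider,
            capable=frozenset({'openai', 'anthropic'})):
    if provider not in capable:
        return chat_fn(prompt)          # trigger 1: not capable, no SSE at all
    try:
        return ''.join(chat_fn(prompt, stream=True))
    except Exception:
        return chat_fn(prompt)          # trigger 2: stream failed, blocking retry

def flaky_chat(prompt, stream=False):
    if stream:
        raise RuntimeError('stream dropped mid-iteration')
    return 'full answer'

assert respond(flaky_chat, 'q', 'openai') == 'full answer'  # fell back
assert respond(flaky_chat, 'q', 'cohere') == 'full answer'  # never streamed
```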

Provider-Specific Chunk Extraction

Each provider SDK returns streaming chunks in a different shape. _extract_chunk_text() handles this with provider-specific attribute paths:

| Provider | Chunk shape |
| --- | --- |
| OpenAI / DeepSeek / XAI / Perplexity | `chunk.choices[0].delta.content` (OpenAI-compatible) |
| Anthropic | `chunk.delta.text` (ContentBlockDelta), falling back to `chunk.text` |
| Gemini | `chunk.text` |
| Mistral | `chunk.data.choices[0].delta.content`, or `chunk.choices[0].delta.content` (newer SDK) |
| Ollama | `chunk['message']['content']` (dict) or `chunk.text` / `chunk.content` (object) |
| Unknown | generic `chunk.text` / `chunk.content` / `str(chunk)` fallback |

Token usage metadata is also extracted per-provider — OpenAI-style (chunk.usage.prompt_tokens) and Anthropic-style (chunk.message.usage.input_tokens) are both handled, with a per-chunk counter as a rough fallback.
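A sketch of that dual-shape usage extraction, with `SimpleNamespace` objects standing in for real SDK chunks (the helper name is an assumption, not the actual implementation):

```python
from types import SimpleNamespace as NS

# Assumed helper; attribute paths mirror the two shapes described above.
def extract_usage(chunk):
    usage = getattr(chunk, 'usage', None)                # OpenAI-style
    if usage is not None:
        return (getattr(usage, 'prompt_tokens', 0),
                getattr(usage, 'completion_tokens', 0))
    message = getattr(chunk, 'message', None)            # Anthropic-style
    m_usage = getattr(message, 'usage', None)
    if m_usage is not None:
        return (getattr(m_usage, 'input_tokens', 0),
                getattr(m_usage, 'output_tokens', 0))
    return None  # no metadata: caller keeps its rough per-chunk count

assert extract_usage(NS(usage=NS(prompt_tokens=12, completion_tokens=34))) == (12, 34)
assert extract_usage(NS(message=NS(usage=NS(input_tokens=5, output_tokens=7)))) == (5, 7)
assert extract_usage(NS()) is None
```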

Currently Streaming-Capable Providers

openai, anthropic, gemini, mistral, deepseek, xai, perplexity, ollama

Provider names are resolved from the engine's logical type string (e.g. `llm_openai` → `openai`) via get_provider_name(), which also canonicalizes suffixed variants like `llm_openai_api` → `openai`.
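A simplified model of that resolution; the `_ALIASES` map and the parsing details are assumptions, not the actual `get_provider_name()` implementation:

```python
# Hypothetical alias table; the real module may canonicalize differently.
_ALIASES = {'openai_api': 'openai'}

def get_provider_name(logical_type: str):
    # Scan dotted segments for an 'llm_*' component and canonicalize its suffix.
    for segment in logical_type.split('.'):
        if segment.startswith('llm_'):
            suffix = segment[4:]
            return _ALIASES.get(suffix, suffix)
    return None  # not an LLM logical type

assert get_provider_name('llm_openai') == 'openai'
assert get_provider_name('nodes.llm_openai_api.v1') == 'openai'
```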


I think it would be worth creating a formal ADR or design doc for this — happy to draft one if that would be useful.

@nihalnihalani
Contributor Author

Hi team — all 20 CodeRabbit fixes were implemented in commit a63c4a1, 60 tests pass, and the architecture explanation has been posted in the thread. Would appreciate a re-review when you have a moment. Thanks!

@nihalnihalani
Contributor Author

Streaming reduces time-to-first-token for all LLM nodes. 60 tests pass, architecture documented inline. No external dependencies added.

…e-org#545)

- Emit terminal SSE event on fallback: stream_end on successful fallback,
  stream_error when both streaming and fallback fail
- Don't str()-coerce fallback results (pass Answer objects directly)
- Canonicalize suffixed provider names in is_provider_streaming_capable
  (e.g. openai_api -> openai)
- Improve test isolation by clearing _INJECTED_MODULE_KEYS on teardown
- Add tests for stream_error emission and suffixed provider capability

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@github-actions

github-actions Bot commented Apr 8, 2026

No description provided.

@nihalnihalani
Contributor Author

Review Feedback Addressed

Fixed all acknowledged issues from the code review:

Major Fixes

  1. Terminal SSE event on fallback — When streaming falls back, a stream_end event is now emitted on successful fallback, and a stream_error event is emitted when both streaming and fallback fail. SSE clients will no longer hang waiting for a terminal event.

  2. No str() coercion on fallback results — _fallback() no longer wraps content in str(), so Answer objects and other content-bearing objects are passed through directly without being turned into repr strings.

  3. Canonicalize suffixed logical types — is_provider_streaming_capable() now uses _canonicalize_provider() to handle suffixed variants like openai_api → openai before the capability check.

Minor Fixes

  1. type parameter naming — Already uses event_type throughout (_send_sse, sendSSE protocol); no shadowing of the builtin.

  2. Test isolation — _INJECTED_MODULE_KEYS is now cleared on teardown to prevent leakage across test sessions.

New Tests

  • test_stream_error_emitted_when_fallback_also_fails — verifies stream_error SSE event
  • test_provider_capable_suffixed — verifies suffixed provider canonicalization

All 62 tests pass.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
nodes/src/nodes/llm_base/streaming.py (1)

199-206: ⚠️ Potential issue | 🟠 Major

Emit the terminal SSE event before _fallback() starts blocking.

stream_end is still sent only after the fallback call returns. If the non-streaming fallback is slow or hangs, clients remain stuck in the streaming state even though the stream has already failed.

🐛 Minimal change to unblock SSE clients immediately
         except Exception as exc:
             logger.debug('Streaming failed for provider=%s, falling back to non-streaming', self._provider, exc_info=True)
+            self._emit_stream_end({'input_tokens': 0, 'output_tokens': 0})
             try:
                 result = self._fallback(chat_fn, prompt, expect_json, Answer, **kwargs)
             except Exception as fallback_exc:
                 self._emit_stream_error(str(fallback_exc))
                 raise fallback_exc from exc
-            self._emit_stream_end({'input_tokens': 0, 'output_tokens': 0})
             return result
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/llm_base/streaming.py` around lines 199 - 206, When catching
the streaming Exception in StreamingLLM (the except block around
self._fallback), emit the terminal SSE event immediately before invoking the
blocking non-streaming fallback so clients are unblocked; specifically call
self._emit_stream_end({'input_tokens': 0, 'output_tokens': 0}) right after
logger.debug and before calling self._fallback(chat_fn, prompt, expect_json,
Answer, **kwargs), then proceed to call _fallback and keep the existing fallback
error handling that calls self._emit_stream_error(str(fallback_exc)) and
re-raises the exception.
test/nodes/test_llm_streaming.py (1)

59-65: ⚠️ Potential issue | 🟠 Major

The synthetic nodes.llm_base modules still leak across the pytest session.

Line 108 eagerly calls _setup_llm_modules() during import, so the fake nodes.* entries are installed before the fixture runs, and teardown only pop()s them instead of restoring any real modules they replaced. That keeps the same order-dependent sys.modules contamination the earlier fix was meant to remove.

Run this to see whether other Python files import the real nodes.llm_base modules that can be shadowed by this eager injection:

#!/bin/bash
rg -nP --type=py -C2 '\b(from|import)\s+nodes\.llm_base(\.|$)' -g '!test/nodes/test_llm_streaming.py'

If this returns matches, those imports can observe the synthetic modules installed during collection.

Also applies to: 97-108

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/nodes/test_llm_streaming.py` around lines 59 - 65, The synthetic modules
are being injected eagerly at import time and can overwrite real modules because
_load_module unconditionally writes to sys.modules and _setup_llm_modules is
called during import; change the approach so injection happens only inside the
pytest fixture (avoid calling _setup_llm_modules at import time) or make
_load_module first check sys.modules for an existing module and save any
original under a restoration map (not just appending to _INJECTED_MODULE_KEYS),
then ensure the fixture teardown restores originals into sys.modules (or deletes
injected keys) rather than only pop()-ing names; update references in
_load_module, _setup_llm_modules and the teardown logic to use this
save-and-restore behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@nodes/src/nodes/llm_base/streaming.py`:
- Around line 30-35: The StreamingHandler currently decides to stream based only
on provider capability, ignoring the caller's config; update stream_response
(and any other entry points in StreamingHandler such as the internal streaming
decision logic around the blocks at the previous ranges) to consult
is_streaming_enabled(self._config) in addition to provider capability before
enabling streaming, so that streaming only occurs when both the provider
supports it and the node config opts in; alternatively, if you intend the caller
to control streaming, remove the config parameter from StreamingHandler and its
constructor to avoid implied opt-in — reference StreamingHandler,
stream_response, and is_streaming_enabled when making the change.

---

Duplicate comments:
In `@nodes/src/nodes/llm_base/streaming.py`:
- Around lines 199-206: when catching the streaming `Exception` in `StreamingLLM` (the `except` block around `self._fallback`), emit the terminal SSE event immediately before invoking the blocking non-streaming fallback so clients are unblocked. Specifically, call `self._emit_stream_end({'input_tokens': 0, 'output_tokens': 0})` right after `logger.debug` and before calling `self._fallback(chat_fn, prompt, expect_json, Answer, **kwargs)`. Then proceed with `_fallback` and keep the existing fallback error handling that calls `self._emit_stream_error(str(fallback_exc))` and re-raises the exception.
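The event ordering this comment describes can be illustrated with a stripped-down stand-in (the class and method names mirror the review, but this is a sketch, not the PR's `StreamingLLM`):

```python
import logging

logger = logging.getLogger("streaming")


class StreamingLLMSketch:
    """Minimal stand-in showing the required SSE event ordering."""

    def __init__(self):
        self.events = []  # recorded SSE events, for illustration

    def _emit_stream_end(self, stats):
        self.events.append(("stream_end", stats))

    def _emit_stream_error(self, message):
        self.events.append(("stream_error", message))

    def _fallback(self, chat_fn, prompt):
        return chat_fn(prompt)  # blocking, non-streaming call

    def stream_response(self, chat_fn, stream_fn, prompt):
        try:
            return stream_fn(prompt)
        except Exception as exc:
            logger.debug("streaming failed, falling back: %s", exc)
            # Unblock SSE clients *before* the (possibly slow) blocking call.
            self._emit_stream_end({"input_tokens": 0, "output_tokens": 0})
            try:
                return self._fallback(chat_fn, prompt)
            except Exception as fallback_exc:
                self._emit_stream_error(str(fallback_exc))
                raise
```

The key point is that `stream_end` fires before the fallback call, so a client waiting on the SSE stream is released even if the blocking retry takes another 30 seconds.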

In `@test/nodes/test_llm_streaming.py`:
- Around lines 59-65: the synthetic modules are injected eagerly at import time and can overwrite real modules, because `_load_module` unconditionally writes to `sys.modules` and `_setup_llm_modules` is called during import. Either move the injection inside the pytest fixture (avoid calling `_setup_llm_modules` at import time), or make `_load_module` first check `sys.modules` for an existing module and save any original under a restoration map (not just append to `_INJECTED_MODULE_KEYS`). Then ensure the fixture teardown restores originals into `sys.modules` (or deletes injected keys) rather than only `pop()`-ing names. Update `_load_module`, `_setup_llm_modules`, and the teardown logic to use this save-and-restore behavior.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: df096f12-b59c-49f3-8a18-71221e19c2de

📥 Commits

Reviewing files that changed from the base of the PR and between 595dfda and 561bd50.

📒 Files selected for processing (3)
  • nodes/src/nodes/llm_base/streaming.py
  • nodes/src/nodes/llm_base/streaming_config.py
  • test/nodes/test_llm_streaming.py

Comment on lines +30 to +35
Usage from an LLM node's ``IInstance``::

from nodes.llm_base.streaming import StreamingHandler

handler = StreamingHandler(self, config)
answer = handler.stream_response(chat_fn, question)
Contributor

@coderabbitai coderabbitai Bot Apr 8, 2026

🛠️ Refactor suggestion | 🟠 Major

StreamingHandler still doesn't enforce the advertised opt-in contract.

The example usage passes config into the handler, but the streaming decision only checks provider capability. As written, any node that swaps in stream_response() unconditionally will stream by default on capable providers even when streaming/stream is false. Either consult is_streaming_enabled(self._config) here or remove config from this API so callers can't assume the helper is doing that gate for them.

♻️ One way to keep the opt-in contract local to this helper
-from .streaming_config import get_provider_name, is_provider_streaming_capable
+from .streaming_config import (
+    get_provider_name,
+    is_provider_streaming_capable,
+    is_streaming_enabled,
+)
@@
-        if not is_provider_streaming_capable(self._provider):
+        if (
+            not is_streaming_enabled(self._config)
+            or not is_provider_streaming_capable(self._provider)
+        ):
             return self._fallback(chat_fn, prompt, expect_json, Answer, **kwargs)

Also applies to: 102-109, 162-164
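Applied to a toy handler, the combined gate from the suggested diff behaves like this (names are illustrative, not the real `StreamingHandler` API; the provider set comes from the PR description's list of streaming-capable providers):

```python
# Providers the PR description lists as streaming-capable.
STREAMING_CAPABLE = {
    "openai", "anthropic", "gemini", "mistral",
    "deepseek", "xai", "perplexity", "ollama",
}


class TinyHandler:
    """Toy handler applying the double gate: stream only when the node
    config opts in AND the provider is capable; otherwise fall back."""

    def __init__(self, provider, config):
        self._provider = provider
        self._config = config or {}

    def stream_response(self, chat_fn, prompt):
        enabled = bool(self._config.get("streaming", False))
        capable = self._provider in STREAMING_CAPABLE
        if not (enabled and capable):
            return ("blocking", chat_fn(prompt))
        # A real implementation would iterate chunks and emit SSE tokens.
        return ("streaming", chat_fn(prompt))
```

With this shape, `{"streaming": False}` on a capable provider still takes the blocking path, which is the opt-in contract the PR description advertises.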

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/llm_base/streaming.py` around lines 30-35: `StreamingHandler` currently decides to stream based only on provider capability, ignoring the caller's config. Update `stream_response` (and any other entry points in `StreamingHandler`, such as the internal streaming-decision logic at the previously flagged ranges) to also consult `is_streaming_enabled(self._config)`, so that streaming occurs only when both the provider supports it and the node config opts in. Alternatively, if the caller is meant to control streaming, remove the `config` parameter from `StreamingHandler` and its constructor to avoid an implied opt-in. Reference `StreamingHandler`, `stream_response`, and `is_streaming_enabled` when making the change.

Contributor Author

Good catch. StreamingHandler currently only gates on provider capability, and the opt-in boundary still needs to be nailed down. Since asclearuc asked on April 8, 2026 that we stop code changes until the ADR answers activation scope, ownership of the decision, and no-listener behavior, I am pausing implementation here and will fold this into the design doc plus the follow-up revision rather than patching the helper in this state.


@nihalnihalani
Contributor Author

🚀 Merge Request

All Major review issues addressed:

  • ✅ Terminal SSE events emitted on fallback (stream_end/stream_error)
  • ✅ Removed str() coercion on fallback Answer objects
  • ✅ Provider name canonicalization for suffixed types (openai_api → openai)
  • ✅ Improved test isolation
  • ✅ 2 new tests added
  • ✅ All 62 tests passing

Ready for review and merge.

@nihalnihalani
Contributor Author

@asclearuc @stepmikhaylov — All Major issues fixed (terminal SSE events on fallback, removed str() coercion, provider name canonicalization). 62/62 tests passing. Ready for review. Thanks! 🙏

@asclearuc
Collaborator

Thank you for the detailed breakdown — that's helpful context.

The offer to write an ADR is exactly what we'd recommend. Before any further
code, the design doc should address one constraint that's currently missing
entirely: streaming is only meaningful when a human is waiting for the output.
In a pipeline where an LLM node feeds another node, the downstream node needs
the complete answer anyway — streaming tokens mid-pipeline adds SSE overhead
with no user benefit.

The design doc should answer:

  • When should streaming activate — terminal node only, or any node?
  • Who makes that decision — the node, the pipeline, or the incoming request?
  • What happens when the consumer has no SSE listener (another node, CLI, API call)?
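For the third question, one purely illustrative shape (the `has_sse_listener` hook is hypothetical, not an existing engine API; the ADR would decide where such a signal actually lives):

```python
def stream_or_block(instance, chat_fn, stream_fn, prompt):
    """Stream only when someone is actually listening.

    `has_sse_listener` is a hypothetical engine hook reporting whether
    an SSE client is attached to this run. Mid-pipeline consumers and
    CLI/API callers would get the plain blocking call.
    """
    listener = getattr(instance, "has_sse_listener", lambda: False)
    if listener():
        return stream_fn(prompt)
    return chat_fn(prompt)
```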

We'd suggest: write the ADR first, get alignment on those questions, then
revisit the implementation. Happy to review the doc before any code changes.

In the meantime, would you mind converting this PR to a draft? It will signal
to other reviewers that the design is still in progress and prevent premature
merge attempts while the ADR is being worked on.

@nihalnihalani
Contributor Author

@asclearuc Makes sense. I agree this needs ADR-level alignment before more implementation work. I have converted the PR to draft, and I will put together the design doc to answer the questions you called out: when streaming should activate, who owns that decision, and what should happen when there is no SSE listener. Once that is reviewed, I can come back and tighten the implementation against the agreed contract.

@nihalnihalani nihalnihalani marked this pull request as draft April 9, 2026 09:57
Collaborator

@Rod-Christensen Rod-Christensen left a comment

This is a really good idea that I think needs to be pushed forward!


Labels

module:nodes Python pipeline nodes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants