fix: make RunnableRails.atransform return AsyncIterator#1763
frankentini wants to merge 2 commits into NVIDIA-NeMo:develop
Conversation
…1692) atransform() was returning a coroutine (via ainvoke) instead of an AsyncIterator, causing 'async for requires __aiter__' TypeError when RunnableRails is used inside a RunnableSequence pipeline with astream(). Similarly, transform() was returning a single value instead of an Iterator. Both methods now properly consume their input iterator/async-iterator and yield results, conforming to the langchain_core Runnable protocol. Fixes NVIDIA-NeMo#1692
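The root cause described above — a plain `async def` returns a coroutine, while an `async def` containing `yield` is an async-generator function whose call returns an `AsyncIterator` — can be seen in isolation. The function names below are illustrative stand-ins, not the library's actual code:

```python
import asyncio
from collections.abc import AsyncIterator


async def source():
    yield "hello"


async def broken_atransform(stream):
    # Plain coroutine function: calling it returns a coroutine object,
    # so `async for item in broken_atransform(...)` raises TypeError.
    return [item async for item in stream]


async def fixed_atransform(stream):
    # The `yield` makes this an async-generator function: calling it
    # returns an AsyncIterator, which `async for` can consume.
    async for item in stream:
        yield item


async def main():
    coro = broken_atransform(source())
    assert not isinstance(coro, AsyncIterator)  # a coroutine, not an iterator
    coro.close()  # avoid a "never awaited" warning
    agen = fixed_atransform(source())
    assert isinstance(agen, AsyncIterator)
    assert [item async for item in agen] == ["hello"]


asyncio.run(main())
```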
Greptile Summary

This PR correctly fixes the `AsyncIterator` return type of `transform`/`atransform`, but both methods now call `invoke` per chunk instead of accumulating first (see table below).
| Filename | Overview |
|---|---|
| nemoguardrails/integrations/langchain/runnable_rails.py | transform/atransform correctly converted to generators (fixing the AsyncIterator TypeError), but both call invoke per-chunk instead of accumulating all chunks first per the LangChain protocol |
| tests/runnable_rails/test_atransform_async_iter.py | New tests cover the AsyncIterator fix and single-chunk pipeline integration, but do not test multi-chunk upstream streaming, which would catch the invoke-per-chunk bug |
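The missing multi-chunk case can be exercised without any guardrails dependency. In this standalone sketch, the two functions are illustrative stand-ins for `RunnableRails.transform` (with `upper()` standing in for applying guardrails); only the accumulating variant processes the complete message once:

```python
from typing import Iterator, List


def transform_per_chunk(chunks: Iterator[str]) -> Iterator[str]:
    # Shape of the buggy code: process each partial chunk independently.
    for chunk in chunks:
        yield chunk.upper()


def transform_accumulated(chunks: Iterator[str]) -> Iterator[str]:
    # Shape of the suggested fix: fold the chunks together, process once.
    final = None
    for chunk in chunks:
        final = chunk if final is None else final + chunk
    if final is not None:
        yield final.upper()


stream: List[str] = ["Hel", "lo ", "world"]
assert list(transform_per_chunk(iter(stream))) == ["HEL", "LO ", "WORLD"]
assert list(transform_accumulated(iter(stream))) == ["HELLO WORLD"]
```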
Sequence Diagram

```mermaid
sequenceDiagram
    participant Pipeline as RunnableSequence
    participant Upstream as UpstreamRunnable (e.g. streaming LLM)
    participant RR as RunnableRails.atransform
    Pipeline->>Upstream: astream(input)
    Upstream-->>RR: chunk1 (partial token)
    Note over RR: BUG: ainvoke(chunk1) ❌
    Upstream-->>RR: chunk2 (partial token)
    Note over RR: BUG: ainvoke(chunk2) ❌
    Upstream-->>RR: chunk3 (final token)
    Note over RR: BUG: ainvoke(chunk3) ❌
    Note over RR: CORRECT: accumulate chunk1+chunk2+chunk3,<br/>then ainvoke(full_message) once ✅
    RR-->>Pipeline: yield output
```
Prompt To Fix All With AI
This is a comment left during a code review.
Path: nemoguardrails/integrations/langchain/runnable_rails.py
Line: 865-866
Comment:
**Each chunk invoked separately, not accumulated**
The LangChain `Runnable.transform` protocol passes upstream streaming chunks as the input iterator — these must be accumulated into one complete input before a single `invoke` call. The current loop calls `invoke` on each individual chunk, so if an upstream step (e.g., a streaming LLM used for output guardrails) emits `N` token chunks, guardrails will be applied `N` times on partial tokens instead of once on the complete message. The docstring itself says "the entire input must be collected before processing," but the code contradicts this.
The fix should mirror LangChain's base `Runnable` behaviour:
```suggestion
final = None
got_first = False
for chunk in input_stream:
if not got_first:
final = chunk
got_first = True
else:
try:
final = final + chunk
except TypeError:
final = chunk
if got_first:
yield self.invoke(final, config, **kwargs)
```
How can I resolve this? If you propose a fix, please make it concise.
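The `try/except TypeError` fold in the suggestion above can be exercised on its own. In this sketch, the `Chunk` dataclass is illustrative, standing in for LangChain message chunks that support `+`; it shows both the additive path and the fallback:

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class Chunk:
    text: str

    def __add__(self, other: "Chunk") -> "Chunk":
        return Chunk(self.text + other.text)


def accumulate(chunks: Iterator) -> Optional[object]:
    # Same fold as the suggestion: keep the first chunk, `+` the rest,
    # and fall back to the latest chunk if addition is unsupported.
    final = None
    got_first = False
    for chunk in chunks:
        if not got_first:
            final, got_first = chunk, True
        else:
            try:
                final = final + chunk
            except TypeError:
                final = chunk
    return final


assert accumulate(iter([Chunk("Hi "), Chunk("there")])) == Chunk("Hi there")
# dicts don't support `+`, so the fold keeps only the latest chunk:
assert accumulate(iter([{"a": 1}, {"b": 2}])) == {"b": 2}
assert accumulate(iter([])) is None
```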
---
This is a comment left during a code review.
Path: nemoguardrails/integrations/langchain/runnable_rails.py
Line: 880-881
Comment:
**Each async chunk invoked separately, not accumulated**
Same problem as the synchronous `transform` above. `atransform` receives streaming chunks from the upstream runnable; all chunks must be accumulated (via `+`) into one complete message before calling `ainvoke` once. Processing each partial token independently would silently apply guardrails to fragments rather than the full assembled input.
```suggestion
final = None
got_first = False
async for chunk in input_stream:
if not got_first:
final = chunk
got_first = True
else:
try:
final = final + chunk
except TypeError:
final = chunk
if got_first:
yield await self.ainvoke(final, config, **kwargs)
```
How can I resolve this? If you propose a fix, please make it concise.
Reviews (2): Last reviewed commit: "style: remove unused asyncio import, ren..."
From tests/runnable_rails/test_atransform_async_iter.py:

```python
"""Tests that RunnableRails.atransform returns an AsyncIterator (fixes #1692)."""

import asyncio
```
asyncio is imported but never referenced in any of the three test functions. All async execution is handled by pytest-asyncio via the @pytest.mark.asyncio decorator.
```suggestion
```
(remove line 18)
📝 Walkthrough
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Pipeline as Pipeline<br/>(RunnableLambda | RunnableRails)
    participant RTR as RunnableRails
    participant Iter as AsyncIterator[Item]
    participant Rails as Rails Engine
    Pipeline->>RTR: atransform(async_iter, config)
    RTR->>Iter: iterate items
    loop For each item in stream
        Iter-->>RTR: item
        RTR->>Rails: ainvoke(item, config)
        Rails-->>RTR: result
        RTR->>Pipeline: yield result
    end
    Pipeline->>Pipeline: astream() consumes via async for
```
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~10 minutes
🚥 Pre-merge checks: ✅ 5 passed | ❌ 1 failed (inconclusive)
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@nemoguardrails/integrations/langchain/runnable_rails.py`:
- Line 855: Rename the parameter named "input" (e.g., the function signature
with "input: Iterator[Input]" at lines 855 and the occurrence at line 870) to a
non-builtin-shadowing name such as "inputs" or "input_iter"; update the type
hint (Iterator[Input]) accordingly and replace all internal references to that
parameter within the function/method body to the new name, and update any
internal calls or local variables that reference it so callers and usages remain
consistent.
ℹ️ Review info
⚙️ Run configuration: Path: .coderabbit.yaml | Review profile: CHILL | Plan: Pro | Run ID: 07d1e7cc-dd9e-4979-90d0-ff64a98e665f
📒 Files selected for processing (2)
- nemoguardrails/integrations/langchain/runnable_rails.py
- tests/runnable_rails/test_atransform_async_iter.py
```diff
 def transform(
     self,
-    input: Input,
+    input: Iterator[Input],
```
🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Verify no transform/atransform signatures still shadow builtin `input`.
rg -nP 'def\s+a?transform\(\s*self,\s*input\s*:' nemoguardrails/integrations/langchain/runnable_rails.py
```

Repository: NVIDIA-NeMo/Guardrails
Length of output: 48

🏁 Script executed:

```shell
sed -n '850,880p' nemoguardrails/integrations/langchain/runnable_rails.py
```

Repository: NVIDIA-NeMo/Guardrails
Length of output: 1179
Rename `input` parameters to avoid builtin shadowing (Ruff A002).
Lines 855 and 870 shadow Python's builtin `input`, which Ruff flags as an error. Renaming these parameters keeps lint clean without behavior changes.
Proposed fix

```diff
 def transform(
     self,
-    input: Iterator[Input],
+    inputs: Iterator[Input],
     config: Optional[RunnableConfig] = None,
     **kwargs: Optional[Any],
 ) -> Iterator[Output]:
     """Transform the input.

     Consumes the input iterator and yields the result of invoke for each
     item. For RunnableRails the entire input must be collected before
     processing, so streaming granularity matches invoke.
     """
-    for item in input:
+    for item in inputs:
         yield self.invoke(item, config, **kwargs)

 async def atransform(
     self,
-    input: AsyncIterator[Input],
+    inputs: AsyncIterator[Input],
     config: Optional[RunnableConfig] = None,
     **kwargs: Optional[Any],
 ) -> AsyncIterator[Output]:
     """Transform the input asynchronously.

     Consumes the async input iterator and yields the result of ainvoke for
     each item. For RunnableRails the entire input must be collected before
     processing, so streaming granularity matches ainvoke.
     """
-    async for item in input:
+    async for item in inputs:
         yield await self.ainvoke(item, config, **kwargs)
```

🧰 Tools
🪛 Ruff (0.15.9)
[error] 855-855: Function argument input is shadowing a Python builtin
(A002)
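Ruff fires A002 because, inside the function body, the parameter hides the builtin of the same name. A minimal illustration (the function here is hypothetical, not from the PR):

```python
import builtins


def read_twice(input):  # Ruff A002: argument `input` shadows a builtin
    # Within this scope, `input` is the list parameter; calling the real
    # builtin for interactive reads would require builtins.input instead.
    return input + input


assert read_twice([1]) == [1, 1]
assert callable(builtins.input)  # the real builtin is still reachable
```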
…ltin shadow

Address review feedback:
- Remove unused asyncio import in test file (pytest.mark.asyncio does not need it)
- Rename input -> input_stream in transform/atransform to avoid shadowing builtin
Summary

Fixes #1692 — `RunnableRails.atransform()` was returning a coroutine (via `ainvoke`) instead of an `AsyncIterator`, causing a `TypeError: 'async for' requires an object with __aiter__ method, got coroutine` when `RunnableRails` was used inside a `RunnableSequence` pipeline with `astream()`.

Problem

The `atransform` method signature should match LangChain's `Runnable.atransform` protocol, which takes an `AsyncIterator[Input]` and returns an `AsyncIterator[Output]`. The previous implementation returned a single awaited value (a coroutine result), not an async iterator. The same issue affected the synchronous `transform` method.

Fix

Both `transform` and `atransform` now consume their input iterator/async iterator and yield results, conforming to the `langchain_core` Runnable protocol.

Tests

Added `tests/runnable_rails/test_atransform_async_iter.py` with three tests:
- `test_transform_returns_iterator`: verifies `transform()` returns an `Iterator`
- `test_atransform_returns_async_iterator`: verifies `atransform()` returns an `AsyncIterator`
- `test_astream_through_pipeline`: reproduces the exact bug from #1692 — `RunnableRails` in a pipeline with `astream()`

Summary by CodeRabbit
New Features
Tests