
fix: make RunnableRails.atransform return AsyncIterator#1763

Open
frankentini wants to merge 2 commits into NVIDIA-NeMo:develop from frankentini:fix/runnable-rails-atransform-async-iter

Conversation


@frankentini frankentini commented Apr 4, 2026

Summary

Fixes #1692. RunnableRails.atransform() was returning a coroutine (via ainvoke) instead of an AsyncIterator, causing a TypeError: 'async for' requires an object with __aiter__ method, got coroutine when RunnableRails was used inside a RunnableSequence pipeline with astream().

Problem

The atransform method signature should match LangChain's Runnable.atransform protocol, which takes an AsyncIterator[Input] and returns an AsyncIterator[Output]. The previous implementation was:

```python
async def atransform(self, input, config=None, **kwargs):
    return await self.ainvoke(input, config, **kwargs)
```

This returned a single awaited value (a coroutine result), not an async iterator. The same issue affected the synchronous transform method.
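The failure mode can be reproduced in a few lines without LangChain (the names below are illustrative, not the actual NeMo Guardrails code): a plain `async` function that `return`s a value hands the caller a coroutine object, which `async for` rejects.

```python
import asyncio

async def items():
    yield "hello"

async def broken_atransform(input_stream):
    # Old behavior in sketch form: awaiting internally and returning a single
    # value means callers receive a coroutine, not an AsyncIterator.
    return [x async for x in input_stream]

async def main():
    try:
        # broken_atransform(...) is a coroutine object; `async for` needs __aiter__.
        async for _ in broken_atransform(items()):
            pass
    except TypeError as exc:
        return str(exc)
    return "no error"

print(asyncio.run(main()))
# → "'async for' requires an object with __aiter__ method, got coroutine"
```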

Fix

Both transform and atransform now:

  • Accept an iterator/async-iterator as input (matching the Runnable protocol)
  • Yield results via generator/async-generator syntax
  • Properly consume input items and process each through invoke/ainvoke

Tests

Added tests/runnable_rails/test_atransform_async_iter.py with three tests: synchronous iteration, async iteration, and pipeline composition with astream().

Summary by CodeRabbit

  • New Features

    • LangChain integration now supports streaming batch processing of multiple inputs simultaneously.
    • Transform methods can consume streams of data and yield results iteratively, enabling efficient pipeline integration.
  • Tests

    • Added comprehensive test coverage for stream processing functionality in pipelines.

…1692)

atransform() was returning a coroutine (via ainvoke) instead of an
AsyncIterator, causing 'async for requires __aiter__' TypeError when
RunnableRails is used inside a RunnableSequence pipeline with astream().

Similarly, transform() was returning a single value instead of an
Iterator.

Both methods now properly consume their input iterator/async-iterator
and yield results, conforming to the langchain_core Runnable protocol.

Fixes NVIDIA-NeMo#1692

greptile-apps bot commented Apr 4, 2026

Greptile Summary

This PR correctly fixes the TypeError from #1692 by converting transform/atransform to generator functions that return Iterator/AsyncIterator as required by LangChain's Runnable protocol. However, both implementations call invoke/ainvoke for each item in the upstream iterator rather than accumulating all chunks into a single complete input first — diverging from base class behaviour and producing incorrect results when an upstream step (e.g., a streaming LLM) emits multiple partial chunks.

  • P1 (transform): Iterates over chunks calling invoke on each; should accumulate all chunks via + then call invoke once (lines 865–866).
  • P1 (atransform): Same issue on the async side — each partial chunk processed independently rather than assembled first (lines 880–881).
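The accumulate-then-invoke pattern the review calls for can be illustrated with plain strings, since LangChain message chunks overload `+` for concatenation the same way (the function name here is made up for illustration):

```python
from typing import Any, Callable, Iterator

def transform_accumulating(
    invoke: Callable[[Any], Any], input_stream: Iterator[Any]
) -> Iterator[Any]:
    # Mirrors LangChain's base Runnable.transform: fold all upstream chunks
    # into one complete input via `+`, then invoke exactly once on the result.
    final = None
    got_first = False
    for chunk in input_stream:
        if not got_first:
            final = chunk
            got_first = True
        else:
            try:
                final = final + chunk
            except TypeError:
                final = chunk
    if got_first:
        yield invoke(final)

# Three partial token chunks, as an upstream streaming LLM might emit.
chunks = iter(["Hel", "lo ", "world"])
print(list(transform_accumulating(lambda s: s.upper(), chunks)))  # ['HELLO WORLD']
```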

Confidence Score: 3/5

Two P1 logic bugs remain: invoke/ainvoke called per upstream chunk rather than on the accumulated complete input

The primary TypeError from #1692 is fixed, but both transform and atransform diverge from the LangChain base class protocol by processing each streaming chunk as an independent invocation. This would silently apply guardrails to partial token fragments when an upstream step emits multiple chunks (e.g., a streaming LLM as an output guardrail), producing incorrect behaviour. Score is 3 because the bug is on the primary changed code path and would cause silent correctness failures in common pipeline configurations.

nemoguardrails/integrations/langchain/runnable_rails.py — lines 865–866 (transform) and 880–881 (atransform)

Important Files Changed

  • nemoguardrails/integrations/langchain/runnable_rails.py: transform/atransform correctly converted to generators (fixing the AsyncIterator TypeError), but both call invoke per-chunk instead of accumulating all chunks first per the LangChain protocol
  • tests/runnable_rails/test_atransform_async_iter.py: new tests cover the AsyncIterator fix and single-chunk pipeline integration, but do not test multi-chunk upstream streaming, which would catch the invoke-per-chunk bug

Sequence Diagram

sequenceDiagram
    participant Pipeline as RunnableSequence
    participant Upstream as UpstreamRunnable (e.g. streaming LLM)
    participant RR as RunnableRails.atransform

    Pipeline->>Upstream: astream(input)
    Upstream-->>RR: chunk1 (partial token)
    Note over RR: BUG: ainvoke(chunk1) ❌
    Upstream-->>RR: chunk2 (partial token)
    Note over RR: BUG: ainvoke(chunk2) ❌
    Upstream-->>RR: chunk3 (final token)
    Note over RR: BUG: ainvoke(chunk3) ❌

    Note over RR: CORRECT: accumulate chunk1+chunk2+chunk3,<br/>then ainvoke(full_message) once ✅
    RR-->>Pipeline: yield output
Prompt To Fix All With AI
This is a comment left during a code review.
Path: nemoguardrails/integrations/langchain/runnable_rails.py
Line: 865-866

Comment:
**Each chunk invoked separately, not accumulated**

The LangChain `Runnable.transform` protocol passes upstream streaming chunks as the input iterator — these must be accumulated into one complete input before a single `invoke` call. The current loop calls `invoke` on each individual chunk, so if an upstream step (e.g., a streaming LLM used for output guardrails) emits `N` token chunks, guardrails will be applied `N` times on partial tokens instead of once on the complete message. The docstring itself says "the entire input must be collected before processing," but the code contradicts this.

The fix should mirror LangChain's base `Runnable` behaviour:
```suggestion
        final = None
        got_first = False
        for chunk in input_stream:
            if not got_first:
                final = chunk
                got_first = True
            else:
                try:
                    final = final + chunk
                except TypeError:
                    final = chunk
        if got_first:
            yield self.invoke(final, config, **kwargs)
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: nemoguardrails/integrations/langchain/runnable_rails.py
Line: 880-881

Comment:
**Each async chunk invoked separately, not accumulated**

Same problem as the synchronous `transform` above. `atransform` receives streaming chunks from the upstream runnable; all chunks must be accumulated (via `+`) into one complete message before calling `ainvoke` once. Processing each partial token independently would silently apply guardrails to fragments rather than the full assembled input.

```suggestion
        final = None
        got_first = False
        async for chunk in input_stream:
            if not got_first:
                final = chunk
                got_first = True
            else:
                try:
                    final = final + chunk
                except TypeError:
                    final = chunk
        if got_first:
            yield await self.ainvoke(final, config, **kwargs)
```

How can I resolve this? If you propose a fix, please make it concise.


Reviews (2): Last reviewed commit: "style: remove unused asyncio import, ren..."


```python
"""Tests that RunnableRails.atransform returns an AsyncIterator (fixes #1692)."""

import asyncio
```


P2 Unused import

asyncio is imported but never referenced in any of the three test functions. All async execution is handled by pytest-asyncio via the @pytest.mark.asyncio decorator.

Suggested change:

```diff
-import asyncio
```

(remove line 18)

Prompt To Fix With AI
This is a comment left during a code review.
Path: tests/runnable_rails/test_atransform_async_iter.py
Line: 18

Comment:
**Unused import**

`asyncio` is imported but never referenced in any of the three test functions. All async execution is handled by `pytest-asyncio` via the `@pytest.mark.asyncio` decorator.

```suggestion
```
(remove line 18)

How can I resolve this? If you propose a fix, please make it concise.


coderabbitai bot commented Apr 4, 2026

📝 Walkthrough

Walkthrough

The RunnableRails.transform and RunnableRails.atransform methods have been updated to properly handle streaming. Both methods now accept and iterate over input streams (synchronous Iterator[Input] and asynchronous AsyncIterator[Input] respectively), invoking the rails operation for each item and yielding results back as streams, enabling correct pipeline composition with LangChain's Runnable protocol.

Changes

Core streaming implementation (nemoguardrails/integrations/langchain/runnable_rails.py):
Modified transform() to iterate over Iterator[Input] and yield results from invoke() for each element. Modified atransform() to iterate over AsyncIterator[Input] and yield results from ainvoke() asynchronously. Changes shift from treating inputs as single values to element-wise consumption and streaming.

Streaming protocol tests (tests/runnable_rails/test_atransform_async_iter.py):
New test module verifying iterator/async iterator return types and behavior. Includes passthrough_rails fixture for minimal RunnableRails setup. Tests cover: synchronous iteration, async iteration, and pipeline composition with astream().
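The pipeline-composition scenario those tests target can be sketched without LangChain; the class names below are illustrative stand-ins, not the actual test code:

```python
import asyncio
from typing import Any, AsyncIterator, Callable

class PassthroughRails:
    """Stand-in with the same atransform shape as the fixed RunnableRails."""

    async def ainvoke(self, item: Any, config=None, **kwargs: Any) -> Any:
        return item

    async def atransform(
        self, input_stream: AsyncIterator[Any], config=None, **kwargs: Any
    ) -> AsyncIterator[Any]:
        async for item in input_stream:
            yield await self.ainvoke(item, config, **kwargs)

class MiniSequence:
    """Toy two-step pipeline: an upstream step feeds rails.atransform,
    roughly the way RunnableSequence.astream() wires steps together."""

    def __init__(self, upstream: Callable[[Any], Any], rails: PassthroughRails):
        self.upstream, self.rails = upstream, rails

    async def astream(self, value: Any) -> AsyncIterator[Any]:
        async def source():
            yield self.upstream(value)
        async for out in self.rails.atransform(source()):
            yield out

async def main() -> list:
    seq = MiniSequence(str.upper, PassthroughRails())
    return [chunk async for chunk in seq.astream("hello")]

print(asyncio.run(main()))  # ['HELLO']
```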

Sequence Diagram(s)

sequenceDiagram
    participant Pipeline as Pipeline<br/>(RunnableLambda | RunnableRails)
    participant RTR as RunnableRails
    participant Iter as AsyncIterator[Item]
    participant Rails as Rails Engine

    Pipeline->>RTR: atransform(async_iter, config)
    RTR->>Iter: iterate items
    loop For each item in stream
        Iter-->>RTR: item
        RTR->>Rails: ainvoke(item, config)
        Rails-->>RTR: result
        RTR->>Pipeline: yield result
    end
    Pipeline->>Pipeline: astream() consumes via async for

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

🚥 Pre-merge checks | ✅ 5 | ❌ 1

❌ Failed checks (1 inconclusive)

  • Test Results For Major Changes: ❓ Inconclusive. The PR makes major breaking API changes to the transform/atransform methods, but the test file's location and existence cannot be verified from the repository state. Resolution: verify that tests/runnable_rails/test_atransform_async_iter.py exists, review test coverage for the breaking changes, and confirm all tests pass.
✅ Passed checks (5 passed)
  • Description Check: ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title accurately describes the main fix: making RunnableRails.atransform return an AsyncIterator, which is the primary objective of this PR.
  • Linked Issues Check: ✅ Passed. The PR implementation directly addresses issue #1692 by making atransform accept AsyncIterator[Input] and return AsyncIterator[Output], with both transform and atransform updated to yield element-wise results.
  • Out of Scope Changes Check: ✅ Passed. All changes are in scope: RunnableRails.transform and atransform signature/implementation fixes, plus new tests validating the AsyncIterator return type and pipeline compatibility.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 85.71%, which meets the required threshold of 80.00%.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@nemoguardrails/integrations/langchain/runnable_rails.py`:
- Line 855: Rename the parameter named "input" (e.g., the function signature
with "input: Iterator[Input]" at lines 855 and the occurrence at line 870) to a
non-builtin-shadowing name such as "inputs" or "input_iter"; update the type
hint (Iterator[Input]) accordingly and replace all internal references to that
parameter within the function/method body to the new name, and update any
internal calls or local variables that reference it so callers and usages remain
consistent.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 07d1e7cc-dd9e-4979-90d0-ff64a98e665f

📥 Commits

Reviewing files that changed from the base of the PR and between 7aeba43 and ee0cee6.

📒 Files selected for processing (2)
  • nemoguardrails/integrations/langchain/runnable_rails.py
  • tests/runnable_rails/test_atransform_async_iter.py

```diff
 def transform(
     self,
-    input: Input,
+    input: Iterator[Input],
```


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify no transform/atransform signatures still shadow builtin `input`.
rg -nP 'def\s+a?transform\(\s*self,\s*input\s*:' nemoguardrails/integrations/langchain/runnable_rails.py

Repository: NVIDIA-NeMo/Guardrails

Length of output: 48


🏁 Script executed:

sed -n '850,880p' nemoguardrails/integrations/langchain/runnable_rails.py

Repository: NVIDIA-NeMo/Guardrails

Length of output: 1179


Rename input parameters to avoid builtin shadowing (Ruff A002).

Lines 855 and 870 shadow Python's builtin input, which Ruff flags as errors. Renaming these parameters keeps lint clean without behavior changes.

Proposed fix
```diff
     def transform(
         self,
-        input: Iterator[Input],
+        inputs: Iterator[Input],
         config: Optional[RunnableConfig] = None,
         **kwargs: Optional[Any],
     ) -> Iterator[Output]:
         """Transform the input.

         Consumes the input iterator and yields the result of invoke for each
         item.  For RunnableRails the entire input must be collected before
         processing, so streaming granularity matches invoke.
         """
-        for item in input:
+        for item in inputs:
             yield self.invoke(item, config, **kwargs)

     async def atransform(
         self,
-        input: AsyncIterator[Input],
+        inputs: AsyncIterator[Input],
         config: Optional[RunnableConfig] = None,
         **kwargs: Optional[Any],
     ) -> AsyncIterator[Output]:
         """Transform the input asynchronously.

         Consumes the async input iterator and yields the result of ainvoke for
         each item.  For RunnableRails the entire input must be collected before
         processing, so streaming granularity matches ainvoke.
         """
-        async for item in input:
+        async for item in inputs:
             yield await self.ainvoke(item, config, **kwargs)
```
🧰 Tools
🪛 Ruff (0.15.9)

[error] 855-855: Function argument input is shadowing a Python builtin

(A002)
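For reference, A002 fires on any parameter that reuses a builtin name; a hypothetical minimal case:

```python
# A002: the parameter name `input` shadows the builtin input() inside this scope.
def echo(input):  # flagged by Ruff as A002
    return input

# Renaming the parameter silences the lint with no behavior change.
def echo_fixed(inputs):
    return inputs

print(echo_fixed("ok"))  # ok
```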

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nemoguardrails/integrations/langchain/runnable_rails.py` at line 855, Rename
the parameter named "input" (e.g., the function signature with "input:
Iterator[Input]" at lines 855 and the occurrence at line 870) to a
non-builtin-shadowing name such as "inputs" or "input_iter"; update the type
hint (Iterator[Input]) accordingly and replace all internal references to that
parameter within the function/method body to the new name, and update any
internal calls or local variables that reference it so callers and usages remain
consistent.

…ltin shadow

Address review feedback:
- Remove unused asyncio import in test file (pytest.mark.asyncio does not need it)
- Rename input -> input_stream in transform/atransform to avoid shadowing builtin
Comment on lines +865 to +866:

```python
for item in input_stream:
    yield self.invoke(item, config, **kwargs)
```


P1 Each chunk invoked separately, not accumulated

The LangChain Runnable.transform protocol passes upstream streaming chunks as the input iterator — these must be accumulated into one complete input before a single invoke call. The current loop calls invoke on each individual chunk, so if an upstream step (e.g., a streaming LLM used for output guardrails) emits N token chunks, guardrails will be applied N times on partial tokens instead of once on the complete message. The docstring itself says "the entire input must be collected before processing," but the code contradicts this.

The fix should mirror LangChain's base Runnable behaviour:

Suggested change:

```diff
-        for item in input_stream:
-            yield self.invoke(item, config, **kwargs)
+        final = None
+        got_first = False
+        for chunk in input_stream:
+            if not got_first:
+                final = chunk
+                got_first = True
+            else:
+                try:
+                    final = final + chunk
+                except TypeError:
+                    final = chunk
+        if got_first:
+            yield self.invoke(final, config, **kwargs)
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemoguardrails/integrations/langchain/runnable_rails.py
Line: 865-866

Comment:
**Each chunk invoked separately, not accumulated**

The LangChain `Runnable.transform` protocol passes upstream streaming chunks as the input iterator — these must be accumulated into one complete input before a single `invoke` call. The current loop calls `invoke` on each individual chunk, so if an upstream step (e.g., a streaming LLM used for output guardrails) emits `N` token chunks, guardrails will be applied `N` times on partial tokens instead of once on the complete message. The docstring itself says "the entire input must be collected before processing," but the code contradicts this.

The fix should mirror LangChain's base `Runnable` behaviour:
```suggestion
        final = None
        got_first = False
        for chunk in input_stream:
            if not got_first:
                final = chunk
                got_first = True
            else:
                try:
                    final = final + chunk
                except TypeError:
                    final = chunk
        if got_first:
            yield self.invoke(final, config, **kwargs)
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +880 to +881:

```python
async for item in input_stream:
    yield await self.ainvoke(item, config, **kwargs)
```


P1 Each async chunk invoked separately, not accumulated

Same problem as the synchronous transform above. atransform receives streaming chunks from the upstream runnable; all chunks must be accumulated (via +) into one complete message before calling ainvoke once. Processing each partial token independently would silently apply guardrails to fragments rather than the full assembled input.

Suggested change:

```diff
-        async for item in input_stream:
-            yield await self.ainvoke(item, config, **kwargs)
+        final = None
+        got_first = False
+        async for chunk in input_stream:
+            if not got_first:
+                final = chunk
+                got_first = True
+            else:
+                try:
+                    final = final + chunk
+                except TypeError:
+                    final = chunk
+        if got_first:
+            yield await self.ainvoke(final, config, **kwargs)
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemoguardrails/integrations/langchain/runnable_rails.py
Line: 880-881

Comment:
**Each async chunk invoked separately, not accumulated**

Same problem as the synchronous `transform` above. `atransform` receives streaming chunks from the upstream runnable; all chunks must be accumulated (via `+`) into one complete message before calling `ainvoke` once. Processing each partial token independently would silently apply guardrails to fragments rather than the full assembled input.

```suggestion
        final = None
        got_first = False
        async for chunk in input_stream:
            if not got_first:
                final = chunk
                got_first = True
            else:
                try:
                    final = final + chunk
                except TypeError:
                    final = chunk
        if got_first:
            yield await self.ainvoke(final, config, **kwargs)
```

How can I resolve this? If you propose a fix, please make it concise.


Development

Successfully merging this pull request may close these issues.

bug: RunnableRails.atransform is not returning an async iterator

1 participant