Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions nodes/src/nodes/tool_pipe/IGlobal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# =============================================================================
# RocketRide Engine
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

"""
tool_pipe node — global (shared) state.

Reads the tool configuration at startup so IInstance can route tool
invocations to the correct output lane on every call.
"""

from __future__ import annotations

from ai.common.config import Config
from rocketlib import IGlobalBase, OPEN_MODE, warning


VALID_RETURN_TYPES = {'text', 'answers', 'documents', 'table'}


class IGlobal(IGlobalBase):
"""Global state for tool_pipe."""

tool_description: str = ''
return_type: str = 'text'

def beginGlobal(self) -> None:
if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
return

cfg = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)

self.tool_description = str(cfg.get('tool_description') or '').strip()
self.return_type = str(cfg.get('return_type') or '').strip() or 'text'

if self.return_type not in VALID_RETURN_TYPES:
raise Exception(f'tool_pipe: return_type must be one of {sorted(VALID_RETURN_TYPES)}')

def validateConfig(self) -> None:
try:
cfg = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
return_type = str(cfg.get('return_type') or '').strip() or 'text'
if return_type not in VALID_RETURN_TYPES:
warning(f'tool_pipe: return_type must be one of {sorted(VALID_RETURN_TYPES)}')
Comment thread
dylan-savage marked this conversation as resolved.
except Exception as e:
warning(str(e))

def endGlobal(self) -> None:
pass
201 changes: 201 additions & 0 deletions nodes/src/nodes/tool_pipe/IInstance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# =============================================================================
# RocketRide Engine
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE OF ARISING FROM,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix the MIT license typo.

Line 21 says OTHERWISE OF ARISING FROM; it should match the standard header used in IGlobal.py.

📝 Proposed fix
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE OF ARISING FROM,
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/tool_pipe/IInstance.py` at line 21, The MIT license header in
IInstance.py contains a typo ("OTHERWISE OF ARISING FROM"); update the license
phrase to match the standard MIT header used in IGlobal.py by replacing
"OTHERWISE OF ARISING FROM," with the correct wording "OR OTHERWISE, ARISING
FROM," so the header text is consistent; locate the license block at the top of
IInstance.py (module IInstance) and correct that phrase only.

# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

"""
tool_pipe node instance.

Exposes a configurable tool to agents. When invoked, routes the input to all
connected output lanes. The write call is synchronous — by the time it returns,
all the downstream response nodes have already populated currentObject.response.
The new response entries are snapshotted and then removed so they don't leak
into the parent pipeline.
"""

from __future__ import annotations

import json

from rocketlib import IInstanceBase, getObject, tool_function, debug
from rocketlib import IJson

from .IGlobal import IGlobal

# Result type descriptions keyed by return_type config value.
_RETURN_TYPE_DESCRIPTIONS = {
'text': 'Plain text result from the pipeline.',
'answers': 'LLM-generated answer string from the pipeline.',
'documents': 'JSON-serialised array of document objects from the pipeline.',
'table': 'JSON-serialised table data from the pipeline.',
}
Comment thread
dylan-savage marked this conversation as resolved.


class IInstance(IInstanceBase):
IGlobal: IGlobal

@tool_function(
input_schema={
'type': 'object',
'required': ['data'],
'properties': {
'data': {
'type': 'string',
'description': 'The input to send to the pipeline.',
},
},
},
description='Accepts a text input and returns the pipeline result.',
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it really accepts INPUT?
according to services.json it is of type _source

output_schema=lambda self: {
'type': 'object',
'properties': {
'result': {
'type': 'string',
'description': _RETURN_TYPE_DESCRIPTIONS.get(
self.IGlobal.return_type,
'Result from the pipeline.',
),
},
},
},
Comment on lines +57 to +80
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Inspect rocketlib.tool_function and existing call sites for dynamic descriptions.

rg -n -C4 'def tool_function|class .*tool_function|description=' packages/server/engine-lib/rocketlib-python/lib/rocketlib --type=py
rg -n -C3 '@tool_function\(' nodes/src/nodes --type=py

Repository: rocketride-org/rocketride-server

Length of output: 32897


🏁 Script executed:

# Inspect tool_function decorator implementation
sed -n '65,120p' packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.py

Repository: rocketride-org/rocketride-server

Length of output: 1578


🏁 Script executed:

# Look for IGlobal definition in tool_pipe module
find nodes/src/nodes/tool_pipe -name 'IGlobal.py' -o -name 'IInstance.py' | xargs ls -la

Repository: rocketride-org/rocketride-server

Length of output: 249


🏁 Script executed:

# Check IGlobal class for tool_description property
cat -n nodes/src/nodes/tool_pipe/IGlobal.py | head -50

Repository: rocketride-org/rocketride-server

Length of output: 2411


🏁 Script executed:

# Read the exact file and lines to confirm
cat -n nodes/src/nodes/tool_pipe/IInstance.py | sed -n '54,82p'

Repository: rocketride-org/rocketride-server

Length of output: 1173


🏁 Script executed:

# Find where __tool_meta__ is used/evaluated
rg -n '__tool_meta__|tool_meta__' packages/server/engine-lib/rocketlib-python/lib/rocketlib --type=py -A3

Repository: rocketride-org/rocketride-server

Length of output: 1917


🏁 Script executed:

# Search for where description is retrieved from __tool_meta__ and used
rg -n "description" packages/server/engine-lib/rocketlib-python/lib/rocketlib --type=py | grep -i "meta\|query\|schema" | head -20

Repository: rocketride-org/rocketride-server

Length of output: 468


🏁 Script executed:

# Read the full description evaluation logic
sed -n '710,740p' packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.py

Repository: rocketride-org/rocketride-server

Length of output: 1290


🏁 Script executed:

# Check the full context for the build_tools_descriptors function to see where user_desc comes from
sed -n '690,730p' packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.py

Repository: rocketride-org/rocketride-server

Length of output: 2027


Wire self.IGlobal.tool_description into the tool registration so agents see user-configured descriptions.

IGlobal.tool_description is populated from services.json, but the agent-facing tool_function description at line 68 remains hard-coded. This mismatch prevents configured descriptions from reaching agents.

The tool_function decorator supports callable descriptions — the same lambda pattern is already used for output_schema on line 69 and in other agent nodes across the codebase. Apply the fix:

Fix: Use a lambda to resolve the configured description
-        description='Accepts a text input and returns the pipeline result.',
+        description=lambda self: (
+            self.IGlobal.tool_description
+            or 'Accepts a text input and returns the pipeline result.'
+        ),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/tool_pipe/IInstance.py` around lines 57 - 80, The tool
registration currently uses a static string for the tool_function description;
instead wire in the configured description by passing a callable that returns
self.IGlobal.tool_description (similar to the existing output_schema lambda).
Update the tool_function call in IInstance (the decorator block around
tool_function) to set description=lambda self: self.IGlobal.tool_description so
agents receive the user-configured description.

)
def run_pipe(self, input_obj) -> dict:
"""Run the connected pipeline with the given input and return its result."""
args = _normalize_tool_input(input_obj)
data = args.get('data')
if not data:
raise ValueError('tool_pipe: tool requires a non-empty `data` parameter')

debug(f'tool_pipe: run_pipe invoked data_len={len(data)}')

entry = getObject()
opened = False
try:
self.instance.open(entry)
opened = True
self._send_to_connected_lane(data)
except Exception as exc:
debug(f'tool_pipe: sub-pipeline failed: {exc}')
raise
finally:
if opened:
self.instance.close()

response = _to_python_dict(entry.response)

if not response:
debug('tool_pipe: sub-pipeline returned no data — ensure a response node is connected at the end of the sub-pipeline')

result = _extract_return_value(response, self.IGlobal.return_type)

debug(f'tool_pipe: returning result return_type={self.IGlobal.return_type!r} result_len={len(result)}')
return {'result': result}

# ------------------------------------------------------------------
# Lane routing
# ------------------------------------------------------------------

def _send_to_connected_lane(self, data: str) -> None:
"""Write data to all connected output lanes."""
if self.instance.hasListener('questions'):
from ai.common.schema import Question

q = Question()
q.addQuestion(data)
self.instance.writeQuestions(q)

if self.instance.hasListener('documents'):
from ai.common.schema import Doc

self.instance.writeDocuments([Doc(page_content=data)])

if self.instance.hasListener('table'):
self.instance.writeTable(data)

if self.instance.hasListener('text'):
self.instance.writeText(data)
Comment thread
dylan-savage marked this conversation as resolved.

if self.instance.hasListener('answers'):
from ai.common.schema import Answer

answer = Answer()
answer.setAnswer(data)
self.instance.writeAnswers(answer)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _to_python_dict(response) -> dict:
"""Convert a C++ IJson/IDict response object to a plain Python dict."""
if response is None:
return {}
try:
return IJson.toDict(response)
except Exception:
return {}


def _extract_return_value(result: dict, return_type: str) -> str:
"""Extract the configured return lane value from the sub-pipeline result."""
if not isinstance(result, dict):
return str(result) if result is not None else ''

value = result.get(return_type)

# answers is a list — return the first item
if return_type == 'answers' and isinstance(value, list):
return str(value[0]) if value else ''

# documents / table — serialise to JSON string for the agent
if isinstance(value, (list, dict)):
return json.dumps(value, ensure_ascii=False)

return str(value) if value is not None else ''


def _normalize_tool_input(input_obj):
"""Normalise tool input into a plain dict."""
if input_obj is None:
return {}
if hasattr(input_obj, 'model_dump') and callable(getattr(input_obj, 'model_dump')):
input_obj = input_obj.model_dump()
elif hasattr(input_obj, 'dict') and callable(getattr(input_obj, 'dict')):
input_obj = input_obj.dict()
if isinstance(input_obj, str):
try:
parsed = json.loads(input_obj)
if isinstance(parsed, dict):
input_obj = parsed
except Exception:
pass
if not isinstance(input_obj, dict):
return {}
if 'input' in input_obj and isinstance(input_obj['input'], dict):
inner = input_obj['input']
extras = {k: v for k, v in input_obj.items() if k != 'input'}
input_obj = {**inner, **extras}
input_obj.pop('security_context', None)
return input_obj
35 changes: 35 additions & 0 deletions nodes/src/nodes/tool_pipe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# =============================================================================
# RocketRide Engine
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

from os.path import dirname, join, realpath
from depends import depends

requirements = join(dirname(realpath(__file__)), 'requirements.txt')
depends(requirements)

from .IGlobal import IGlobal
from .IInstance import IInstance

__all__ = ['IGlobal', 'IInstance']
Empty file.
54 changes: 54 additions & 0 deletions nodes/src/nodes/tool_pipe/services.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"title": "Pipeline Tool",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would you mind add comments?
it is simple - say to claude to format it like openai's JSON file (or any other one)

"protocol": "tool_pipe://",
"classType": ["tool"],
"capabilities": ["invoke"],
"register": "filter",
"node": "python",
"path": "nodes.tool_pipe",
"prefix": "tool_pipe",
"description": ["Exposes an inline pipeline as an agent tool.", "Connect this node's output lanes to any pipeline nodes on the same canvas.", "When an agent calls the tool, the input is routed to every connected output lane.", "End each connected branch with a response node to return results."],
"icon": "pipetool.svg",
"tile": ["run_pipe"],
"lanes": {
"_source": ["text", "questions", "documents", "table", "answers"]
Comment thread
coderabbitai[bot] marked this conversation as resolved.
},
"input": [
{
"lane": "_source",
"output": [{ "lane": "text" }, { "lane": "questions" }, { "lane": "documents" }, { "lane": "table" }, { "lane": "answers" }]
}
],
"preconfig": {
"default": "default",
"profiles": {
"default": {
"title": "Pipeline Tool",
"tool_description": "",
"return_type": "text"
}
}
},
"fields": {
"tool_pipe.tool_description": {
"type": "string",
"title": "Tool Description",
"description": "Natural-language description the agent uses to decide when to call this tool",
"default": ""
},
"tool_pipe.return_type": {
"type": "string",
"title": "Return Type",
"description": "Which response lane value to return to the agent",
"default": "text",
"enum": ["text", "answers", "documents", "table"]
}
},
"shape": [
{
"section": "Pipe",
"title": "Pipeline Tool",
"properties": ["type", "tool_pipe.tool_description", "tool_pipe.return_type"]
}
]
}
5 changes: 5 additions & 0 deletions packages/server/engine-lib/engLib/store/stack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,11 @@ Error IServiceEndpoint::generatePipelineStack() noexcept {
// Add this component, someone is invoking it
if (auto ccode = addComponent(component)) return ccode;

// Also walk downstream data-flow connections from this
// control component so that sub-pipeline nodes (connected
// via input lanes from this node) are included in the stack
if (auto ccode = walkComponents(compId)) return ccode;

// We added it, so start over again
bAdded = true;
break;
Expand Down
1 change: 1 addition & 0 deletions packages/shared-ui/src/assets/nodes/pipetool.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ import deepagentIcon from '../../../assets/nodes/deepagent.svg';
import twelvelabsIcon from '../../../assets/nodes/twelvelabs.svg';
import blandAiIcon from '../../../assets/nodes/bland-ai.svg';
import githubIcon from '../../../assets/nodes/github.svg';
import pipetoolIcon from '../../../assets/nodes/pipetool.svg';

/**
* Static lookup table mapping icon names (without file extensions) to their
Expand Down Expand Up @@ -200,6 +201,7 @@ const iconMap: Record<string, string> = {
twelvelabs: twelvelabsIcon,
'bland-ai': blandAiIcon,
github: githubIcon,
pipetool: pipetoolIcon,
};

/**
Expand Down
Loading