-
Notifications
You must be signed in to change notification settings - Fork 47
Feat: Pipeline tool #647
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Feat: Pipeline tool #647
Changes from all commits
cfb042c
080b026
0f69c5a
919b7f2
130c760
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| # ============================================================================= | ||
| # RocketRide Engine | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| """ | ||
| tool_pipe node — global (shared) state. | ||
|
|
||
| Reads the tool configuration at startup so IInstance can route tool | ||
| invocations to the correct output lane on every call. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from ai.common.config import Config | ||
| from rocketlib import IGlobalBase, OPEN_MODE, warning | ||
|
|
||
|
|
||
| VALID_RETURN_TYPES = {'text', 'answers', 'documents', 'table'} | ||
|
|
||
|
|
||
| class IGlobal(IGlobalBase): | ||
| """Global state for tool_pipe.""" | ||
|
|
||
| tool_description: str = '' | ||
| return_type: str = 'text' | ||
|
|
||
| def beginGlobal(self) -> None: | ||
| if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: | ||
| return | ||
|
|
||
| cfg = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig) | ||
|
|
||
| self.tool_description = str(cfg.get('tool_description') or '').strip() | ||
| self.return_type = str(cfg.get('return_type') or '').strip() or 'text' | ||
|
|
||
| if self.return_type not in VALID_RETURN_TYPES: | ||
| raise Exception(f'tool_pipe: return_type must be one of {sorted(VALID_RETURN_TYPES)}') | ||
|
|
||
| def validateConfig(self) -> None: | ||
| try: | ||
| cfg = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig) | ||
| return_type = str(cfg.get('return_type') or '').strip() or 'text' | ||
| if return_type not in VALID_RETURN_TYPES: | ||
| warning(f'tool_pipe: return_type must be one of {sorted(VALID_RETURN_TYPES)}') | ||
| except Exception as e: | ||
| warning(str(e)) | ||
|
|
||
| def endGlobal(self) -> None: | ||
| pass | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| # ============================================================================= | ||
| # RocketRide Engine | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE OF ARISING FROM, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix the MIT license typo. Line 21 says 📝 Proposed fix-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE OF ARISING FROM,
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,🤖 Prompt for AI Agents |
||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| """ | ||
| tool_pipe node instance. | ||
|
|
||
| Exposes a configurable tool to agents. When invoked, routes the input to all | ||
| connected output lanes. The write call is synchronous — by the time it returns, | ||
| all the downstream response nodes have already populated currentObject.response. | ||
| The new response entries are snapshotted and then removed so they don't leak | ||
| into the parent pipeline. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json | ||
|
|
||
| from rocketlib import IInstanceBase, getObject, tool_function, debug | ||
| from rocketlib import IJson | ||
|
|
||
| from .IGlobal import IGlobal | ||
|
|
||
| # Result type descriptions keyed by return_type config value. | ||
| _RETURN_TYPE_DESCRIPTIONS = { | ||
| 'text': 'Plain text result from the pipeline.', | ||
| 'answers': 'LLM-generated answer string from the pipeline.', | ||
| 'documents': 'JSON-serialised array of document objects from the pipeline.', | ||
| 'table': 'JSON-serialised table data from the pipeline.', | ||
| } | ||
|
dylan-savage marked this conversation as resolved.
|
||
|
|
||
|
|
||
| class IInstance(IInstanceBase): | ||
| IGlobal: IGlobal | ||
|
|
||
| @tool_function( | ||
| input_schema={ | ||
| 'type': 'object', | ||
| 'required': ['data'], | ||
| 'properties': { | ||
| 'data': { | ||
| 'type': 'string', | ||
| 'description': 'The input to send to the pipeline.', | ||
| }, | ||
| }, | ||
| }, | ||
| description='Accepts a text input and returns the pipeline result.', | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does it really accepts INPUT? |
||
| output_schema=lambda self: { | ||
| 'type': 'object', | ||
| 'properties': { | ||
| 'result': { | ||
| 'type': 'string', | ||
| 'description': _RETURN_TYPE_DESCRIPTIONS.get( | ||
| self.IGlobal.return_type, | ||
| 'Result from the pipeline.', | ||
| ), | ||
| }, | ||
| }, | ||
| }, | ||
|
Comment on lines
+57
to
+80
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Description: Inspect rocketlib.tool_function and existing call sites for dynamic descriptions.
rg -n -C4 'def tool_function|class .*tool_function|description=' packages/server/engine-lib/rocketlib-python/lib/rocketlib --type=py
rg -n -C3 '@tool_function\(' nodes/src/nodes --type=pyRepository: rocketride-org/rocketride-server Length of output: 32897 🏁 Script executed: # Inspect tool_function decorator implementation
sed -n '65,120p' packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.pyRepository: rocketride-org/rocketride-server Length of output: 1578 🏁 Script executed: # Look for IGlobal definition in tool_pipe module
find nodes/src/nodes/tool_pipe -name 'IGlobal.py' -o -name 'IInstance.py' | xargs ls -laRepository: rocketride-org/rocketride-server Length of output: 249 🏁 Script executed: # Check IGlobal class for tool_description property
cat -n nodes/src/nodes/tool_pipe/IGlobal.py | head -50Repository: rocketride-org/rocketride-server Length of output: 2411 🏁 Script executed: # Read the exact file and lines to confirm
cat -n nodes/src/nodes/tool_pipe/IInstance.py | sed -n '54,82p'Repository: rocketride-org/rocketride-server Length of output: 1173 🏁 Script executed: # Find where __tool_meta__ is used/evaluated
rg -n '__tool_meta__|tool_meta__' packages/server/engine-lib/rocketlib-python/lib/rocketlib --type=py -A3Repository: rocketride-org/rocketride-server Length of output: 1917 🏁 Script executed: # Search for where description is retrieved from __tool_meta__ and used
rg -n "description" packages/server/engine-lib/rocketlib-python/lib/rocketlib --type=py | grep -i "meta\|query\|schema" | head -20Repository: rocketride-org/rocketride-server Length of output: 468 🏁 Script executed: # Read the full description evaluation logic
sed -n '710,740p' packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.pyRepository: rocketride-org/rocketride-server Length of output: 1290 🏁 Script executed: # Check the full context for the build_tools_descriptors function to see where user_desc comes from
sed -n '690,730p' packages/server/engine-lib/rocketlib-python/lib/rocketlib/filters.pyRepository: rocketride-org/rocketride-server Length of output: 2027 Wire
The Fix: Use a lambda to resolve the configured description- description='Accepts a text input and returns the pipeline result.',
+ description=lambda self: (
+ self.IGlobal.tool_description
+ or 'Accepts a text input and returns the pipeline result.'
+ ),🤖 Prompt for AI Agents |
||
| ) | ||
| def run_pipe(self, input_obj) -> dict: | ||
| """Run the connected pipeline with the given input and return its result.""" | ||
| args = _normalize_tool_input(input_obj) | ||
| data = args.get('data') | ||
| if not data: | ||
| raise ValueError('tool_pipe: tool requires a non-empty `data` parameter') | ||
|
|
||
| debug(f'tool_pipe: run_pipe invoked data_len={len(data)}') | ||
|
|
||
| entry = getObject() | ||
| opened = False | ||
| try: | ||
| self.instance.open(entry) | ||
| opened = True | ||
| self._send_to_connected_lane(data) | ||
| except Exception as exc: | ||
| debug(f'tool_pipe: sub-pipeline failed: {exc}') | ||
| raise | ||
| finally: | ||
| if opened: | ||
| self.instance.close() | ||
|
|
||
| response = _to_python_dict(entry.response) | ||
|
|
||
| if not response: | ||
| debug('tool_pipe: sub-pipeline returned no data — ensure a response node is connected at the end of the sub-pipeline') | ||
|
|
||
| result = _extract_return_value(response, self.IGlobal.return_type) | ||
|
|
||
| debug(f'tool_pipe: returning result return_type={self.IGlobal.return_type!r} result_len={len(result)}') | ||
| return {'result': result} | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Lane routing | ||
| # ------------------------------------------------------------------ | ||
|
|
||
| def _send_to_connected_lane(self, data: str) -> None: | ||
| """Write data to all connected output lanes.""" | ||
| if self.instance.hasListener('questions'): | ||
| from ai.common.schema import Question | ||
|
|
||
| q = Question() | ||
| q.addQuestion(data) | ||
| self.instance.writeQuestions(q) | ||
|
|
||
| if self.instance.hasListener('documents'): | ||
| from ai.common.schema import Doc | ||
|
|
||
| self.instance.writeDocuments([Doc(page_content=data)]) | ||
|
|
||
| if self.instance.hasListener('table'): | ||
| self.instance.writeTable(data) | ||
|
|
||
| if self.instance.hasListener('text'): | ||
| self.instance.writeText(data) | ||
|
dylan-savage marked this conversation as resolved.
|
||
|
|
||
| if self.instance.hasListener('answers'): | ||
| from ai.common.schema import Answer | ||
|
|
||
| answer = Answer() | ||
| answer.setAnswer(data) | ||
| self.instance.writeAnswers(answer) | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Helpers | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
|
|
||
| def _to_python_dict(response) -> dict: | ||
| """Convert a C++ IJson/IDict response object to a plain Python dict.""" | ||
| if response is None: | ||
| return {} | ||
| try: | ||
| return IJson.toDict(response) | ||
| except Exception: | ||
| return {} | ||
|
|
||
|
|
||
| def _extract_return_value(result: dict, return_type: str) -> str: | ||
| """Extract the configured return lane value from the sub-pipeline result.""" | ||
| if not isinstance(result, dict): | ||
| return str(result) if result is not None else '' | ||
|
|
||
| value = result.get(return_type) | ||
|
|
||
| # answers is a list — return the first item | ||
| if return_type == 'answers' and isinstance(value, list): | ||
| return str(value[0]) if value else '' | ||
|
|
||
| # documents / table — serialise to JSON string for the agent | ||
| if isinstance(value, (list, dict)): | ||
| return json.dumps(value, ensure_ascii=False) | ||
|
|
||
| return str(value) if value is not None else '' | ||
|
|
||
|
|
||
| def _normalize_tool_input(input_obj): | ||
| """Normalise tool input into a plain dict.""" | ||
| if input_obj is None: | ||
| return {} | ||
| if hasattr(input_obj, 'model_dump') and callable(getattr(input_obj, 'model_dump')): | ||
| input_obj = input_obj.model_dump() | ||
| elif hasattr(input_obj, 'dict') and callable(getattr(input_obj, 'dict')): | ||
| input_obj = input_obj.dict() | ||
| if isinstance(input_obj, str): | ||
| try: | ||
| parsed = json.loads(input_obj) | ||
| if isinstance(parsed, dict): | ||
| input_obj = parsed | ||
| except Exception: | ||
| pass | ||
| if not isinstance(input_obj, dict): | ||
| return {} | ||
| if 'input' in input_obj and isinstance(input_obj['input'], dict): | ||
| inner = input_obj['input'] | ||
| extras = {k: v for k, v in input_obj.items() if k != 'input'} | ||
| input_obj = {**inner, **extras} | ||
| input_obj.pop('security_context', None) | ||
| return input_obj | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| # ============================================================================= | ||
| # RocketRide Engine | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| from os.path import dirname, join, realpath | ||
| from depends import depends | ||
|
|
||
| requirements = join(dirname(realpath(__file__)), 'requirements.txt') | ||
| depends(requirements) | ||
|
|
||
| from .IGlobal import IGlobal | ||
| from .IInstance import IInstance | ||
|
|
||
| __all__ = ['IGlobal', 'IInstance'] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| { | ||
| "title": "Pipeline Tool", | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would you mind add comments? |
||
| "protocol": "tool_pipe://", | ||
| "classType": ["tool"], | ||
| "capabilities": ["invoke"], | ||
| "register": "filter", | ||
| "node": "python", | ||
| "path": "nodes.tool_pipe", | ||
| "prefix": "tool_pipe", | ||
| "description": ["Exposes an inline pipeline as an agent tool.", "Connect this node's output lanes to any pipeline nodes on the same canvas.", "When an agent calls the tool, the input is routed to every connected output lane.", "End each connected branch with a response node to return results."], | ||
| "icon": "pipetool.svg", | ||
| "tile": ["run_pipe"], | ||
| "lanes": { | ||
| "_source": ["text", "questions", "documents", "table", "answers"] | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| }, | ||
| "input": [ | ||
| { | ||
| "lane": "_source", | ||
| "output": [{ "lane": "text" }, { "lane": "questions" }, { "lane": "documents" }, { "lane": "table" }, { "lane": "answers" }] | ||
| } | ||
| ], | ||
| "preconfig": { | ||
| "default": "default", | ||
| "profiles": { | ||
| "default": { | ||
| "title": "Pipeline Tool", | ||
| "tool_description": "", | ||
| "return_type": "text" | ||
| } | ||
| } | ||
| }, | ||
| "fields": { | ||
| "tool_pipe.tool_description": { | ||
| "type": "string", | ||
| "title": "Tool Description", | ||
| "description": "Natural-language description the agent uses to decide when to call this tool", | ||
| "default": "" | ||
| }, | ||
| "tool_pipe.return_type": { | ||
| "type": "string", | ||
| "title": "Return Type", | ||
| "description": "Which response lane value to return to the agent", | ||
| "default": "text", | ||
| "enum": ["text", "answers", "documents", "table"] | ||
| } | ||
| }, | ||
| "shape": [ | ||
| { | ||
| "section": "Pipe", | ||
| "title": "Pipeline Tool", | ||
| "properties": ["type", "tool_pipe.tool_description", "tool_pipe.return_type"] | ||
| } | ||
| ] | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.