Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions nodes/src/nodes/chunker/IGlobal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

# ------------------------------------------------------------------------------
# This class controls the data shared between all threads for the task
# ------------------------------------------------------------------------------
import os
from rocketlib import IGlobalBase, OPEN_MODE, warning
from ai.common.config import Config

from .chunker_strategies import ChunkingStrategy, RecursiveCharacterChunker, SentenceChunker, TokenChunker


class IGlobal(IGlobalBase):
strategy: ChunkingStrategy | None = None

def validateConfig(self):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Match the engine's validateConfig signature.

The lifecycle hook is validateConfig(self, syntaxOnly: bool). Overriding it as def validateConfig(self) means the first framework call with syntaxOnly will raise TypeError before validation runs.

🔧 Suggested fix
-    def validateConfig(self):
+    def validateConfig(self, syntaxOnly: bool) -> None:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/chunker/IGlobal.py` at line 37, The override of
validateConfig currently uses def validateConfig(self) but must match the engine
signature validateConfig(self, syntaxOnly: bool); update the
IGlobal.validateConfig method to accept the syntaxOnly: bool parameter, preserve
existing logic, and if the method calls the parent implementation or other
helpers, pass the syntaxOnly flag through (e.g., when invoking
super().validateConfig or other validators) so calling the framework with a
syntaxOnly argument no longer raises a TypeError.

"""Validate that tiktoken dependency is available (only needed for token strategy)."""
try:
config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
strategy_name = config.get('strategy', 'recursive')
except Exception: # noqa: BLE001
# If config isn't available yet, install proactively
strategy_name = 'token'

if strategy_name == 'token':
try:
from depends import depends

requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'
depends(requirements)
except Exception as e: # noqa: BLE001 - intentional broad catch for dependency probing
warning(str(e))

def beginGlobal(self):
"""Initialize the configured chunking strategy for runtime execution."""
# Are we in config mode or some other mode?
if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
# We are going to get a call to configureService but
# we don't actually need to load the strategy for that
pass
else:
# Get this node's config
config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)

# Read strategy parameters from config
strategy_name = config.get('strategy', 'recursive')
chunk_size = int(config.get('chunk_size', 1000))
chunk_overlap = int(config.get('chunk_overlap', 200))
encoding_name = config.get('encoding_name', 'cl100k_base')

if chunk_size <= 0:
raise ValueError(f'chunk_size must be positive, got {chunk_size}')
if chunk_overlap < 0:
raise ValueError(f'chunk_overlap must be non-negative, got {chunk_overlap}')
if chunk_overlap >= chunk_size:
raise ValueError(f'chunk_overlap ({chunk_overlap}) must be less than chunk_size ({chunk_size})')

# Build the appropriate strategy
if strategy_name == 'token':
self.strategy = TokenChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
encoding_name=encoding_name,
)
elif strategy_name == 'sentence':
self.strategy = SentenceChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
else:
# Default to recursive character chunker
self.strategy = RecursiveCharacterChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)

def endGlobal(self):
"""Release the configured chunking strategy."""
# Release the strategy
self.strategy = None
Comment on lines +55 to +101
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add PEP 257 docstrings to the lifecycle hooks.

beginGlobal() and endGlobal() are public methods in a nodes/**/*.py module, but they currently have no docstrings.

📝 Suggested change
     def beginGlobal(self):
+        """Initialize the configured chunking strategy for runtime execution."""
         # Are we in config mode or some other mode?
         if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
             # We are going to get a call to configureService but
             # we don't actually need to load the strategy for that
             pass
@@
     def endGlobal(self):
+        """Release the configured chunking strategy."""
         # Release the strategy
         self.strategy = None

As per coding guidelines, nodes/**/*.py: Python pipeline nodes: use single quotes, ruff for linting/formatting, PEP 257 docstrings, target Python 3.10+.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def beginGlobal(self):
# Are we in config mode or some other mode?
if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
# We are going to get a call to configureService but
# we don't actually need to load the strategy for that
pass
else:
# Get this node's config
config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
# Read strategy parameters from config
strategy_name = config.get('strategy', 'recursive')
chunk_size = int(config.get('chunk_size', 1000))
chunk_overlap = int(config.get('chunk_overlap', 200))
encoding_name = config.get('encoding_name', 'cl100k_base')
if chunk_size <= 0:
raise ValueError(f'chunk_size must be positive, got {chunk_size}')
if chunk_overlap < 0:
raise ValueError(f'chunk_overlap must be non-negative, got {chunk_overlap}')
if chunk_overlap >= chunk_size:
raise ValueError(f'chunk_overlap ({chunk_overlap}) must be less than chunk_size ({chunk_size})')
# Build the appropriate strategy
if strategy_name == 'token':
self.strategy = TokenChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
encoding_name=encoding_name,
)
elif strategy_name == 'sentence':
self.strategy = SentenceChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
else:
# Default to recursive character chunker
self.strategy = RecursiveCharacterChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
def endGlobal(self):
# Release the strategy
self.strategy = None
def beginGlobal(self):
"""Initialize the configured chunking strategy for runtime execution."""
# Are we in config mode or some other mode?
if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
# We are going to get a call to configureService but
# we don't actually need to load the strategy for that
pass
else:
# Get this node's config
config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
# Read strategy parameters from config
strategy_name = config.get('strategy', 'recursive')
chunk_size = int(config.get('chunk_size', 1000))
chunk_overlap = int(config.get('chunk_overlap', 200))
encoding_name = config.get('encoding_name', 'cl100k_base')
if chunk_size <= 0:
raise ValueError(f'chunk_size must be positive, got {chunk_size}')
if chunk_overlap < 0:
raise ValueError(f'chunk_overlap must be non-negative, got {chunk_overlap}')
if chunk_overlap >= chunk_size:
raise ValueError(f'chunk_overlap ({chunk_overlap}) must be less than chunk_size ({chunk_size})')
# Build the appropriate strategy
if strategy_name == 'token':
self.strategy = TokenChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
encoding_name=encoding_name,
)
elif strategy_name == 'sentence':
self.strategy = SentenceChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
else:
# Default to recursive character chunker
self.strategy = RecursiveCharacterChunker(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
def endGlobal(self):
"""Release the configured chunking strategy."""
# Release the strategy
self.strategy = None
🧰 Tools
🪛 Ruff (0.15.9)

[warning] 64-64: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 66-66: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 68-68: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/chunker/IGlobal.py` around lines 47 - 91, The lifecycle hooks
beginGlobal and endGlobal are missing PEP 257 docstrings; add concise
triple-single-quoted ('''...''') docstrings to both methods: beginGlobal should
have a one-line summary plus a short second line describing that it loads node
config and initializes the chunking strategy (mention the strategy selection:
'token', 'sentence', default 'recursive') and note it raises ValueError for bad
chunk sizes; endGlobal should have a one-line summary saying it releases/cleans
up the strategy (sets self.strategy to None). Use single quotes for the
docstrings to match project style and ensure ruff/formatting passes for Python
3.10+.

100 changes: 100 additions & 0 deletions nodes/src/nodes/chunker/IInstance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

# ------------------------------------------------------------------------------
# This class controls the data for each thread of the task
# ------------------------------------------------------------------------------
import copy

from rocketlib import IInstanceBase, Entry, debug
from ai.common.schema import Doc, DocMetadata

from .IGlobal import IGlobal


class IInstance(IInstanceBase):
"""Instance that chunks incoming documents and emits one document per chunk."""

IGlobal: IGlobal

chunkId: int = 0

def open(self, obj: Entry):
"""Reset chunk counter for each new object."""
self.chunkId = 0
Comment on lines +40 to +44
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use a per-document chunk index instead of the entry-scoped counter.

chunk_data['metadata'] already contains the chunk's index within its source document, but this code discards it and increments self.chunkId across the whole entry. If writeDocuments() receives multiple documents, the first chunk of the second document will no longer be 0, and the strategy's start_char/end_char metadata is lost before emission.

🔧 Suggested change
-            for chunk_data in chunks:
+            for chunk_idx, chunk_data in enumerate(chunks):
                 # Shallow copy of document, explicit copy of metadata only
                 chunk_doc = copy.copy(document)
                 if document.metadata is not None:
                     chunk_doc.metadata = copy.copy(document.metadata)
                 else:
@@
-                chunk_doc.metadata.chunkId = self.chunkId
+                chunk_doc.metadata.chunkId = chunk_idx
                 chunk_doc.metadata.parentId = parent_id
+                chunk_doc.metadata.startChar = chunk_data['metadata']['start_char']
+                chunk_doc.metadata.endChar = chunk_data['metadata']['end_char']
+                chunk_doc.metadata.totalChunks = total_chunks
 
-                self.chunkId += 1
                 output_docs.append(chunk_doc)

Also applies to: 76-92

🧰 Tools
🪛 Ruff (0.15.7)

[warning] 43-43: Unused method argument: obj

(ARG002)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/chunker/IInstance.py` around lines 41 - 45, The code uses a
single entry-scoped counter self.chunkId (defined on IInstance and reset in
open) and overwrites per-chunk metadata, losing per-document chunk indices and
start_char/end_char; update the logic so emitted chunk metadata uses the
per-document index from chunk_data['metadata'] (e.g., metadata.get('chunk') or
metadata['chunk']) instead of incrementing self.chunkId, or if you prefer an
explicit per-document counter, initialize/reset a document-scoped counter in
IInstance.open and use that only for chunks of the current document; adjust the
emission code (the block referenced around lines 76-92) to read and preserve
metadata['start_char'] and metadata['end_char'] and to reference either
metadata['chunk'] or the reset per-document counter rather than self.chunkId.


def writeDocuments(self, documents: list[Doc]):
"""
Chunk each incoming document and emit multiple documents (one per chunk).

Each emitted document gets metadata with chunkId, parentId, chunk_index,
start_char, end_char, and total_chunks so downstream nodes can
reconstruct the original document if needed.
"""
if self.IGlobal.strategy is None:
raise RuntimeError('Chunker strategy not initialized')

for document in documents:
# Extract text content
text = document.page_content or ''
if not text.strip():
continue

# Get the original object ID for parent tracking
parent_id = ''
if document.metadata is not None:
parent_id = getattr(document.metadata, 'objectId', '') or ''

# Chunk the text
chunks = self.IGlobal.strategy.chunk(text)
total_chunks = len(chunks)

if total_chunks == 0:
continue

# Build output documents
output_docs: list[Doc] = []
for chunk_data in chunks:
# Shallow copy of document, explicit copy of metadata only
chunk_doc = copy.copy(document)
chunk_doc.metadata = copy.copy(document.metadata) if document.metadata else DocMetadata()
chunk_doc.page_content = chunk_data['text']

# Update metadata (always non-None after the copy/create above)
chunk_doc.metadata.chunkId = self.chunkId
chunk_doc.metadata.parentId = parent_id

# Propagate strategy metadata (chunk_index, start_char, end_char)
strategy_meta = chunk_data.get('metadata', {})
chunk_doc.metadata.chunk_index = strategy_meta.get('chunk_index', 0)
chunk_doc.metadata.start_char = strategy_meta.get('start_char', 0)
chunk_doc.metadata.end_char = strategy_meta.get('end_char', 0)
chunk_doc.metadata.total_chunks = total_chunks

self.chunkId += 1
output_docs.append(chunk_doc)

# Emit all chunks for this document
if output_docs:
debug(f'Chunker emitting {len(output_docs)} chunks for document (parent_id={parent_id})')
self.instance.writeDocuments(output_docs)
Empty file.
Loading
Loading