Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions nodes/src/nodes/chunker/IGlobal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

# ------------------------------------------------------------------------------
# This class controls the data shared between all threads for the task
# ------------------------------------------------------------------------------
import os
from rocketlib import IGlobalBase, OPEN_MODE, warning
from ai.common.config import Config

from .chunker_strategies import ChunkingStrategy, RecursiveCharacterChunker, SentenceChunker, TokenChunker


# Strategy name → class mapping
# Maps a configured strategy name to the class implementing it.
_STRATEGY_MAP = dict(
    recursive=RecursiveCharacterChunker,
    sentence=SentenceChunker,
    token=TokenChunker,
)


class IGlobal(IGlobalBase):
    """Global (task-wide) state shared by all threads of the chunker node.

    Holds the chunking strategy that every instance thread uses. The strategy
    is constructed once in beginGlobal() and released in endGlobal().
    """

    # Active chunking strategy; None until beginGlobal() runs (and again
    # after endGlobal()).
    strategy: ChunkingStrategy | None = None

    def validateConfig(self):
        """Check this node's Python dependencies from requirements.txt.

        Any failure is reported as a warning rather than raised, so a missing
        optional dependency (e.g. tiktoken, needed only by the 'token'
        strategy) does not abort configuration.
        """
        try:
            from depends import depends

            requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'
            depends(requirements)
        except Exception as e:
            warning(str(e))

    def beginGlobal(self):
        """Build the configured chunking strategy (skipped in CONFIG mode)."""
        # In config mode we will get a call to configureService, but we do
        # not need to load a strategy for that -- bail out early.
        if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
            return

        # Get this node's config
        config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)

        # Read strategy parameters from config
        strategy_name = config.get('strategy', 'recursive')
        chunk_size = int(config.get('chunk_size', 1000))
        chunk_overlap = int(config.get('chunk_overlap', 200))
        encoding_name = config.get('encoding_name', 'cl100k_base')

        # Resolve the strategy class through the module-level map instead of
        # a duplicated if/elif chain; unknown names fall back to the
        # recursive character chunker, matching the previous behavior.
        strategy_cls = _STRATEGY_MAP.get(strategy_name, RecursiveCharacterChunker)
        kwargs = {'chunk_size': chunk_size, 'chunk_overlap': chunk_overlap}
        if strategy_cls is TokenChunker:
            # Only the token strategy accepts an encoding name.
            kwargs['encoding_name'] = encoding_name
        self.strategy = strategy_cls(**kwargs)

    def endGlobal(self):
        """Release the strategy so it can be garbage-collected."""
        self.strategy = None
98 changes: 98 additions & 0 deletions nodes/src/nodes/chunker/IInstance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 Aparavi Software AG
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# =============================================================================

# ------------------------------------------------------------------------------
# This class controls the data for each thread of the task
# ------------------------------------------------------------------------------
import copy
from typing import List

from rocketlib import IInstanceBase, Entry, debug
from ai.common.schema import Doc, DocMetadata

from .IGlobal import IGlobal


class IInstance(IInstanceBase):
    """Instance that chunks incoming documents and emits one document per chunk."""

    IGlobal: IGlobal

    # Running chunk counter across all documents of the current object;
    # reset in open() for each new object.
    chunkId: int = 0

    def open(self, obj: Entry):
        """Reset the chunk counter for each new object."""
        self.chunkId = 0

    def writeDocuments(self, documents: List[Doc]):
        """
        Chunk each incoming document and emit multiple documents (one per chunk).

        Each emitted chunk carries metadata identifying its position (chunkId)
        and its originating document (parentId / objectId) so downstream nodes
        can trace every chunk back to its source document.

        Raises:
            RuntimeError: if the chunking strategy was never initialized.
                Failing loudly here is deliberate -- a silent pass-through
                would make documents vanish from the pipeline with no error
                signal, which is impossible to debug downstream.
        """
        if self.IGlobal.strategy is None:
            raise RuntimeError('Chunker strategy not initialized')

        for document in documents:
            # Extract text content; skip documents with nothing to chunk.
            text = document.page_content or ''
            if not text.strip():
                continue

            # Original object ID, used for parent tracking on every chunk.
            parent_id = ''
            if document.metadata is not None:
                parent_id = getattr(document.metadata, 'objectId', '') or ''

            # Chunk the text
            chunks = self.IGlobal.strategy.chunk(text)
            if not chunks:
                continue

            # Build one output document per chunk. Only page_content and
            # metadata differ between chunks, so a shallow copy of the
            # document plus a copy of just the metadata object is enough --
            # this avoids deep-copying the entire document N+1 times per
            # input document, which was a measurable bottleneck for large
            # batches.
            output_docs: List[Doc] = []
            for chunk_data in chunks:
                chunk_doc = copy.copy(document)
                chunk_doc.page_content = chunk_data['text']

                if document.metadata is not None:
                    # Copy the metadata so chunks neither share nor mutate
                    # the original document's metadata object.
                    chunk_doc.metadata = copy.copy(document.metadata)
                    chunk_doc.metadata.chunkId = self.chunkId
                    # Keep an explicit reference back to the originating
                    # document so lineage is never lost in this branch.
                    chunk_doc.metadata.parentId = parent_id
                else:
                    chunk_doc.metadata = DocMetadata(objectId=parent_id, chunkId=self.chunkId)

                self.chunkId += 1
                output_docs.append(chunk_doc)

            # Emit all chunks for this document
            if output_docs:
                debug(f'Chunker emitting {len(output_docs)} chunks for document (parent_id={parent_id})')
                self.instance.writeDocuments(output_docs)
Empty file.
Loading
Loading