-
Notifications
You must be signed in to change notification settings - Fork 47
feat(nodes): add text chunking strategies and hybrid search for RAG pipelines #514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| # ------------------------------------------------------------------------------ | ||
| # This class controls the data shared between all threads for the task | ||
| # ------------------------------------------------------------------------------ | ||
| import os | ||
| from rocketlib import IGlobalBase, OPEN_MODE, warning | ||
| from ai.common.config import Config | ||
|
|
||
| from .chunker_strategies import ChunkingStrategy, RecursiveCharacterChunker, SentenceChunker, TokenChunker | ||
|
|
||
|
|
||
| # Strategy name → class mapping | ||
| _STRATEGY_MAP = { | ||
| 'recursive': RecursiveCharacterChunker, | ||
| 'sentence': SentenceChunker, | ||
| 'token': TokenChunker, | ||
| } | ||
|
|
||
|
|
||
| class IGlobal(IGlobalBase): | ||
| strategy: ChunkingStrategy | None = None | ||
|
|
||
| def validateConfig(self): | ||
| """Validate that tiktoken dependency is available (only needed for token strategy).""" | ||
| try: | ||
| from depends import depends | ||
|
|
||
| requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt' | ||
| depends(requirements) | ||
| except Exception as e: | ||
| warning(str(e)) | ||
|
|
||
| def beginGlobal(self): | ||
| # Are we in config mode or some other mode? | ||
| if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: | ||
| # We are going to get a call to configureService but | ||
| # we don't actually need to load the strategy for that | ||
| pass | ||
| else: | ||
| # Get this node's config | ||
| config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig) | ||
|
|
||
| # Read strategy parameters from config | ||
| strategy_name = config.get('strategy', 'recursive') | ||
| chunk_size = int(config.get('chunk_size', 1000)) | ||
| chunk_overlap = int(config.get('chunk_overlap', 200)) | ||
| encoding_name = config.get('encoding_name', 'cl100k_base') | ||
|
|
||
| # Build the appropriate strategy | ||
| if strategy_name == 'token': | ||
| self.strategy = TokenChunker( | ||
| chunk_size=chunk_size, | ||
| chunk_overlap=chunk_overlap, | ||
| encoding_name=encoding_name, | ||
| ) | ||
| elif strategy_name == 'sentence': | ||
| self.strategy = SentenceChunker( | ||
| chunk_size=chunk_size, | ||
| chunk_overlap=chunk_overlap, | ||
| ) | ||
| else: | ||
| # Default to recursive character chunker | ||
| self.strategy = RecursiveCharacterChunker( | ||
| chunk_size=chunk_size, | ||
| chunk_overlap=chunk_overlap, | ||
| ) | ||
|
|
||
| def endGlobal(self): | ||
| # Release the strategy | ||
| self.strategy = None |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 Aparavi Software AG | ||
| # | ||
| # Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| # of this software and associated documentation files (the "Software"), to deal | ||
| # in the Software without restriction, including without limitation the rights | ||
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| # copies of the Software, and to permit persons to whom the Software is | ||
| # furnished to do so, subject to the following conditions: | ||
| # | ||
| # The above copyright notice and this permission notice shall be included in | ||
| # all copies or substantial portions of the Software. | ||
| # | ||
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| # SOFTWARE. | ||
| # ============================================================================= | ||
|
|
||
| # ------------------------------------------------------------------------------ | ||
| # This class controls the data for each thread of the task | ||
| # ------------------------------------------------------------------------------ | ||
| import copy | ||
| from typing import List | ||
|
|
||
| from rocketlib import IInstanceBase, Entry, debug | ||
| from ai.common.schema import Doc, DocMetadata | ||
|
|
||
| from .IGlobal import IGlobal | ||
|
|
||
|
|
||
| class IInstance(IInstanceBase): | ||
| """Instance that chunks incoming documents and emits one document per chunk.""" | ||
|
|
||
| IGlobal: IGlobal | ||
|
|
||
| chunkId: int = 0 | ||
|
|
||
| def open(self, obj: Entry): | ||
| """Reset chunk counter for each new object.""" | ||
| self.chunkId = 0 | ||
|
|
||
| def writeDocuments(self, documents: List[Doc]): | ||
| """ | ||
| Chunk each incoming document and emit multiple documents (one per chunk). | ||
|
|
||
| Each emitted document gets metadata with chunk_index, parent_id, and | ||
| total_chunks so downstream nodes can reconstruct the original document | ||
| if needed. | ||
| """ | ||
| if self.IGlobal.strategy is None: | ||
| debug('Chunker strategy not initialized; passing documents through') | ||
| return | ||
|
|
||
| for document in documents: | ||
| # Extract text content | ||
| text = document.page_content or '' | ||
| if not text.strip(): | ||
| continue | ||
|
|
||
| # Deep copy to avoid mutating the original | ||
| base_doc = copy.deepcopy(document) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Consider avoiding deepcopy entirely:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @asclearuc Great performance observation. You're right — the double deepcopy (once for I'll refactor this to use a shallow copy of the document with an explicit |
||
|
|
||
| # Get the original object ID for parent tracking | ||
| parent_id = '' | ||
| if base_doc.metadata is not None: | ||
| parent_id = getattr(base_doc.metadata, 'objectId', '') or '' | ||
|
|
||
| # Chunk the text | ||
| chunks = self.IGlobal.strategy.chunk(text) | ||
| total_chunks = len(chunks) | ||
|
|
||
| if total_chunks == 0: | ||
| continue | ||
|
|
||
| # Build output documents | ||
| output_docs: List[Doc] = [] | ||
| for chunk_data in chunks: | ||
| chunk_doc = copy.deepcopy(base_doc) | ||
| chunk_doc.page_content = chunk_data['text'] | ||
|
|
||
| # Update metadata | ||
| if chunk_doc.metadata is not None: | ||
| chunk_doc.metadata.chunkId = self.chunkId | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When metadata already exists, only chunk_doc.metadata.parentId = parent_id
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @asclearuc You're right — this is a data integrity issue. The Will add |
||
| else: | ||
| chunk_doc.metadata = DocMetadata(objectId=parent_id, chunkId=self.chunkId) | ||
|
|
||
| self.chunkId += 1 | ||
| output_docs.append(chunk_doc) | ||
|
|
||
| # Emit all chunks for this document | ||
| if output_docs: | ||
| debug(f'Chunker emitting {len(output_docs)} chunks for document (parent_id={parent_id})') | ||
| self.instance.writeDocuments(output_docs) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Silent return here drops all documents without any error signal. If strategy failed
to initialize this should be a hard failure:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@asclearuc Good catch — you're absolutely right. A silent
returnhere masks a real initialization failure and makes it impossible to debug when documents silently disappear from the pipeline. Replacing this withraise RuntimeError('Chunker strategy not initialized')is the correct approach: if the strategy failed to initialize, the pipeline should fail loudly rather than producing an empty result set that looks like "no documents found." Will fix this in the next push.