From c96e9725e1abe3cc3ecdd2c61b5ea521a58ca677 Mon Sep 17 00:00:00 2001 From: Pradeepto Bhattacharya Date: Sat, 7 Mar 2026 15:29:06 +0000 Subject: [PATCH 1/4] Add Docling chunking strategies and skip document processing for Docling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce two new chunking strategies — doclingHierarchicalChunker and doclingHybridChunker — that use the Docling Serve API to chunk documents directly from raw files via POST /v1/chunk/{type}/source. Unlike LangChain strategies which chunk pre-converted markdown, Docling's chunk endpoint performs document conversion and chunking in a single call. This means document processing (DocumentProcessor CR) is unnecessary when using Docling chunking strategies. Changes: - Add Docling chunking client (pkg/docling/chunk.go) with synchronous ChunkFile() method, request/response types, and 5-minute HTTP timeout - Add ChunkingStrategyDoclingHierarchical and ChunkingStrategyDoclingHybrid strategy constants with their config structs - Skip DocumentProcessor CR creation in UnstructuredDataProduct controller when a Docling chunking strategy is selected; trigger ChunksGenerator directly instead - Add raw file processing path in ChunksGenerator controller so it can iterate over raw files (not converted files) for Docling strategies - Fix nil-safety in ChunksFileMetadata.Equal() to handle missing ConvertedFileMetadata when document processing is skipped - Add unit tests for Docling chunking client, IsDoclingChunkingStrategy, and nil-safe metadata comparison --- api/v1alpha1/unstructureddataproduct_types.go | 28 +- .../unstructureddataproduct_types_test.go | 67 ++++ api/v1alpha1/zz_generated.deepcopy.go | 42 +++ ...dataverse.redhat.com_chunksgenerators.yaml | 14 + ...e.redhat.com_unstructureddataproducts.yaml | 14 + .../controller/chunksgenerator_controller.go | 230 ++++++++++++- .../unstructureddataproduct_controller.go | 96 +++--- pkg/docling/chunk.go | 175 ++++++++++ pkg/docling/chunk_test.go | 314 ++++++++++++++++++ pkg/unstructured/chunks_file.go | 7 +- pkg/unstructured/chunks_file_test.go | 147 ++++++++ 11 files changed, 1076 insertions(+), 58 deletions(-) create mode 100644 api/v1alpha1/unstructureddataproduct_types_test.go create mode 100644 pkg/docling/chunk.go create mode 100644 pkg/docling/chunk_test.go create mode 100644 pkg/unstructured/chunks_file_test.go diff --git a/api/v1alpha1/unstructureddataproduct_types.go b/api/v1alpha1/unstructureddataproduct_types.go index 6e526689..04c62707 100644 --- a/api/v1alpha1/unstructureddataproduct_types.go +++ b/api/v1alpha1/unstructureddataproduct_types.go @@ -27,15 +27,21 @@ type ( ) const ( - SourceTypeS3 UnstructuredDataSourceType = "s3" - DestinationTypeInternalStage UnstructuredDataDestinationType = "snowflakeInternalStage" - ChunkingStrategyRecursiveCharacter ChunkingStrategy = "recursiveCharacterTextSplitter" - ChunkingStrategyMarkdown ChunkingStrategy = "markdownTextSplitter" - ChunkingStrategyToken ChunkingStrategy = "tokenTextSplitter" + SourceTypeS3 UnstructuredDataSourceType = "s3" + DestinationTypeInternalStage UnstructuredDataDestinationType = "snowflakeInternalStage" + ChunkingStrategyRecursiveCharacter ChunkingStrategy = "recursiveCharacterTextSplitter" + ChunkingStrategyMarkdown ChunkingStrategy = "markdownTextSplitter" + ChunkingStrategyToken ChunkingStrategy = "tokenTextSplitter" + ChunkingStrategyDoclingHierarchical ChunkingStrategy = "doclingHierarchicalChunker" + ChunkingStrategyDoclingHybrid ChunkingStrategy = "doclingHybridChunker" UnstructuredDataProductCondition = "UnstructuredDataProductReady" ) +func IsDoclingChunkingStrategy(strategy ChunkingStrategy) bool { + return strategy == ChunkingStrategyDoclingHierarchical || strategy == ChunkingStrategyDoclingHybrid +} + type DocumentProcessorConfig struct { Type string `json:"type,omitempty"` DoclingConfig DoclingConfig `json:"doclingConfig,omitempty"` @@ -59,6 +65,8 @@ type ChunksGeneratorConfig struct { RecursiveCharacterSplitterConfig RecursiveCharacterSplitterConfig `json:"recursiveCharacterSplitterConfig,omitempty"` MarkdownSplitterConfig MarkdownSplitterConfig `json:"markdownSplitterConfig,omitempty"` TokenSplitterConfig TokenSplitterConfig `json:"tokenSplitterConfig,omitempty"` + DoclingHierarchicalChunkerConfig DoclingHierarchicalChunkerConfig `json:"doclingHierarchicalChunkerConfig,omitempty"` + DoclingHybridChunkerConfig DoclingHybridChunkerConfig `json:"doclingHybridChunkerConfig,omitempty"` } type RecursiveCharacterSplitterConfig struct { @@ -86,6 +94,16 @@ type TokenSplitterConfig struct { DisallowedSpecial []string `json:"disallowedSpecial,omitempty"` } +type DoclingHierarchicalChunkerConfig struct { + MergeListItems *bool `json:"mergeListItems,omitempty"` +} + +type DoclingHybridChunkerConfig struct { + Tokenizer string `json:"tokenizer,omitempty"` + MaxTokens int `json:"maxTokens,omitempty"` + MergePeers *bool `json:"mergePeers,omitempty"` +} + // UnstructuredDataProductSpec defines the desired state of UnstructuredDataProduct type UnstructuredDataProductSpec struct { SourceConfig SourceConfig `json:"sourceConfig,omitempty"` diff --git a/api/v1alpha1/unstructureddataproduct_types_test.go b/api/v1alpha1/unstructureddataproduct_types_test.go new file mode 100644 index 00000000..7ba7898b --- /dev/null +++ b/api/v1alpha1/unstructureddataproduct_types_test.go @@ -0,0 +1,67 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import "testing" + +func TestIsDoclingChunkingStrategy(t *testing.T) { + tests := []struct { + name string + strategy ChunkingStrategy + want bool + }{ + { + name: "docling hierarchical", + strategy: ChunkingStrategyDoclingHierarchical, + want: true, + }, + { + name: "docling hybrid", + strategy: ChunkingStrategyDoclingHybrid, + want: true, + }, + { + name: "recursive character", + strategy: ChunkingStrategyRecursiveCharacter, + want: false, + }, + { + name: "markdown", + strategy: ChunkingStrategyMarkdown, + want: false, + }, + { + name: "token", + strategy: ChunkingStrategyToken, + want: false, + }, + { + name: "empty string", + strategy: "", + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := IsDoclingChunkingStrategy(tt.strategy) + if got != tt.want { + t.Errorf("IsDoclingChunkingStrategy(%q) = %v, want %v", tt.strategy, got, tt.want) + } + }) + } +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 91cf9575..b29ce2be 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -58,6 +58,8 @@ func (in *ChunksGeneratorConfig) DeepCopyInto(out *ChunksGeneratorConfig) { in.RecursiveCharacterSplitterConfig.DeepCopyInto(&out.RecursiveCharacterSplitterConfig) out.MarkdownSplitterConfig = in.MarkdownSplitterConfig in.TokenSplitterConfig.DeepCopyInto(&out.TokenSplitterConfig) + in.DoclingHierarchicalChunkerConfig.DeepCopyInto(&out.DoclingHierarchicalChunkerConfig) + in.DoclingHybridChunkerConfig.DeepCopyInto(&out.DoclingHybridChunkerConfig) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChunksGeneratorConfig. @@ -284,6 +286,46 @@ func (in *DoclingConfig) DeepCopy() *DoclingConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DoclingHierarchicalChunkerConfig) DeepCopyInto(out *DoclingHierarchicalChunkerConfig) { + *out = *in + if in.MergeListItems != nil { + in, out := &in.MergeListItems, &out.MergeListItems + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DoclingHierarchicalChunkerConfig. +func (in *DoclingHierarchicalChunkerConfig) DeepCopy() *DoclingHierarchicalChunkerConfig { + if in == nil { + return nil + } + out := new(DoclingHierarchicalChunkerConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DoclingHybridChunkerConfig) DeepCopyInto(out *DoclingHybridChunkerConfig) { + *out = *in + if in.MergePeers != nil { + in, out := &in.MergePeers, &out.MergePeers + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DoclingHybridChunkerConfig. +func (in *DoclingHybridChunkerConfig) DeepCopy() *DoclingHybridChunkerConfig { + if in == nil { + return nil + } + out := new(DoclingHybridChunkerConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DocumentProcessor) DeepCopyInto(out *DocumentProcessor) { *out = *in diff --git a/config/crd/bases/operator.dataverse.redhat.com_chunksgenerators.yaml b/config/crd/bases/operator.dataverse.redhat.com_chunksgenerators.yaml index 333c825d..79f45427 100644 --- a/config/crd/bases/operator.dataverse.redhat.com_chunksgenerators.yaml +++ b/config/crd/bases/operator.dataverse.redhat.com_chunksgenerators.yaml @@ -48,6 +48,20 @@ spec: properties: config: properties: + doclingHierarchicalChunkerConfig: + properties: + mergeListItems: + type: boolean + type: object + doclingHybridChunkerConfig: + properties: + maxTokens: + type: integer + mergePeers: + type: boolean + tokenizer: + type: string + type: object markdownSplitterConfig: properties: chunkOverlap: diff --git a/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml b/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml index a6d8f803..47a37ab7 100644 --- a/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml +++ b/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml @@ -50,6 +50,20 @@ spec: properties: chunksGeneratorConfig: properties: + doclingHierarchicalChunkerConfig: + properties: + mergeListItems: + type: boolean + type: object + doclingHybridChunkerConfig: + properties: + maxTokens: + type: integer + mergePeers: + type: boolean + tokenizer: + type: string + type: object markdownSplitterConfig: properties: chunkOverlap: diff --git a/internal/controller/chunksgenerator_controller.go b/internal/controller/chunksgenerator_controller.go index 24337b45..bc6ffb5f 100644 --- a/internal/controller/chunksgenerator_controller.go +++ b/internal/controller/chunksgenerator_controller.go @@ -32,6 +32,7 @@ import ( operatorv1alpha1 "github.com/redhat-data-and-ai/unstructured-data-controller/api/v1alpha1" "github.com/redhat-data-and-ai/unstructured-data-controller/internal/controller/controllerutils" + "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/docling" "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/filestore" "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/langchain" "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/unstructured" @@ -130,22 +131,39 @@ func (r *ChunksGeneratorReconciler) Reconcile(ctx context.Context, req ctrl.Requ chunkingErrors := []error{} skippedFiles := []string{} - convertedFilePaths := unstructured.FilterConvertedFilePaths(filePaths) - - for _, convertedFilePath := range convertedFilePaths { - logger.Info("processing converted file", "file", convertedFilePath) - - if err := r.processConvertedFile(ctx, convertedFilePath, chunksGeneratorCR); err != nil { - if strings.Contains(err.Error(), langchain.SemaphoreAcquireError) { - logger.Error(err, "failed to process converted file, semaphore acquire error, will try again later", "file", convertedFilePath) - skippedFiles = append(skippedFiles, convertedFilePath) - continue + isDoclingChunking := operatorv1alpha1.IsDoclingChunkingStrategy(chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy) + + if isDoclingChunking { + // For Docling chunking: iterate over raw files directly (no converted files needed) + rawFilePaths := unstructured.FilterRawFilePaths(filePaths) + for _, rawFilePath := range rawFilePaths { + logger.Info("processing raw file for Docling chunking", "file", rawFilePath) + if err := r.processRawFile(ctx, rawFilePath, chunksGeneratorCR); err != nil { + if strings.Contains(err.Error(), docling.SemaphoreAcquireError) { + logger.Error(err, "semaphore acquire error, will try again later", "file", rawFilePath) + skippedFiles = append(skippedFiles, rawFilePath) + continue + } + chunkingErrors = append(chunkingErrors, err) + logger.Error(err, "failed to process raw file", "file", rawFilePath) + } + } + } else { + // For LangChain chunking: iterate over converted files (existing behavior) + convertedFilePaths := unstructured.FilterConvertedFilePaths(filePaths) + for _, convertedFilePath := range convertedFilePaths { + logger.Info("processing converted file", "file", convertedFilePath) + if err := r.processConvertedFile(ctx, convertedFilePath, chunksGeneratorCR); err != nil { + if strings.Contains(err.Error(), langchain.SemaphoreAcquireError) || + strings.Contains(err.Error(), docling.SemaphoreAcquireError) { + logger.Error(err, "semaphore acquire error, will try again later", "file", convertedFilePath) + skippedFiles = append(skippedFiles, convertedFilePath) + continue + } + chunkingErrors = append(chunkingErrors, err) + logger.Error(err, "failed to process converted file", "file", convertedFilePath) } - - chunkingErrors = append(chunkingErrors, err) - logger.Error(err, "failed to process converted file", "file", convertedFilePath) } - logger.Info("successfully processed converted file", "file", convertedFilePath) } if len(chunkingErrors) > 0 { @@ -259,9 +277,17 @@ func (r *ChunksGeneratorReconciler) needsChunking(ctx context.Context, converted } // now the chunks file should be the same as the current chunks file in filestore + var chunkingTool unstructured.ChunkingTool + switch chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy { + case operatorv1alpha1.ChunkingStrategyDoclingHierarchical, operatorv1alpha1.ChunkingStrategyDoclingHybrid: + chunkingTool = unstructured.DoclingChunkingTool + default: + chunkingTool = unstructured.LangchainChunkingTool + } + newChunksFileMetadata := unstructured.ChunksFileMetadata{ ConvertedFileMetadata: &convertedFileMetadata, - ChunkingTool: unstructured.LangchainChunkingTool, + ChunkingTool: chunkingTool, ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, } if !chunksFile.ChunksDocument.Metadata.Equal(&newChunksFileMetadata) { @@ -324,6 +350,60 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile DisallowedSpecial: chunksGeneratorCR.Spec.ChunksGeneratorConfig.TokenSplitterConfig.DisallowedSpecial, }, } + case operatorv1alpha1.ChunkingStrategyDoclingHierarchical: + rawFilePath := unstructured.GetRawFilePathFromConvertedFilePath(convertedFilePath) + fileURL, err := r.fileStore.GetFileURL(ctx, rawFilePath) + if err != nil { + return nil, fmt.Errorf("failed to get presigned URL for raw file: %w", err) + } + chunkingOptions := &docling.HierarchicalChunkingOptions{ + MergeListItems: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHierarchicalChunkerConfig.MergeListItems, + } + chunks, err := doclingClient.ChunkFile(ctx, fileURL, docling.ChunkTypeHierarchical, chunkingOptions) + if err != nil { + return nil, err + } + return &unstructured.ChunksFile{ + ConvertedDocument: convertedFile.ConvertedDocument, + ChunksDocument: &unstructured.ChunksDocument{ + Metadata: &unstructured.ChunksFileMetadata{ + ChunkingTool: unstructured.DoclingChunkingTool, + ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, + ConvertedFileMetadata: convertedFile.ConvertedDocument.Metadata, + }, + Chunks: &unstructured.Chunks{ + Text: chunks, + }, + }, + }, nil + case operatorv1alpha1.ChunkingStrategyDoclingHybrid: + rawFilePath := unstructured.GetRawFilePathFromConvertedFilePath(convertedFilePath) + fileURL, err := r.fileStore.GetFileURL(ctx, rawFilePath) + if err != nil { + return nil, fmt.Errorf("failed to get presigned URL for raw file: %w", err) + } + chunkingOptions := &docling.HybridChunkingOptions{ + Tokenizer: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.Tokenizer, + MaxTokens: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MaxTokens, + MergePeers: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MergePeers, + } + chunks, err := doclingClient.ChunkFile(ctx, fileURL, docling.ChunkTypeHybrid, chunkingOptions) + if err != nil { + return nil, err + } + return &unstructured.ChunksFile{ + ConvertedDocument: convertedFile.ConvertedDocument, + ChunksDocument: &unstructured.ChunksDocument{ + Metadata: &unstructured.ChunksFileMetadata{ + ChunkingTool: unstructured.DoclingChunkingTool, + ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, + ConvertedFileMetadata: convertedFile.ConvertedDocument.Metadata, + }, + Chunks: &unstructured.Chunks{ + Text: chunks, + }, + }, + }, nil default: return nil, fmt.Errorf("invalid strategy: %s", chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy) } @@ -348,6 +428,126 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile }, nil } +func (r *ChunksGeneratorReconciler) processRawFile(ctx context.Context, rawFilePath string, chunksGeneratorCR *operatorv1alpha1.ChunksGenerator) error { + logger := log.FromContext(ctx) + logger.Info("processing raw file for Docling chunking", "file", rawFilePath) + + chunksFilePath := unstructured.GetChunksFilePath(rawFilePath) + + needsChunking, err := r.needsChunkingForRawFile(ctx, rawFilePath, chunksGeneratorCR) + if err != nil { + logger.Error(err, "failed to check if file needs chunking") + return err + } + if !needsChunking { + logger.Info("file is already chunked, skipping ...") + return nil + } + + chunksFile, err := r.chunkRawFile(ctx, rawFilePath, chunksGeneratorCR) + if err != nil { + logger.Error(err, "failed to chunk file") + return err + } + + chunksFileBytes, err := json.Marshal(chunksFile) + if err != nil { + logger.Error(err, "failed to marshal chunks file") + return err + } + if err := r.fileStore.Store(ctx, chunksFilePath, chunksFileBytes); err != nil { + logger.Error(err, "failed to store chunks file") + return err + } + return nil +} + +func (r *ChunksGeneratorReconciler) needsChunkingForRawFile(ctx context.Context, rawFilePath string, chunksGeneratorCR *operatorv1alpha1.ChunksGenerator) (bool, error) { + logger := log.FromContext(ctx) + logger.Info("checking if raw file needs chunking", "file", rawFilePath) + + chunksFilePath := unstructured.GetChunksFilePath(rawFilePath) + + chunksFileExists, err := r.fileStore.Exists(ctx, chunksFilePath) + if err != nil { + return false, err + } + if !chunksFileExists { + return true, nil + } + + chunksFileRaw, err := r.fileStore.Retrieve(ctx, chunksFilePath) + if err != nil { + return false, err + } + + chunksFile := unstructured.ChunksFile{} + if err = json.Unmarshal(chunksFileRaw, &chunksFile); err != nil { + return false, err + } + + newChunksFileMetadata := unstructured.ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: unstructured.DoclingChunkingTool, + ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, + } + if !chunksFile.ChunksDocument.Metadata.Equal(&newChunksFileMetadata) { + logger.Info("chunks file configuration differs, re-chunking needed", "file", rawFilePath) + return true, nil + } + + return false, nil +} + +func (r *ChunksGeneratorReconciler) chunkRawFile(ctx context.Context, rawFilePath string, chunksGeneratorCR *operatorv1alpha1.ChunksGenerator) (*unstructured.ChunksFile, error) { + logger := log.FromContext(ctx) + logger.Info("chunking raw file via Docling", "file", rawFilePath) + + fileURL, err := r.fileStore.GetFileURL(ctx, rawFilePath) + if err != nil { + return nil, fmt.Errorf("failed to get presigned URL for raw file: %w", err) + } + + var chunkType docling.ChunkType + var chunkingOptions any + + switch chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy { + case operatorv1alpha1.ChunkingStrategyDoclingHierarchical: + chunkType = docling.ChunkTypeHierarchical + chunkingOptions = &docling.HierarchicalChunkingOptions{ + MergeListItems: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHierarchicalChunkerConfig.MergeListItems, + } + case operatorv1alpha1.ChunkingStrategyDoclingHybrid: + chunkType = docling.ChunkTypeHybrid + chunkingOptions = &docling.HybridChunkingOptions{ + Tokenizer: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.Tokenizer, + MaxTokens: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MaxTokens, + MergePeers: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MergePeers, + } + default: + return nil, fmt.Errorf("chunkRawFile called with non-Docling strategy: %s", chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy) + } + + chunks, err := doclingClient.ChunkFile(ctx, fileURL, chunkType, chunkingOptions) + if err != nil { + return nil, err + } + + return &unstructured.ChunksFile{ + ConvertedDocument: nil, + ChunksDocument: &unstructured.ChunksDocument{ + Metadata: &unstructured.ChunksFileMetadata{ + ChunkingTool: unstructured.DoclingChunkingTool, + ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, + ConvertedFileMetadata: nil, + }, + Chunks: &unstructured.Chunks{ + Text: chunks, + }, + }, + }, nil +} + // SetupWithManager sets up the controller with the Manager. func (r *ChunksGeneratorReconciler) SetupWithManager(mgr ctrl.Manager) error { labelPredicate := controllerutils.ForceReconcilePredicate() diff --git a/internal/controller/unstructureddataproduct_controller.go b/internal/controller/unstructureddataproduct_controller.go index afdb0b55..c1ea5678 100644 --- a/internal/controller/unstructureddataproduct_controller.go +++ b/internal/controller/unstructureddataproduct_controller.go @@ -88,30 +88,32 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c } r.sf = sf - // first, let's create (or update) the DocumentProcessor CR for this data product - documentProcessorCR := &operatorv1alpha1.DocumentProcessor{ - ObjectMeta: metav1.ObjectMeta{ - Name: dataProductName, - Namespace: unstructuredDataProductCR.Namespace, - }, - Spec: operatorv1alpha1.DocumentProcessorSpec{ - DataProduct: dataProductName, - DocumentProcessorConfig: unstructuredDataProductCR.Spec.DocumentProcessorConfig, - }, - } - // result, err := kubecontrollerutil.CreateOrUpdate(ctx, r.Client, documentProcessorCR, func() error { return nil }) - result, err := controllerutil.CreateOrUpdate(ctx, r.Client, documentProcessorCR, func() error { - documentProcessorCR.Spec = operatorv1alpha1.DocumentProcessorSpec{ - DataProduct: dataProductName, - DocumentProcessorConfig: unstructuredDataProductCR.Spec.DocumentProcessorConfig, + // create (or update) the DocumentProcessor CR only if NOT using Docling chunking strategies + // Docling chunking performs conversion+chunking in a single call, so document processing is not needed + if !operatorv1alpha1.IsDoclingChunkingStrategy(unstructuredDataProductCR.Spec.ChunksGeneratorConfig.Strategy) { + documentProcessorCR := &operatorv1alpha1.DocumentProcessor{ + ObjectMeta: metav1.ObjectMeta{ + Name: dataProductName, + Namespace: unstructuredDataProductCR.Namespace, + }, + Spec: operatorv1alpha1.DocumentProcessorSpec{ + DataProduct: dataProductName, + DocumentProcessorConfig: unstructuredDataProductCR.Spec.DocumentProcessorConfig, + }, } - return nil - }) - if err != nil { - logger.Error(err, "failed to create/update DocumentProcessor CR") - return r.handleError(ctx, unstructuredDataProductCR, err) + result, err := controllerutil.CreateOrUpdate(ctx, r.Client, documentProcessorCR, func() error { + documentProcessorCR.Spec = operatorv1alpha1.DocumentProcessorSpec{ + DataProduct: dataProductName, + DocumentProcessorConfig: unstructuredDataProductCR.Spec.DocumentProcessorConfig, + } + return nil + }) + if err != nil { + logger.Error(err, "failed to create/update DocumentProcessor CR") + return r.handleError(ctx, unstructuredDataProductCR, err) + } + logger.Info("DocumentProcessor CR created/updated", "result", result) } - logger.Info("DocumentProcessor CR created/updated", "result", result) // create ChunksGenerator CR for this data product here chunksGeneratorCR := &operatorv1alpha1.ChunksGenerator{ @@ -124,12 +126,12 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c ChunksGeneratorConfig: unstructuredDataProductCR.Spec.ChunksGeneratorConfig, }, } - result, err = controllerutil.CreateOrUpdate(ctx, r.Client, chunksGeneratorCR, func() error { return nil }) + chunksResult, err := controllerutil.CreateOrUpdate(ctx, r.Client, chunksGeneratorCR, func() error { return nil }) if err != nil { logger.Error(err, "failed to create/update ChunksGenerator CR") return r.handleError(ctx, unstructuredDataProductCR, err) } - logger.Info("ChunksGenerator CR created/updated", "result", result) + logger.Info("ChunksGenerator CR created/updated", "result", chunksResult) var source unstructured.DataSource switch unstructuredDataProductCR.Spec.SourceConfig.Type { @@ -185,20 +187,40 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c return r.handleError(ctx, unstructuredDataProductCR, err) } - // add force reconcile label to the DocumentProcessor CR - documentProcessorKey := client.ObjectKey{ - Namespace: unstructuredDataProductCR.Namespace, - Name: dataProductName, - } - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - latestDocumentProcessorCR := &operatorv1alpha1.DocumentProcessor{} - if err := r.Get(ctx, documentProcessorKey, latestDocumentProcessorCR); err != nil { - return err + if !operatorv1alpha1.IsDoclingChunkingStrategy(unstructuredDataProductCR.Spec.ChunksGeneratorConfig.Strategy) { + // add force reconcile label to the DocumentProcessor CR + // DocumentProcessor will in turn trigger ChunksGenerator when conversion is done + documentProcessorKey := client.ObjectKey{ + Namespace: unstructuredDataProductCR.Namespace, + Name: dataProductName, + } + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latestDocumentProcessorCR := &operatorv1alpha1.DocumentProcessor{} + if err := r.Get(ctx, documentProcessorKey, latestDocumentProcessorCR); err != nil { + return err + } + return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestDocumentProcessorCR) + }); err != nil { + logger.Error(err, "failed to add force reconcile label to DocumentProcessor CR") + return r.handleError(ctx, unstructuredDataProductCR, err) + } + } else { + // For Docling chunking strategies, trigger ChunksGenerator directly + // (no document processing needed — Docling chunk endpoint handles conversion internally) + chunksGeneratorKey := client.ObjectKey{ + Namespace: unstructuredDataProductCR.Namespace, + Name: dataProductName, + } + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latestChunksGeneratorCR := &operatorv1alpha1.ChunksGenerator{} + if err := r.Get(ctx, chunksGeneratorKey, latestChunksGeneratorCR); err != nil { + return err + } + return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestChunksGeneratorCR) + }); err != nil { + logger.Error(err, "failed to add force reconcile label to ChunksGenerator CR") + return r.handleError(ctx, unstructuredDataProductCR, err) } - return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestDocumentProcessorCR) - }); err != nil { - logger.Error(err, "failed to add force reconcile label to DocumentProcessor CR") - return r.handleError(ctx, unstructuredDataProductCR, err) } // Setup destination diff --git a/pkg/docling/chunk.go b/pkg/docling/chunk.go new file mode 100644 index 00000000..341c95bc --- /dev/null +++ b/pkg/docling/chunk.go @@ -0,0 +1,175 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package docling + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// ChunkType represents the type of Docling chunker to use. +type ChunkType string + +const ( + ChunkTypeHierarchical ChunkType = "hierarchical" + ChunkTypeHybrid ChunkType = "hybrid" + + chunkingHTTPTimeout = 300 * time.Second +) + +// HierarchicalChunkingOptions are options for the Docling HierarchicalChunker. +type HierarchicalChunkingOptions struct { + MergeListItems *bool `json:"merge_list_items,omitempty"` +} + +// HybridChunkingOptions are options for the Docling HybridChunker. +type HybridChunkingOptions struct { + Tokenizer string `json:"tokenizer,omitempty"` + MaxTokens int `json:"max_tokens,omitempty"` + MergePeers *bool `json:"merge_peers,omitempty"` +} + +// DoclingChunkRequestPayload is the request body for the Docling chunk endpoint. +type DoclingChunkRequestPayload struct { + Sources []DoclingSource `json:"sources"` + Options *DoclingConfig `json:"options,omitempty"` + ChunkingOptions any `json:"chunking_options,omitempty"` +} + +// DoclingChunk represents a single chunk in the Docling chunk response. +type DoclingChunk struct { + Text string `json:"text"` +} + +// DoclingChunkResponse is the response from the Docling chunk endpoint. +type DoclingChunkResponse struct { + Chunks []DoclingChunk `json:"chunks"` +} + +func (c *Client) chunkSourceEndpoint(chunkType ChunkType) (string, error) { + return url.JoinPath(c.ClientConfig.URL, "/v1/chunk", string(chunkType), "source") +} + +// ChunkFile sends a document to the Docling chunk endpoint and returns the chunk texts. +// It uses TryAcquire on the semaphore to avoid blocking the reconciliation loop. +func (c *Client) ChunkFile( + ctx context.Context, + fileURL string, + chunkType ChunkType, + chunkingOptions any, +) ([]string, error) { + logger := log.FromContext(ctx) + + acquired := c.ClientConfig.sem.TryAcquire(1) + if !acquired { + return nil, errors.New(SemaphoreAcquireError) + } + defer c.ClientConfig.sem.Release(1) + + endpoint, err := c.chunkSourceEndpoint(chunkType) + if err != nil { + return nil, fmt.Errorf("failed to get chunk endpoint: %w", err) + } + + payload, err := json.Marshal(DoclingChunkRequestPayload{ + Sources: []DoclingSource{ + {URL: fileURL, Kind: "http"}, + }, + ChunkingOptions: chunkingOptions, + }) + if err != nil { + return nil, fmt.Errorf("failed to marshal chunk request payload: %w", err) + } + + logger.Info("sending chunk request to docling", "url", endpoint, "source", fileURL, "chunkType", string(chunkType)) + + responseBody, err := c.createDoclingChunkRequest(ctx, http.MethodPost, endpoint, payload) + if err != nil { + return nil, fmt.Errorf("failed to send chunk request: %w", err) + } + defer func() { + if closeErr := responseBody.Close(); closeErr != nil { + logger.Error(closeErr, "failed to close chunk response body") + } + }() + + body, err := io.ReadAll(responseBody) + if err != nil { + return nil, fmt.Errorf("failed to read chunk response body: %w", err) + } + + var chunkResponse DoclingChunkResponse + if err = json.Unmarshal(body, &chunkResponse); err != nil { + return nil, fmt.Errorf("failed to decode chunk response: %w", err) + } + + chunks := make([]string, 0, len(chunkResponse.Chunks)) + for _, chunk := range chunkResponse.Chunks { + chunks = append(chunks, chunk.Text) + } + + logger.Info("successfully chunked file via docling", "chunkCount", len(chunks)) + return chunks, nil +} + +// createDoclingChunkRequest is like createDoclingRequest but with a longer timeout +// suitable for synchronous chunking (which includes conversion + chunking). +func (c *Client) createDoclingChunkRequest(ctx context.Context, method, endpoint string, payload []byte) ( + io.ReadCloser, error) { + logger := log.FromContext(ctx) + client := &http.Client{ + Timeout: chunkingHTTPTimeout, + } + + req, err := c.createHTTPRequest(ctx, method, endpoint, payload, "Bearer %s") + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + logger.Info("sending chunk request to docling service", "url", endpoint) + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + + if resp.StatusCode == http.StatusForbidden && c.ClientConfig.Key != "" { + req, err = c.createHTTPRequest(ctx, method, endpoint, payload, "%s") + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + resp, err = client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + } + + if resp.StatusCode != http.StatusOK { + logger.Error(errors.New("received non-200 OK response from chunk endpoint"), + "status code", resp.StatusCode, "url", endpoint) + return nil, fmt.Errorf("failed to chunk file: status code %d", resp.StatusCode) + } + + return resp.Body, nil +} diff --git a/pkg/docling/chunk_test.go b/pkg/docling/chunk_test.go new file mode 100644 index 00000000..87a6bc0c --- /dev/null +++ b/pkg/docling/chunk_test.go @@ -0,0 +1,314 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package docling + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "golang.org/x/sync/semaphore" +) + +func newTestClient(serverURL string, maxConcurrent int64) *Client { + return &Client{ + ClientConfig: &ClientConfig{ + URL: serverURL, + MaxConcurrentRequests: maxConcurrent, + sem: semaphore.NewWeighted(maxConcurrent), + }, + } +} + +func TestChunkFile_Hierarchical(t *testing.T) { + response := DoclingChunkResponse{ + Chunks: []DoclingChunk{ + {Text: "This is the first chunk."}, + {Text: "This is the second chunk."}, + {Text: "This is the third chunk."}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/v1/chunk/hierarchical/source" { + t.Errorf("expected /v1/chunk/hierarchical/source, got %s", r.URL.Path) + } + + var payload DoclingChunkRequestPayload + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("failed to decode request body: %v", err) + } + if len(payload.Sources) != 1 || payload.Sources[0].URL != "https://example.com/doc.pdf" { + t.Errorf("unexpected sources: %+v", payload.Sources) + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + mergeListItems := true + options := &HierarchicalChunkingOptions{MergeListItems: &mergeListItems} + + chunks, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, options) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 3 { + t.Fatalf("expected 3 chunks, got %d", len(chunks)) + } + if chunks[0] != "This is the first chunk." { + t.Errorf("unexpected first chunk: %s", chunks[0]) + } + if chunks[1] != "This is the second chunk." { + t.Errorf("unexpected second chunk: %s", chunks[1]) + } + if chunks[2] != "This is the third chunk." { + t.Errorf("unexpected third chunk: %s", chunks[2]) + } +} + +func TestChunkFile_Hybrid(t *testing.T) { + response := DoclingChunkResponse{ + Chunks: []DoclingChunk{ + {Text: "Hybrid chunk one."}, + {Text: "Hybrid chunk two."}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/chunk/hybrid/source" { + t.Errorf("expected /v1/chunk/hybrid/source, got %s", r.URL.Path) + } + + var payload DoclingChunkRequestPayload + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("failed to decode request body: %v", err) + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + mergePeers := false + options := &HybridChunkingOptions{ + Tokenizer: "sentence-transformers/all-MiniLM-L6-v2", + MaxTokens: 512, + MergePeers: &mergePeers, + } + + chunks, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHybrid, options) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 2 { + t.Fatalf("expected 2 chunks, got %d", len(chunks)) + } + if chunks[0] != "Hybrid chunk one." { + t.Errorf("unexpected first chunk: %s", chunks[0]) + } +} + +func TestChunkFile_SemaphoreAcquireError(t *testing.T) { + // Create a client with max concurrency of 1 and acquire the only slot + client := newTestClient("http://localhost:9999", 1) + client.ClientConfig.sem.TryAcquire(1) + + _, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err == nil { + t.Fatal("expected error, got nil") + } + if err.Error() != SemaphoreAcquireError { + t.Errorf("expected semaphore acquire error, got: %s", err.Error()) + } + + // Release the slot we acquired + client.ClientConfig.sem.Release(1) +} + +func TestChunkFile_HTTPError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + + _, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err == nil { + t.Fatal("expected error, got nil") + } +} + +func TestChunkSourceEndpoint(t *testing.T) { + client := newTestClient("http://localhost:5001", 1) + + hierarchicalEndpoint, err := client.chunkSourceEndpoint(ChunkTypeHierarchical) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if hierarchicalEndpoint != "http://localhost:5001/v1/chunk/hierarchical/source" { + t.Errorf("unexpected endpoint: %s", hierarchicalEndpoint) + } + + hybridEndpoint, err := client.chunkSourceEndpoint(ChunkTypeHybrid) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if hybridEndpoint != "http://localhost:5001/v1/chunk/hybrid/source" { + t.Errorf("unexpected endpoint: %s", hybridEndpoint) + } +} + +func TestChunkRequestPayload_Serialization(t *testing.T) { + mergeListItems := true + payload := DoclingChunkRequestPayload{ + Sources: []DoclingSource{ + {URL: "https://example.com/doc.pdf", Kind: "http"}, + }, + ChunkingOptions: &HierarchicalChunkingOptions{ + MergeListItems: &mergeListItems, + }, + } + + data, err := json.Marshal(payload) + if err != nil { + t.Fatalf("failed to marshal: %v", err) + } + + var result map[string]any + if err := json.Unmarshal(data, &result); err != nil { + t.Fatalf("failed to unmarshal: %v", err) + } + + // Verify top-level field names match Docling API + if _, ok := result["sources"]; !ok { + t.Error("expected 'sources' field in JSON") + } + if _, ok := result["chunking_options"]; !ok { + t.Error("expected 'chunking_options' field in JSON") + } + + // Verify chunking_options field names use snake_case + chunkingOpts, ok := result["chunking_options"].(map[string]any) + if !ok { + t.Fatal("chunking_options is not a map") + } + if _, ok := chunkingOpts["merge_list_items"]; !ok { + t.Error("expected 'merge_list_items' field in chunking_options") + } +} + +func TestChunkRequestPayload_HybridSerialization(t *testing.T) { + mergePeers := false + payload := DoclingChunkRequestPayload{ + Sources: []DoclingSource{ + {URL: "https://example.com/doc.pdf", Kind: "http"}, + }, + ChunkingOptions: &HybridChunkingOptions{ + Tokenizer: "sentence-transformers/all-MiniLM-L6-v2", + MaxTokens: 512, + MergePeers: &mergePeers, + }, + } + + data, err := json.Marshal(payload) + if err != nil { + t.Fatalf("failed to marshal: %v", err) + } + + var result map[string]any + if err := json.Unmarshal(data, &result); err != nil { + t.Fatalf("failed to unmarshal: %v", err) + } + + chunkingOpts, ok := result["chunking_options"].(map[string]any) + if !ok { + t.Fatal("chunking_options is not a map") + } + if _, ok := chunkingOpts["tokenizer"]; !ok { + t.Error("expected 'tokenizer' field in chunking_options") + } + if _, ok := chunkingOpts["max_tokens"]; !ok { + t.Error("expected 'max_tokens' field in chunking_options") + } + if _, ok := chunkingOpts["merge_peers"]; !ok { + t.Error("expected 'merge_peers' field in chunking_options") + } +} + +func TestChunkFile_EmptyResponse(t *testing.T) { + response := DoclingChunkResponse{ + Chunks: []DoclingChunk{}, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + + chunks, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 0 { + t.Errorf("expected 0 chunks, got %d", len(chunks)) + } +} + +func TestChunkFile_AuthHeader(t *testing.T) { + var receivedAuthHeader string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuthHeader = r.Header.Get("Authorization") + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(DoclingChunkResponse{Chunks: []DoclingChunk{}}) + })) + defer server.Close() + + client := &Client{ + ClientConfig: &ClientConfig{ + URL: server.URL, + Key: "test-api-key", + MaxConcurrentRequests: 5, + sem: semaphore.NewWeighted(5), + }, + } + + _, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if receivedAuthHeader != "Bearer test-api-key" { + t.Errorf("expected 'Bearer test-api-key', got '%s'", receivedAuthHeader) + } +} diff --git a/pkg/unstructured/chunks_file.go b/pkg/unstructured/chunks_file.go index 5a7f7ee2..fa759bc8 100644 --- a/pkg/unstructured/chunks_file.go +++ b/pkg/unstructured/chunks_file.go @@ -9,6 +9,7 @@ type ChunkingTool string const ( LangchainChunkingTool ChunkingTool = "langchain" + DoclingChunkingTool ChunkingTool = "docling" ) type Chunks struct { @@ -32,7 +33,11 @@ type ChunksFile struct { } func (c *ChunksFileMetadata) Equal(other *ChunksFileMetadata) bool { - if !c.ConvertedFileMetadata.Equal(other.ConvertedFileMetadata) { + if c.ConvertedFileMetadata == nil && other.ConvertedFileMetadata == nil { + // both nil, equal in this regard + } else if c.ConvertedFileMetadata == nil || other.ConvertedFileMetadata == nil { + return false + } else if !c.ConvertedFileMetadata.Equal(other.ConvertedFileMetadata) { return false } if c.ChunkingTool != other.ChunkingTool { diff --git a/pkg/unstructured/chunks_file_test.go b/pkg/unstructured/chunks_file_test.go new file mode 100644 index 00000000..d903fbd3 --- /dev/null +++ b/pkg/unstructured/chunks_file_test.go @@ -0,0 +1,147 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package unstructured + +import ( + "testing" + + "github.com/redhat-data-and-ai/unstructured-data-controller/api/v1alpha1" +) + +func TestChunksFileMetadata_Equal_BothNilConvertedFileMetadata(t *testing.T) { + a := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + + if !a.Equal(b) { + t.Error("expected Equal to return true when both ConvertedFileMetadata are nil and other fields match") + } +} + +func TestChunksFileMetadata_Equal_OneNilConvertedFileMetadata(t *testing.T) { + a := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: &ConvertedFileMetadata{ + RawFilePath: "test/file.pdf", + DocumentConverter: DocumentConverterDocling, + }, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + + if a.Equal(b) { + t.Error("expected Equal to return false when one ConvertedFileMetadata is nil and the other is not") + } + if b.Equal(a) { + t.Error("expected Equal to return false when one ConvertedFileMetadata is nil and the other is not (reversed)") + } +} + +func TestChunksFileMetadata_Equal_BothNonNilSame(t *testing.T) { + meta := &ConvertedFileMetadata{ + RawFilePath: "test/file.pdf", + DocumentConverter: DocumentConverterDocling, + } + a := &ChunksFileMetadata{ + ConvertedFileMetadata: meta, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyRecursiveCharacter, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: &ConvertedFileMetadata{ + RawFilePath: "test/file.pdf", + DocumentConverter: DocumentConverterDocling, + }, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyRecursiveCharacter, + }, + } + + if !a.Equal(b) { + t.Error("expected Equal to return true when both ConvertedFileMetadata are non-nil and match") + } +} + +func TestChunksFileMetadata_Equal_BothNonNilDifferent(t *testing.T) { + a := &ChunksFileMetadata{ + ConvertedFileMetadata: &ConvertedFileMetadata{ + RawFilePath: "test/file1.pdf", + DocumentConverter: DocumentConverterDocling, + }, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyRecursiveCharacter, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: &ConvertedFileMetadata{ + RawFilePath: "test/file2.pdf", + DocumentConverter: DocumentConverterDocling, + }, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyRecursiveCharacter, + }, + } + + if a.Equal(b) { + t.Error("expected Equal to return false when ConvertedFileMetadata differ") + } +} + +func TestChunksFileMetadata_Equal_DifferentChunkingTool(t *testing.T) { + a := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + + if a.Equal(b) { + t.Error("expected Equal to return false when ChunkingTool differs") + } +} From 3789fdeac1b20c1d28eb5cbbd507834d5df7a981 Mon Sep 17 00:00:00 2001 From: Pradeepto Bhattacharya Date: Sat, 7 Mar 2026 21:17:25 +0000 Subject: [PATCH 2/4] Add spec validation and simplify DocumentProcessorConfig handling Add ValidateSpec() to UnstructuredDataProductSpec that returns an error when a Docling chunking strategy is selected and DocumentProcessorConfig is set. Docling performs conversion and chunking in a single step, so document processing configuration is incompatible. Change DocumentProcessorConfig from value type to pointer (*DocumentProcessorConfig) in UnstructuredDataProductSpec. Replace IsDoclingChunkingStrategy() checks in the controller with DocumentProcessorConfig != nil, making the logic driven by whether document processing is configured rather than strategy-specific knowledge. --- api/v1alpha1/unstructureddataproduct_types.go | 19 ++++- .../unstructureddataproduct_types_test.go | 85 ++++++++++++++++++- api/v1alpha1/zz_generated.deepcopy.go | 6 +- .../unstructureddataproduct_controller.go | 19 +++-- 4 files changed, 115 insertions(+), 14 deletions(-) diff --git a/api/v1alpha1/unstructureddataproduct_types.go b/api/v1alpha1/unstructureddataproduct_types.go index 04c62707..34eab491 100644 --- a/api/v1alpha1/unstructureddataproduct_types.go +++ b/api/v1alpha1/unstructureddataproduct_types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -42,6 +44,15 @@ func IsDoclingChunkingStrategy(strategy ChunkingStrategy) bool { return strategy == ChunkingStrategyDoclingHierarchical || strategy == ChunkingStrategyDoclingHybrid } +func (s *UnstructuredDataProductSpec) ValidateSpec() error { + if IsDoclingChunkingStrategy(s.ChunksGeneratorConfig.Strategy) { + if s.DocumentProcessorConfig != nil { + return fmt.Errorf("documentProcessorConfig must not be set when using Docling chunking strategy %q; Docling performs conversion and chunking in a single step", s.ChunksGeneratorConfig.Strategy) + } + } + return nil +} + type DocumentProcessorConfig struct { Type string `json:"type,omitempty"` DoclingConfig DoclingConfig `json:"doclingConfig,omitempty"` @@ -106,10 +117,10 @@ type DoclingHybridChunkerConfig struct { // UnstructuredDataProductSpec defines the desired state of UnstructuredDataProduct type UnstructuredDataProductSpec struct { - SourceConfig SourceConfig `json:"sourceConfig,omitempty"` - DestinationConfig DestinationConfig `json:"destinationConfig,omitempty"` - DocumentProcessorConfig DocumentProcessorConfig `json:"documentProcessorConfig,omitempty"` - ChunksGeneratorConfig ChunksGeneratorConfig `json:"chunksGeneratorConfig,omitempty"` + SourceConfig SourceConfig `json:"sourceConfig,omitempty"` + DestinationConfig DestinationConfig `json:"destinationConfig,omitempty"` + DocumentProcessorConfig *DocumentProcessorConfig `json:"documentProcessorConfig,omitempty"` + ChunksGeneratorConfig ChunksGeneratorConfig `json:"chunksGeneratorConfig,omitempty"` } type SourceConfig struct { diff --git a/api/v1alpha1/unstructureddataproduct_types_test.go b/api/v1alpha1/unstructureddataproduct_types_test.go index 7ba7898b..d9110b1f 100644 --- a/api/v1alpha1/unstructureddataproduct_types_test.go +++ b/api/v1alpha1/unstructureddataproduct_types_test.go @@ -16,7 +16,90 @@ limitations under the License. package v1alpha1 -import "testing" +import ( + "testing" +) + +func TestValidateSpec(t *testing.T) { + tests := []struct { + name string + spec UnstructuredDataProductSpec + wantErr bool + }{ + { + name: "docling hierarchical with DocumentProcessorConfig set returns error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyDoclingHierarchical, + }, + DocumentProcessorConfig: &DocumentProcessorConfig{ + Type: "docling", + }, + }, + wantErr: true, + }, + { + name: "docling hybrid with DocumentProcessorConfig set returns error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyDoclingHybrid, + }, + DocumentProcessorConfig: &DocumentProcessorConfig{ + Type: "docling", + }, + }, + wantErr: true, + }, + { + name: "docling hierarchical without DocumentProcessorConfig returns no error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyDoclingHierarchical, + }, + }, + wantErr: false, + }, + { + name: "docling hybrid without DocumentProcessorConfig returns no error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyDoclingHybrid, + }, + }, + wantErr: false, + }, + { + name: "non-docling strategy with DocumentProcessorConfig set returns no error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyRecursiveCharacter, + }, + DocumentProcessorConfig: &DocumentProcessorConfig{ + Type: "docling", + }, + }, + wantErr: false, + }, + { + name: "non-docling strategy without DocumentProcessorConfig returns no error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyMarkdown, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.spec.ValidateSpec() + if (err != nil) != tt.wantErr { + t.Errorf("ValidateSpec() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} func TestIsDoclingChunkingStrategy(t *testing.T) { tests := []struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b29ce2be..333b4a80 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -763,7 +763,11 @@ func (in *UnstructuredDataProductSpec) DeepCopyInto(out *UnstructuredDataProduct *out = *in out.SourceConfig = in.SourceConfig out.DestinationConfig = in.DestinationConfig - in.DocumentProcessorConfig.DeepCopyInto(&out.DocumentProcessorConfig) + if in.DocumentProcessorConfig != nil { + in, out := &in.DocumentProcessorConfig, &out.DocumentProcessorConfig + *out = new(DocumentProcessorConfig) + (*in).DeepCopyInto(*out) + } in.ChunksGeneratorConfig.DeepCopyInto(&out.ChunksGeneratorConfig) } diff --git a/internal/controller/unstructureddataproduct_controller.go b/internal/controller/unstructureddataproduct_controller.go index c1ea5678..f91cd2d2 100644 --- a/internal/controller/unstructureddataproduct_controller.go +++ b/internal/controller/unstructureddataproduct_controller.go @@ -68,6 +68,11 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c } dataProductName := unstructuredDataProductCR.Name + if err := unstructuredDataProductCR.Spec.ValidateSpec(); err != nil { + logger.Error(err, "spec validation failed") + return r.handleError(ctx, unstructuredDataProductCR, err) + } + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { latest := &operatorv1alpha1.UnstructuredDataProduct{} if err := r.Get(ctx, req.NamespacedName, latest); err != nil { @@ -88,9 +93,8 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c } r.sf = sf - // create (or update) the DocumentProcessor CR only if NOT using Docling chunking strategies - // Docling chunking performs conversion+chunking in a single call, so document processing is not needed - if !operatorv1alpha1.IsDoclingChunkingStrategy(unstructuredDataProductCR.Spec.ChunksGeneratorConfig.Strategy) { + // create (or update) the DocumentProcessor CR only if DocumentProcessorConfig is specified + if unstructuredDataProductCR.Spec.DocumentProcessorConfig != nil { documentProcessorCR := &operatorv1alpha1.DocumentProcessor{ ObjectMeta: metav1.ObjectMeta{ Name: dataProductName, @@ -98,13 +102,13 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c }, Spec: operatorv1alpha1.DocumentProcessorSpec{ DataProduct: dataProductName, - DocumentProcessorConfig: unstructuredDataProductCR.Spec.DocumentProcessorConfig, + DocumentProcessorConfig: *unstructuredDataProductCR.Spec.DocumentProcessorConfig, }, } result, err := controllerutil.CreateOrUpdate(ctx, r.Client, documentProcessorCR, func() error { documentProcessorCR.Spec = operatorv1alpha1.DocumentProcessorSpec{ DataProduct: dataProductName, - DocumentProcessorConfig: unstructuredDataProductCR.Spec.DocumentProcessorConfig, + DocumentProcessorConfig: *unstructuredDataProductCR.Spec.DocumentProcessorConfig, } return nil }) @@ -187,7 +191,7 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c return r.handleError(ctx, unstructuredDataProductCR, err) } - if !operatorv1alpha1.IsDoclingChunkingStrategy(unstructuredDataProductCR.Spec.ChunksGeneratorConfig.Strategy) { + if unstructuredDataProductCR.Spec.DocumentProcessorConfig != nil { // add force reconcile label to the DocumentProcessor CR // DocumentProcessor will in turn trigger ChunksGenerator when conversion is done documentProcessorKey := client.ObjectKey{ @@ -205,8 +209,7 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c return r.handleError(ctx, unstructuredDataProductCR, err) } } else { - // For Docling chunking strategies, trigger ChunksGenerator directly - // (no document processing needed — Docling chunk endpoint handles conversion internally) + // No document processing needed — trigger ChunksGenerator directly chunksGeneratorKey := client.ObjectKey{ Namespace: unstructuredDataProductCR.Namespace, Name: dataProductName, From b4f9615dde4da75cc1fa8b8935e89d056221ab23 Mon Sep 17 00:00:00 2001 From: Pradeepto Bhattacharya Date: Sat, 7 Mar 2026 21:20:40 +0000 Subject: [PATCH 3/4] Implement Chunker interface for Docling chunkers Add DoclingHierarchicalChunker and DoclingHybridChunker types that implement the Chunker interface. Update the interface signature to Chunk(ctx context.Context, input string) to support context propagation for HTTP calls. Refactor chunkFile() and chunkRawFile() in the ChunksGenerator controller to use the unified chunker.Chunk() call for all strategies. --- .../controller/chunksgenerator_controller.go | 94 +++++++------- pkg/unstructured/chunking.go | 29 ++++- pkg/unstructured/chunking_test.go | 116 ++++++++++++++++++ 3 files changed, 182 insertions(+), 57 deletions(-) create mode 100644 pkg/unstructured/chunking_test.go diff --git a/internal/controller/chunksgenerator_controller.go b/internal/controller/chunksgenerator_controller.go index bc6ffb5f..7c79809c 100644 --- a/internal/controller/chunksgenerator_controller.go +++ b/internal/controller/chunksgenerator_controller.go @@ -315,6 +315,9 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile } var chunker unstructured.Chunker + var chunkingTool unstructured.ChunkingTool + var input string + switch chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy { case operatorv1alpha1.ChunkingStrategyRecursiveCharacter: chunker = &unstructured.RecursiveCharacterSplitter{ @@ -326,6 +329,8 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile KeepSeparator: chunksGeneratorCR.Spec.ChunksGeneratorConfig.RecursiveCharacterSplitterConfig.KeepSeparator, }, } + chunkingTool = unstructured.LangchainChunkingTool + input = convertedFile.ConvertedDocument.Content.Markdown case operatorv1alpha1.ChunkingStrategyMarkdown: chunker = &unstructured.MarkdownSplitter{ LangchainClient: langchainClient, @@ -338,6 +343,8 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile JoinTableRows: chunksGeneratorCR.Spec.ChunksGeneratorConfig.MarkdownSplitterConfig.JoinTableRows, }, } + chunkingTool = unstructured.LangchainChunkingTool + input = convertedFile.ConvertedDocument.Content.Markdown case operatorv1alpha1.ChunkingStrategyToken: chunker = &unstructured.TokenSplitter{ LangchainClient: langchainClient, @@ -350,65 +357,43 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile DisallowedSpecial: chunksGeneratorCR.Spec.ChunksGeneratorConfig.TokenSplitterConfig.DisallowedSpecial, }, } + chunkingTool = unstructured.LangchainChunkingTool + input = convertedFile.ConvertedDocument.Content.Markdown case operatorv1alpha1.ChunkingStrategyDoclingHierarchical: rawFilePath := unstructured.GetRawFilePathFromConvertedFilePath(convertedFilePath) fileURL, err := r.fileStore.GetFileURL(ctx, rawFilePath) if err != nil { return nil, fmt.Errorf("failed to get presigned URL for raw file: %w", err) } - chunkingOptions := &docling.HierarchicalChunkingOptions{ - MergeListItems: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHierarchicalChunkerConfig.MergeListItems, - } - chunks, err := doclingClient.ChunkFile(ctx, fileURL, docling.ChunkTypeHierarchical, chunkingOptions) - if err != nil { - return nil, err - } - return &unstructured.ChunksFile{ - ConvertedDocument: convertedFile.ConvertedDocument, - ChunksDocument: &unstructured.ChunksDocument{ - Metadata: &unstructured.ChunksFileMetadata{ - ChunkingTool: unstructured.DoclingChunkingTool, - ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, - ConvertedFileMetadata: convertedFile.ConvertedDocument.Metadata, - }, - Chunks: &unstructured.Chunks{ - Text: chunks, - }, + chunker = &unstructured.DoclingHierarchicalChunker{ + DoclingClient: doclingClient, + Options: &docling.HierarchicalChunkingOptions{ + MergeListItems: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHierarchicalChunkerConfig.MergeListItems, }, - }, nil + } + chunkingTool = unstructured.DoclingChunkingTool + input = fileURL case operatorv1alpha1.ChunkingStrategyDoclingHybrid: rawFilePath := unstructured.GetRawFilePathFromConvertedFilePath(convertedFilePath) fileURL, err := r.fileStore.GetFileURL(ctx, rawFilePath) if err != nil { return nil, fmt.Errorf("failed to get presigned URL for raw file: %w", err) } - chunkingOptions := &docling.HybridChunkingOptions{ - Tokenizer: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.Tokenizer, - MaxTokens: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MaxTokens, - MergePeers: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MergePeers, - } - chunks, err := doclingClient.ChunkFile(ctx, fileURL, docling.ChunkTypeHybrid, chunkingOptions) - if err != nil { - return nil, err - } - return &unstructured.ChunksFile{ - ConvertedDocument: convertedFile.ConvertedDocument, - ChunksDocument: &unstructured.ChunksDocument{ - Metadata: &unstructured.ChunksFileMetadata{ - ChunkingTool: unstructured.DoclingChunkingTool, - ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, - ConvertedFileMetadata: convertedFile.ConvertedDocument.Metadata, - }, - Chunks: &unstructured.Chunks{ - Text: chunks, - }, + chunker = &unstructured.DoclingHybridChunker{ + DoclingClient: doclingClient, + Options: &docling.HybridChunkingOptions{ + Tokenizer: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.Tokenizer, + MaxTokens: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MaxTokens, + MergePeers: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MergePeers, }, - }, nil + } + chunkingTool = unstructured.DoclingChunkingTool + input = fileURL default: return nil, fmt.Errorf("invalid strategy: %s", chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy) } - chunks, err := chunker.Chunk(convertedFile.ConvertedDocument.Content.Markdown) + chunks, err := chunker.Chunk(ctx, input) if err != nil { return nil, err } @@ -417,7 +402,7 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile ConvertedDocument: convertedFile.ConvertedDocument, ChunksDocument: &unstructured.ChunksDocument{ Metadata: &unstructured.ChunksFileMetadata{ - ChunkingTool: unstructured.LangchainChunkingTool, + ChunkingTool: chunkingTool, ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, ConvertedFileMetadata: convertedFile.ConvertedDocument.Metadata, }, @@ -508,27 +493,30 @@ func (r *ChunksGeneratorReconciler) chunkRawFile(ctx context.Context, rawFilePat return nil, fmt.Errorf("failed to get presigned URL for raw file: %w", err) } - var chunkType docling.ChunkType - var chunkingOptions any + var chunker unstructured.Chunker switch chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy { case operatorv1alpha1.ChunkingStrategyDoclingHierarchical: - chunkType = docling.ChunkTypeHierarchical - chunkingOptions = &docling.HierarchicalChunkingOptions{ - MergeListItems: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHierarchicalChunkerConfig.MergeListItems, + chunker = &unstructured.DoclingHierarchicalChunker{ + DoclingClient: doclingClient, + Options: &docling.HierarchicalChunkingOptions{ + MergeListItems: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHierarchicalChunkerConfig.MergeListItems, + }, } case operatorv1alpha1.ChunkingStrategyDoclingHybrid: - chunkType = docling.ChunkTypeHybrid - chunkingOptions = &docling.HybridChunkingOptions{ - Tokenizer: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.Tokenizer, - MaxTokens: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MaxTokens, - MergePeers: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MergePeers, + chunker = &unstructured.DoclingHybridChunker{ + DoclingClient: doclingClient, + Options: &docling.HybridChunkingOptions{ + Tokenizer: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.Tokenizer, + MaxTokens: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MaxTokens, + MergePeers: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MergePeers, + }, } default: return nil, fmt.Errorf("chunkRawFile called with non-Docling strategy: %s", chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy) } - chunks, err := doclingClient.ChunkFile(ctx, fileURL, chunkType, chunkingOptions) + chunks, err := chunker.Chunk(ctx, fileURL) if err != nil { return nil, err } diff --git a/pkg/unstructured/chunking.go b/pkg/unstructured/chunking.go index 3e2534c9..01f361aa 100644 --- a/pkg/unstructured/chunking.go +++ b/pkg/unstructured/chunking.go @@ -1,12 +1,15 @@ package unstructured import ( + "context" + + "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/docling" "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/langchain" "github.com/tmc/langchaingo/textsplitter" ) type Chunker interface { - Chunk(text string) ([]string, error) + Chunk(ctx context.Context, input string) ([]string, error) } type MarkdownSplitter struct { @@ -24,14 +27,32 @@ type TokenSplitter struct { Config *textsplitter.Options } -func (c *MarkdownSplitter) Chunk(text string) ([]string, error) { +type DoclingHierarchicalChunker struct { + DoclingClient *docling.Client + Options *docling.HierarchicalChunkingOptions +} + +type DoclingHybridChunker struct { + DoclingClient *docling.Client + Options *docling.HybridChunkingOptions +} + +func (c *MarkdownSplitter) Chunk(_ context.Context, text string) ([]string, error) { return c.LangchainClient.SplitTextViaMarkdownTextSplitter(text, c.Config) } -func (c *RecursiveCharacterSplitter) Chunk(text string) ([]string, error) { +func (c *RecursiveCharacterSplitter) Chunk(_ context.Context, text string) ([]string, error) { return c.LangchainClient.SplitTextViaRecursiveCharacterTextSplitter(text, c.Config) } -func (c *TokenSplitter) Chunk(text string) ([]string, error) { +func (c *TokenSplitter) Chunk(_ context.Context, text string) ([]string, error) { return c.LangchainClient.SplitTextViaTokenTextSplitter(text, c.Config) } + +func (c *DoclingHierarchicalChunker) Chunk(ctx context.Context, fileURL string) ([]string, error) { + return c.DoclingClient.ChunkFile(ctx, fileURL, docling.ChunkTypeHierarchical, c.Options) +} + +func (c *DoclingHybridChunker) Chunk(ctx context.Context, fileURL string) ([]string, error) { + return c.DoclingClient.ChunkFile(ctx, fileURL, docling.ChunkTypeHybrid, c.Options) +} diff --git a/pkg/unstructured/chunking_test.go b/pkg/unstructured/chunking_test.go new file mode 100644 index 00000000..fce27315 --- /dev/null +++ b/pkg/unstructured/chunking_test.go @@ -0,0 +1,116 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package unstructured + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/docling" +) + +func newTestDoclingClient(serverURL string) *docling.Client { + return docling.NewClientFromURL(&docling.ClientConfig{ + URL: serverURL, + MaxConcurrentRequests: 5, + }) +} + +func TestDoclingHierarchicalChunker_ImplementsChunker(t *testing.T) { + response := docling.DoclingChunkResponse{ + Chunks: []docling.DoclingChunk{ + {Text: "chunk one"}, + {Text: "chunk two"}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/chunk/hierarchical/source" { + t.Errorf("expected /v1/chunk/hierarchical/source, got %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + mergeListItems := true + var chunker Chunker = &DoclingHierarchicalChunker{ + DoclingClient: newTestDoclingClient(server.URL), + Options: &docling.HierarchicalChunkingOptions{ + MergeListItems: &mergeListItems, + }, + } + + chunks, err := chunker.Chunk(context.Background(), "https://example.com/doc.pdf") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 2 { + t.Fatalf("expected 2 chunks, got %d", len(chunks)) + } + if chunks[0] != "chunk one" { + t.Errorf("unexpected first chunk: %s", chunks[0]) + } + if chunks[1] != "chunk two" { + t.Errorf("unexpected second chunk: %s", chunks[1]) + } +} + +func TestDoclingHybridChunker_ImplementsChunker(t *testing.T) { + response := docling.DoclingChunkResponse{ + Chunks: []docling.DoclingChunk{ + {Text: "hybrid one"}, + {Text: "hybrid two"}, + {Text: "hybrid three"}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/chunk/hybrid/source" { + t.Errorf("expected /v1/chunk/hybrid/source, got %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + mergePeers := false + var chunker Chunker = &DoclingHybridChunker{ + DoclingClient: newTestDoclingClient(server.URL), + Options: &docling.HybridChunkingOptions{ + Tokenizer: "sentence-transformers/all-MiniLM-L6-v2", + MaxTokens: 512, + MergePeers: &mergePeers, + }, + } + + chunks, err := chunker.Chunk(context.Background(), "https://example.com/doc.pdf") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 3 { + t.Fatalf("expected 3 chunks, got %d", len(chunks)) + } + if chunks[0] != "hybrid one" { + t.Errorf("unexpected first chunk: %s", chunks[0]) + } +} From 9e6adf9f6f0b6e48743e287606e79e161cf35876 Mon Sep 17 00:00:00 2001 From: Pradeepto Bhattacharya Date: Sat, 7 Mar 2026 21:25:59 +0000 Subject: [PATCH 4/4] Use async endpoints for Docling chunking Switch from synchronous POST /v1/chunk/{type}/source to async POST /v1/chunk/{type}/source/async. ChunkFile() now submits an async task, polls GET /v1/status/poll/{task_id} until completion, then fetches the result from GET /v1/result/{task_id}. Add ChunkDocumentResponse and ChunkedDocumentResultItem types matching the Docling OpenAPI spec. Reuse existing createDoclingRequest() and getTaskStatus() methods from the conversion flow. --- pkg/docling/chunk.go | 134 ++++++++++++-------- pkg/docling/chunk_test.go | 201 ++++++++++++++++++++++++------ pkg/unstructured/chunking_test.go | 61 +++++---- 3 files changed, 279 insertions(+), 117 deletions(-) diff --git a/pkg/docling/chunk.go b/pkg/docling/chunk.go index 341c95bc..e3d4086f 100644 --- a/pkg/docling/chunk.go +++ b/pkg/docling/chunk.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "io" - "net/http" "net/url" "time" @@ -36,7 +35,7 @@ const ( ChunkTypeHierarchical ChunkType = "hierarchical" ChunkTypeHybrid ChunkType = "hybrid" - chunkingHTTPTimeout = 300 * time.Second + chunkPollInterval = 2 * time.Second ) // HierarchicalChunkingOptions are options for the Docling HierarchicalChunker. @@ -58,21 +57,23 @@ type DoclingChunkRequestPayload struct { ChunkingOptions any `json:"chunking_options,omitempty"` } -// DoclingChunk represents a single chunk in the Docling chunk response. -type DoclingChunk struct { +// ChunkedDocumentResultItem represents a single chunk returned by the Docling chunk API. +type ChunkedDocumentResultItem struct { Text string `json:"text"` } -// DoclingChunkResponse is the response from the Docling chunk endpoint. -type DoclingChunkResponse struct { - Chunks []DoclingChunk `json:"chunks"` +// ChunkDocumentResponse is the response from GET /v1/result/{task_id} for chunk tasks. +type ChunkDocumentResponse struct { + Chunks []ChunkedDocumentResultItem `json:"chunks"` + ProcessingTime float64 `json:"processing_time"` } -func (c *Client) chunkSourceEndpoint(chunkType ChunkType) (string, error) { - return url.JoinPath(c.ClientConfig.URL, "/v1/chunk", string(chunkType), "source") +func (c *Client) chunkSourceAsyncEndpoint(chunkType ChunkType) (string, error) { + return url.JoinPath(c.ClientConfig.URL, "/v1/chunk", string(chunkType), "source", "async") } -// ChunkFile sends a document to the Docling chunk endpoint and returns the chunk texts. +// ChunkFile sends a document to the Docling async chunk endpoint and returns the chunk texts. +// It submits the task, polls for completion, then fetches the result. // It uses TryAcquire on the semaphore to avoid blocking the reconciliation loop. func (c *Client) ChunkFile( ctx context.Context, @@ -88,9 +89,10 @@ func (c *Client) ChunkFile( } defer c.ClientConfig.sem.Release(1) - endpoint, err := c.chunkSourceEndpoint(chunkType) + // Step 1: Submit async chunk request + endpoint, err := c.chunkSourceAsyncEndpoint(chunkType) if err != nil { - return nil, fmt.Errorf("failed to get chunk endpoint: %w", err) + return nil, fmt.Errorf("failed to get async chunk endpoint: %w", err) } payload, err := json.Marshal(DoclingChunkRequestPayload{ @@ -103,73 +105,97 @@ func (c *Client) ChunkFile( return nil, fmt.Errorf("failed to marshal chunk request payload: %w", err) } - logger.Info("sending chunk request to docling", "url", endpoint, "source", fileURL, "chunkType", string(chunkType)) + logger.Info("submitting async chunk request to docling", "url", endpoint, "source", fileURL, "chunkType", string(chunkType)) - responseBody, err := c.createDoclingChunkRequest(ctx, http.MethodPost, endpoint, payload) + responseBody, err := c.createDoclingRequest(ctx, "POST", endpoint, payload) if err != nil { - return nil, fmt.Errorf("failed to send chunk request: %w", err) + return nil, fmt.Errorf("failed to submit async chunk request: %w", err) } - defer func() { - if closeErr := responseBody.Close(); closeErr != nil { - logger.Error(closeErr, "failed to close chunk response body") - } - }() body, err := io.ReadAll(responseBody) if err != nil { - return nil, fmt.Errorf("failed to read chunk response body: %w", err) + responseBody.Close() + return nil, fmt.Errorf("failed to read async chunk response body: %w", err) } + responseBody.Close() - var chunkResponse DoclingChunkResponse - if err = json.Unmarshal(body, &chunkResponse); err != nil { - return nil, fmt.Errorf("failed to decode chunk response: %w", err) + var taskStatus TaskStatusResponse + if err = json.Unmarshal(body, &taskStatus); err != nil { + return nil, fmt.Errorf("failed to decode async chunk response: %w", err) } - chunks := make([]string, 0, len(chunkResponse.Chunks)) - for _, chunk := range chunkResponse.Chunks { - chunks = append(chunks, chunk.Text) + taskID := taskStatus.TaskID + logger.Info("async chunk task submitted", "taskID", taskID, "status", taskStatus.TaskStatus) + + // Step 2: Poll for completion + if err := c.pollUntilComplete(ctx, taskID); err != nil { + return nil, fmt.Errorf("chunk task failed: %w", err) } - logger.Info("successfully chunked file via docling", "chunkCount", len(chunks)) - return chunks, nil + // Step 3: Fetch result + return c.getChunkResult(ctx, taskID) +} + +func (c *Client) pollUntilComplete(ctx context.Context, taskID string) error { + logger := log.FromContext(ctx) + + for { + _, taskStatus, err := c.getTaskStatus(ctx, taskID) + if err != nil { + return fmt.Errorf("failed to poll task status: %w", err) + } + + switch taskStatus.TaskStatus { + case TaskStatusSuccess: + logger.Info("chunk task completed successfully", "taskID", taskID) + return nil + case TaskStatusFailure: + return fmt.Errorf("task failed: task id: %s", taskID) + case TaskStatusPending, TaskStatusStarted: + logger.Info("chunk task still in progress, polling again", "taskID", taskID, "status", taskStatus.TaskStatus) + default: + return fmt.Errorf("unexpected task status %q for task id: %s", taskStatus.TaskStatus, taskID) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(chunkPollInterval): + } + } } -// createDoclingChunkRequest is like createDoclingRequest but with a longer timeout -// suitable for synchronous chunking (which includes conversion + chunking). -func (c *Client) createDoclingChunkRequest(ctx context.Context, method, endpoint string, payload []byte) ( - io.ReadCloser, error) { +func (c *Client) getChunkResult(ctx context.Context, taskID string) ([]string, error) { logger := log.FromContext(ctx) - client := &http.Client{ - Timeout: chunkingHTTPTimeout, + + taskResultURL, err := c.getTaskResultEndpoint(taskID) + if err != nil { + return nil, fmt.Errorf("failed to get task result endpoint: %w", err) } - req, err := c.createHTTPRequest(ctx, method, endpoint, payload, "Bearer %s") + logger.Info("fetching chunk result", "url", taskResultURL) + responseBody, err := c.createDoclingRequest(ctx, "GET", taskResultURL, nil) if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) + return nil, fmt.Errorf("failed to get chunk result: %w", err) } - logger.Info("sending chunk request to docling service", "url", endpoint) - resp, err := client.Do(req) + body, err := io.ReadAll(responseBody) if err != nil { - return nil, fmt.Errorf("failed to send request: %w", err) + responseBody.Close() + return nil, fmt.Errorf("failed to read chunk result body: %w", err) } + responseBody.Close() - if resp.StatusCode == http.StatusForbidden && c.ClientConfig.Key != "" { - req, err = c.createHTTPRequest(ctx, method, endpoint, payload, "%s") - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - resp, err = client.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to send request: %w", err) - } + var chunkResponse ChunkDocumentResponse + if err = json.Unmarshal(body, &chunkResponse); err != nil { + return nil, fmt.Errorf("failed to decode chunk result: %w", err) } - if resp.StatusCode != http.StatusOK { - logger.Error(errors.New("received non-200 OK response from chunk endpoint"), - "status code", resp.StatusCode, "url", endpoint) - return nil, fmt.Errorf("failed to chunk file: status code %d", resp.StatusCode) + chunks := make([]string, 0, len(chunkResponse.Chunks)) + for _, chunk := range chunkResponse.Chunks { + chunks = append(chunks, chunk.Text) } - return resp.Body, nil + logger.Info("successfully chunked file via docling", "chunkCount", len(chunks)) + return chunks, nil } diff --git a/pkg/docling/chunk_test.go b/pkg/docling/chunk_test.go index 87a6bc0c..a297bbd3 100644 --- a/pkg/docling/chunk_test.go +++ b/pkg/docling/chunk_test.go @@ -37,8 +37,9 @@ func newTestClient(serverURL string, maxConcurrent int64) *Client { } func TestChunkFile_Hierarchical(t *testing.T) { - response := DoclingChunkResponse{ - Chunks: []DoclingChunk{ + taskID := "test-task-123" + chunkResult := ChunkDocumentResponse{ + Chunks: []ChunkedDocumentResultItem{ {Text: "This is the first chunk."}, {Text: "This is the second chunk."}, {Text: "This is the third chunk."}, @@ -46,23 +47,32 @@ func TestChunkFile_Hierarchical(t *testing.T) { } server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - t.Errorf("expected POST, got %s", r.Method) - } - if r.URL.Path != "/v1/chunk/hierarchical/source" { - t.Errorf("expected /v1/chunk/hierarchical/source, got %s", r.URL.Path) - } + w.Header().Set("Content-Type", "application/json") - var payload DoclingChunkRequestPayload - if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { - t.Fatalf("failed to decode request body: %v", err) - } - if len(payload.Sources) != 1 || payload.Sources[0].URL != "https://example.com/doc.pdf" { - t.Errorf("unexpected sources: %+v", payload.Sources) + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v1/chunk/hierarchical/source/async": + var payload DoclingChunkRequestPayload + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Errorf("failed to decode request body: %v", err) + } + if len(payload.Sources) != 1 || payload.Sources[0].URL != "https://example.com/doc.pdf" { + t.Errorf("unexpected sources: %+v", payload.Sources) + } + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(chunkResult) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) } - - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(response) })) defer server.Close() @@ -90,25 +100,34 @@ func TestChunkFile_Hierarchical(t *testing.T) { } func TestChunkFile_Hybrid(t *testing.T) { - response := DoclingChunkResponse{ - Chunks: []DoclingChunk{ + taskID := "test-hybrid-456" + chunkResult := ChunkDocumentResponse{ + Chunks: []ChunkedDocumentResultItem{ {Text: "Hybrid chunk one."}, {Text: "Hybrid chunk two."}, }, } server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/v1/chunk/hybrid/source" { - t.Errorf("expected /v1/chunk/hybrid/source, got %s", r.URL.Path) - } + w.Header().Set("Content-Type", "application/json") - var payload DoclingChunkRequestPayload - if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { - t.Fatalf("failed to decode request body: %v", err) + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v1/chunk/hybrid/source/async": + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(chunkResult) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) } - - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(response) })) defer server.Close() @@ -164,22 +183,106 @@ func TestChunkFile_HTTPError(t *testing.T) { } } -func TestChunkSourceEndpoint(t *testing.T) { +func TestChunkFile_AsyncPolling(t *testing.T) { + taskID := "poll-task-789" + pollCount := 0 + chunkResult := ChunkDocumentResponse{ + Chunks: []ChunkedDocumentResultItem{ + {Text: "polled chunk"}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v1/chunk/hierarchical/source/async": + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusPending, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + pollCount++ + status := TaskStatusStarted + if pollCount >= 2 { + status = TaskStatusSuccess + } + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: status, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(chunkResult) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + + chunks, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 1 || chunks[0] != "polled chunk" { + t.Errorf("unexpected chunks: %v", chunks) + } + if pollCount < 2 { + t.Errorf("expected at least 2 poll calls, got %d", pollCount) + } +} + +func TestChunkFile_AsyncTaskFailure(t *testing.T) { + taskID := "fail-task-000" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v1/chunk/hierarchical/source/async": + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusPending, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusFailure, + }) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + + _, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err == nil { + t.Fatal("expected error for failed task, got nil") + } +} + +func TestChunkSourceAsyncEndpoint(t *testing.T) { client := newTestClient("http://localhost:5001", 1) - hierarchicalEndpoint, err := client.chunkSourceEndpoint(ChunkTypeHierarchical) + hierarchicalEndpoint, err := client.chunkSourceAsyncEndpoint(ChunkTypeHierarchical) if err != nil { t.Fatalf("unexpected error: %v", err) } - if hierarchicalEndpoint != "http://localhost:5001/v1/chunk/hierarchical/source" { + if hierarchicalEndpoint != "http://localhost:5001/v1/chunk/hierarchical/source/async" { t.Errorf("unexpected endpoint: %s", hierarchicalEndpoint) } - hybridEndpoint, err := client.chunkSourceEndpoint(ChunkTypeHybrid) + hybridEndpoint, err := client.chunkSourceAsyncEndpoint(ChunkTypeHybrid) if err != nil { t.Fatalf("unexpected error: %v", err) } - if hybridEndpoint != "http://localhost:5001/v1/chunk/hybrid/source" { + if hybridEndpoint != "http://localhost:5001/v1/chunk/hybrid/source/async" { t.Errorf("unexpected endpoint: %s", hybridEndpoint) } } @@ -262,13 +365,22 @@ func TestChunkRequestPayload_HybridSerialization(t *testing.T) { } func TestChunkFile_EmptyResponse(t *testing.T) { - response := DoclingChunkResponse{ - Chunks: []DoclingChunk{}, + taskID := "empty-task" + chunkResult := ChunkDocumentResponse{ + Chunks: []ChunkedDocumentResultItem{}, } - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(response) + + switch { + case r.Method == http.MethodPost: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{TaskID: taskID, TaskStatus: TaskStatusSuccess}) + case r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{TaskID: taskID, TaskStatus: TaskStatusSuccess}) + case r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(chunkResult) + } })) defer server.Close() @@ -285,12 +397,23 @@ func TestChunkFile_EmptyResponse(t *testing.T) { } func TestChunkFile_AuthHeader(t *testing.T) { + taskID := "auth-task" var receivedAuthHeader string server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - receivedAuthHeader = r.Header.Get("Authorization") + if r.Method == http.MethodPost { + receivedAuthHeader = r.Header.Get("Authorization") + } w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(DoclingChunkResponse{Chunks: []DoclingChunk{}}) + + switch { + case r.Method == http.MethodPost: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{TaskID: taskID, TaskStatus: TaskStatusSuccess}) + case r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{TaskID: taskID, TaskStatus: TaskStatusSuccess}) + case r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(ChunkDocumentResponse{Chunks: []ChunkedDocumentResultItem{}}) + } })) defer server.Close() diff --git a/pkg/unstructured/chunking_test.go b/pkg/unstructured/chunking_test.go index fce27315..24af232d 100644 --- a/pkg/unstructured/chunking_test.go +++ b/pkg/unstructured/chunking_test.go @@ -33,21 +33,43 @@ func newTestDoclingClient(serverURL string) *docling.Client { }) } -func TestDoclingHierarchicalChunker_ImplementsChunker(t *testing.T) { - response := docling.DoclingChunkResponse{ - Chunks: []docling.DoclingChunk{ - {Text: "chunk one"}, - {Text: "chunk two"}, - }, - } +// asyncChunkServer creates a test server that handles the async chunk flow: +// POST /v1/chunk/{type}/source/async -> TaskStatusResponse +// GET /v1/status/poll/{taskID} -> TaskStatusResponse (success) +// GET /v1/result/{taskID} -> ChunkDocumentResponse +func asyncChunkServer(t *testing.T, expectedPath string, taskID string, chunks []docling.ChunkedDocumentResultItem) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/v1/chunk/hierarchical/source" { - t.Errorf("expected /v1/chunk/hierarchical/source, got %s", r.URL.Path) + switch { + case r.Method == http.MethodPost && r.URL.Path == expectedPath: + _ = json.NewEncoder(w).Encode(docling.TaskStatusResponse{ + TaskID: taskID, + TaskStatus: docling.TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(docling.TaskStatusResponse{ + TaskID: taskID, + TaskStatus: docling.TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(docling.ChunkDocumentResponse{ + Chunks: chunks, + }) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) } - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(response) })) +} + +func TestDoclingHierarchicalChunker_ImplementsChunker(t *testing.T) { + server := asyncChunkServer(t, "/v1/chunk/hierarchical/source/async", "hier-task-1", + []docling.ChunkedDocumentResultItem{ + {Text: "chunk one"}, + {Text: "chunk two"}, + }) defer server.Close() mergeListItems := true @@ -75,21 +97,12 @@ func TestDoclingHierarchicalChunker_ImplementsChunker(t *testing.T) { } func TestDoclingHybridChunker_ImplementsChunker(t *testing.T) { - response := docling.DoclingChunkResponse{ - Chunks: []docling.DoclingChunk{ + server := asyncChunkServer(t, "/v1/chunk/hybrid/source/async", "hybrid-task-1", + []docling.ChunkedDocumentResultItem{ {Text: "hybrid one"}, {Text: "hybrid two"}, {Text: "hybrid three"}, - }, - } - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/v1/chunk/hybrid/source" { - t.Errorf("expected /v1/chunk/hybrid/source, got %s", r.URL.Path) - } - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(response) - })) + }) defer server.Close() mergePeers := false