diff --git a/api/v1alpha1/unstructureddataproduct_types.go b/api/v1alpha1/unstructureddataproduct_types.go index 6e526689..34eab491 100644 --- a/api/v1alpha1/unstructureddataproduct_types.go +++ b/api/v1alpha1/unstructureddataproduct_types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -27,15 +29,30 @@ type ( ) const ( - SourceTypeS3 UnstructuredDataSourceType = "s3" - DestinationTypeInternalStage UnstructuredDataDestinationType = "snowflakeInternalStage" - ChunkingStrategyRecursiveCharacter ChunkingStrategy = "recursiveCharacterTextSplitter" - ChunkingStrategyMarkdown ChunkingStrategy = "markdownTextSplitter" - ChunkingStrategyToken ChunkingStrategy = "tokenTextSplitter" + SourceTypeS3 UnstructuredDataSourceType = "s3" + DestinationTypeInternalStage UnstructuredDataDestinationType = "snowflakeInternalStage" + ChunkingStrategyRecursiveCharacter ChunkingStrategy = "recursiveCharacterTextSplitter" + ChunkingStrategyMarkdown ChunkingStrategy = "markdownTextSplitter" + ChunkingStrategyToken ChunkingStrategy = "tokenTextSplitter" + ChunkingStrategyDoclingHierarchical ChunkingStrategy = "doclingHierarchicalChunker" + ChunkingStrategyDoclingHybrid ChunkingStrategy = "doclingHybridChunker" UnstructuredDataProductCondition = "UnstructuredDataProductReady" ) +func IsDoclingChunkingStrategy(strategy ChunkingStrategy) bool { + return strategy == ChunkingStrategyDoclingHierarchical || strategy == ChunkingStrategyDoclingHybrid +} + +func (s *UnstructuredDataProductSpec) ValidateSpec() error { + if IsDoclingChunkingStrategy(s.ChunksGeneratorConfig.Strategy) { + if s.DocumentProcessorConfig != nil { + return fmt.Errorf("documentProcessorConfig must not be set when using Docling chunking strategy %q; Docling performs conversion and chunking in a single step", s.ChunksGeneratorConfig.Strategy) + } + } + return nil +} + type DocumentProcessorConfig struct { Type string `json:"type,omitempty"` DoclingConfig DoclingConfig `json:"doclingConfig,omitempty"` @@ -59,6 +76,8 @@ type ChunksGeneratorConfig struct { RecursiveCharacterSplitterConfig RecursiveCharacterSplitterConfig `json:"recursiveCharacterSplitterConfig,omitempty"` MarkdownSplitterConfig MarkdownSplitterConfig `json:"markdownSplitterConfig,omitempty"` TokenSplitterConfig TokenSplitterConfig `json:"tokenSplitterConfig,omitempty"` + DoclingHierarchicalChunkerConfig DoclingHierarchicalChunkerConfig `json:"doclingHierarchicalChunkerConfig,omitempty"` + DoclingHybridChunkerConfig DoclingHybridChunkerConfig `json:"doclingHybridChunkerConfig,omitempty"` } type RecursiveCharacterSplitterConfig struct { @@ -86,12 +105,22 @@ type TokenSplitterConfig struct { DisallowedSpecial []string `json:"disallowedSpecial,omitempty"` } +type DoclingHierarchicalChunkerConfig struct { + MergeListItems *bool `json:"mergeListItems,omitempty"` +} + +type DoclingHybridChunkerConfig struct { + Tokenizer string `json:"tokenizer,omitempty"` + MaxTokens int `json:"maxTokens,omitempty"` + MergePeers *bool `json:"mergePeers,omitempty"` +} + // UnstructuredDataProductSpec defines the desired state of UnstructuredDataProduct type UnstructuredDataProductSpec struct { - SourceConfig SourceConfig `json:"sourceConfig,omitempty"` - DestinationConfig DestinationConfig `json:"destinationConfig,omitempty"` - DocumentProcessorConfig DocumentProcessorConfig `json:"documentProcessorConfig,omitempty"` - ChunksGeneratorConfig ChunksGeneratorConfig `json:"chunksGeneratorConfig,omitempty"` + SourceConfig SourceConfig `json:"sourceConfig,omitempty"` + DestinationConfig DestinationConfig `json:"destinationConfig,omitempty"` + DocumentProcessorConfig *DocumentProcessorConfig `json:"documentProcessorConfig,omitempty"` + ChunksGeneratorConfig ChunksGeneratorConfig `json:"chunksGeneratorConfig,omitempty"` } type SourceConfig struct { diff --git a/api/v1alpha1/unstructureddataproduct_types_test.go b/api/v1alpha1/unstructureddataproduct_types_test.go new file mode 100644 index 00000000..d9110b1f --- /dev/null +++ b/api/v1alpha1/unstructureddataproduct_types_test.go @@ -0,0 +1,150 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" +) + +func TestValidateSpec(t *testing.T) { + tests := []struct { + name string + spec UnstructuredDataProductSpec + wantErr bool + }{ + { + name: "docling hierarchical with DocumentProcessorConfig set returns error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyDoclingHierarchical, + }, + DocumentProcessorConfig: &DocumentProcessorConfig{ + Type: "docling", + }, + }, + wantErr: true, + }, + { + name: "docling hybrid with DocumentProcessorConfig set returns error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyDoclingHybrid, + }, + DocumentProcessorConfig: &DocumentProcessorConfig{ + Type: "docling", + }, + }, + wantErr: true, + }, + { + name: "docling hierarchical without DocumentProcessorConfig returns no error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyDoclingHierarchical, + }, + }, + wantErr: false, + }, + { + name: "docling hybrid without DocumentProcessorConfig returns no error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyDoclingHybrid, + }, + }, + wantErr: false, + }, + { + name: "non-docling strategy with DocumentProcessorConfig set returns no error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyRecursiveCharacter, + }, + DocumentProcessorConfig: &DocumentProcessorConfig{ + Type: "docling", + }, + }, + wantErr: false, + }, + { + name: "non-docling strategy without DocumentProcessorConfig returns no error", + spec: UnstructuredDataProductSpec{ + ChunksGeneratorConfig: ChunksGeneratorConfig{ + Strategy: ChunkingStrategyMarkdown, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.spec.ValidateSpec() + if (err != nil) != tt.wantErr { + t.Errorf("ValidateSpec() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestIsDoclingChunkingStrategy(t *testing.T) { + tests := []struct { + name string + strategy ChunkingStrategy + want bool + }{ + { + name: "docling hierarchical", + strategy: ChunkingStrategyDoclingHierarchical, + want: true, + }, + { + name: "docling hybrid", + strategy: ChunkingStrategyDoclingHybrid, + want: true, + }, + { + name: "recursive character", + strategy: ChunkingStrategyRecursiveCharacter, + want: false, + }, + { + name: "markdown", + strategy: ChunkingStrategyMarkdown, + want: false, + }, + { + name: "token", + strategy: ChunkingStrategyToken, + want: false, + }, + { + name: "empty string", + strategy: "", + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := IsDoclingChunkingStrategy(tt.strategy) + if got != tt.want { + t.Errorf("IsDoclingChunkingStrategy(%q) = %v, want %v", tt.strategy, got, tt.want) + } + }) + } +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 91cf9575..333b4a80 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -58,6 +58,8 @@ func (in *ChunksGeneratorConfig) DeepCopyInto(out *ChunksGeneratorConfig) { in.RecursiveCharacterSplitterConfig.DeepCopyInto(&out.RecursiveCharacterSplitterConfig) out.MarkdownSplitterConfig = in.MarkdownSplitterConfig in.TokenSplitterConfig.DeepCopyInto(&out.TokenSplitterConfig) + in.DoclingHierarchicalChunkerConfig.DeepCopyInto(&out.DoclingHierarchicalChunkerConfig) + in.DoclingHybridChunkerConfig.DeepCopyInto(&out.DoclingHybridChunkerConfig) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChunksGeneratorConfig. @@ -284,6 +286,46 @@ func (in *DoclingConfig) DeepCopy() *DoclingConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DoclingHierarchicalChunkerConfig) DeepCopyInto(out *DoclingHierarchicalChunkerConfig) { + *out = *in + if in.MergeListItems != nil { + in, out := &in.MergeListItems, &out.MergeListItems + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DoclingHierarchicalChunkerConfig. +func (in *DoclingHierarchicalChunkerConfig) DeepCopy() *DoclingHierarchicalChunkerConfig { + if in == nil { + return nil + } + out := new(DoclingHierarchicalChunkerConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DoclingHybridChunkerConfig) DeepCopyInto(out *DoclingHybridChunkerConfig) { + *out = *in + if in.MergePeers != nil { + in, out := &in.MergePeers, &out.MergePeers + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DoclingHybridChunkerConfig. +func (in *DoclingHybridChunkerConfig) DeepCopy() *DoclingHybridChunkerConfig { + if in == nil { + return nil + } + out := new(DoclingHybridChunkerConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DocumentProcessor) DeepCopyInto(out *DocumentProcessor) { *out = *in @@ -721,7 +763,11 @@ func (in *UnstructuredDataProductSpec) DeepCopyInto(out *UnstructuredDataProduct *out = *in out.SourceConfig = in.SourceConfig out.DestinationConfig = in.DestinationConfig - in.DocumentProcessorConfig.DeepCopyInto(&out.DocumentProcessorConfig) + if in.DocumentProcessorConfig != nil { + in, out := &in.DocumentProcessorConfig, &out.DocumentProcessorConfig + *out = new(DocumentProcessorConfig) + (*in).DeepCopyInto(*out) + } in.ChunksGeneratorConfig.DeepCopyInto(&out.ChunksGeneratorConfig) } diff --git a/config/crd/bases/operator.dataverse.redhat.com_chunksgenerators.yaml b/config/crd/bases/operator.dataverse.redhat.com_chunksgenerators.yaml index 333c825d..79f45427 100644 --- a/config/crd/bases/operator.dataverse.redhat.com_chunksgenerators.yaml +++ b/config/crd/bases/operator.dataverse.redhat.com_chunksgenerators.yaml @@ -48,6 +48,20 @@ spec: properties: config: properties: + doclingHierarchicalChunkerConfig: + properties: + mergeListItems: + type: boolean + type: object + doclingHybridChunkerConfig: + properties: + maxTokens: + type: integer + mergePeers: + type: boolean + tokenizer: + type: string + type: object markdownSplitterConfig: properties: chunkOverlap: diff --git a/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml b/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml index a6d8f803..47a37ab7 100644 --- a/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml +++ b/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml @@ -50,6 +50,20 @@ spec: properties: chunksGeneratorConfig: properties: + doclingHierarchicalChunkerConfig: + properties: + mergeListItems: + type: boolean + type: object + doclingHybridChunkerConfig: + properties: + maxTokens: + type: integer + mergePeers: + type: boolean + tokenizer: + type: string + type: object markdownSplitterConfig: properties: chunkOverlap: diff --git a/internal/controller/chunksgenerator_controller.go b/internal/controller/chunksgenerator_controller.go index 24337b45..7c79809c 100644 --- a/internal/controller/chunksgenerator_controller.go +++ b/internal/controller/chunksgenerator_controller.go @@ -32,6 +32,7 @@ import ( operatorv1alpha1 "github.com/redhat-data-and-ai/unstructured-data-controller/api/v1alpha1" "github.com/redhat-data-and-ai/unstructured-data-controller/internal/controller/controllerutils" + "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/docling" "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/filestore" "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/langchain" "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/unstructured" @@ -130,22 +131,39 @@ func (r *ChunksGeneratorReconciler) Reconcile(ctx context.Context, req ctrl.Requ chunkingErrors := []error{} skippedFiles := []string{} - convertedFilePaths := unstructured.FilterConvertedFilePaths(filePaths) - - for _, convertedFilePath := range convertedFilePaths { - logger.Info("processing converted file", "file", convertedFilePath) - - if err := r.processConvertedFile(ctx, convertedFilePath, chunksGeneratorCR); err != nil { - if strings.Contains(err.Error(), langchain.SemaphoreAcquireError) { - logger.Error(err, "failed to process converted file, semaphore acquire error, will try again later", "file", convertedFilePath) - skippedFiles = append(skippedFiles, convertedFilePath) - continue + isDoclingChunking := operatorv1alpha1.IsDoclingChunkingStrategy(chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy) + + if isDoclingChunking { + // For Docling chunking: iterate over raw files directly (no converted files needed) + rawFilePaths := unstructured.FilterRawFilePaths(filePaths) + for _, rawFilePath := range rawFilePaths { + logger.Info("processing raw file for Docling chunking", "file", rawFilePath) + if err := r.processRawFile(ctx, rawFilePath, chunksGeneratorCR); err != nil { + if strings.Contains(err.Error(), docling.SemaphoreAcquireError) { + logger.Error(err, "semaphore acquire error, will try again later", "file", rawFilePath) + skippedFiles = append(skippedFiles, rawFilePath) + continue + } + chunkingErrors = append(chunkingErrors, err) + logger.Error(err, "failed to process raw file", "file", rawFilePath) + } + } + } else { + // For LangChain chunking: iterate over converted files (existing behavior) + convertedFilePaths := unstructured.FilterConvertedFilePaths(filePaths) + for _, convertedFilePath := range convertedFilePaths { + logger.Info("processing converted file", "file", convertedFilePath) + if err := r.processConvertedFile(ctx, convertedFilePath, chunksGeneratorCR); err != nil { + if strings.Contains(err.Error(), langchain.SemaphoreAcquireError) || + strings.Contains(err.Error(), docling.SemaphoreAcquireError) { + logger.Error(err, "semaphore acquire error, will try again later", "file", convertedFilePath) + skippedFiles = append(skippedFiles, convertedFilePath) + continue + } + chunkingErrors = append(chunkingErrors, err) + logger.Error(err, "failed to process converted file", "file", convertedFilePath) } - - chunkingErrors = append(chunkingErrors, err) - logger.Error(err, "failed to process converted file", "file", convertedFilePath) } - logger.Info("successfully processed converted file", "file", convertedFilePath) } if len(chunkingErrors) > 0 { @@ -259,9 +277,17 @@ func (r *ChunksGeneratorReconciler) needsChunking(ctx context.Context, converted } // now the chunks file should be the same as the current chunks file in filestore + var chunkingTool unstructured.ChunkingTool + switch chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy { + case operatorv1alpha1.ChunkingStrategyDoclingHierarchical, operatorv1alpha1.ChunkingStrategyDoclingHybrid: + chunkingTool = unstructured.DoclingChunkingTool + default: + chunkingTool = unstructured.LangchainChunkingTool + } + newChunksFileMetadata := unstructured.ChunksFileMetadata{ ConvertedFileMetadata: &convertedFileMetadata, - ChunkingTool: unstructured.LangchainChunkingTool, + ChunkingTool: chunkingTool, ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, } if !chunksFile.ChunksDocument.Metadata.Equal(&newChunksFileMetadata) { @@ -289,6 +315,9 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile } var chunker unstructured.Chunker + var chunkingTool unstructured.ChunkingTool + var input string + switch chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy { case operatorv1alpha1.ChunkingStrategyRecursiveCharacter: chunker = &unstructured.RecursiveCharacterSplitter{ @@ -300,6 +329,8 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile KeepSeparator: chunksGeneratorCR.Spec.ChunksGeneratorConfig.RecursiveCharacterSplitterConfig.KeepSeparator, }, } + chunkingTool = unstructured.LangchainChunkingTool + input = convertedFile.ConvertedDocument.Content.Markdown case operatorv1alpha1.ChunkingStrategyMarkdown: chunker = &unstructured.MarkdownSplitter{ LangchainClient: langchainClient, @@ -312,6 +343,8 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile JoinTableRows: chunksGeneratorCR.Spec.ChunksGeneratorConfig.MarkdownSplitterConfig.JoinTableRows, }, } + chunkingTool = unstructured.LangchainChunkingTool + input = convertedFile.ConvertedDocument.Content.Markdown case operatorv1alpha1.ChunkingStrategyToken: chunker = &unstructured.TokenSplitter{ LangchainClient: langchainClient, @@ -324,11 +357,43 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile DisallowedSpecial: chunksGeneratorCR.Spec.ChunksGeneratorConfig.TokenSplitterConfig.DisallowedSpecial, }, } + chunkingTool = unstructured.LangchainChunkingTool + input = convertedFile.ConvertedDocument.Content.Markdown + case operatorv1alpha1.ChunkingStrategyDoclingHierarchical: + rawFilePath := unstructured.GetRawFilePathFromConvertedFilePath(convertedFilePath) + fileURL, err := r.fileStore.GetFileURL(ctx, rawFilePath) + if err != nil { + return nil, fmt.Errorf("failed to get presigned URL for raw file: %w", err) + } + chunker = &unstructured.DoclingHierarchicalChunker{ + DoclingClient: doclingClient, + Options: &docling.HierarchicalChunkingOptions{ + MergeListItems: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHierarchicalChunkerConfig.MergeListItems, + }, + } + chunkingTool = unstructured.DoclingChunkingTool + input = fileURL + case operatorv1alpha1.ChunkingStrategyDoclingHybrid: + rawFilePath := unstructured.GetRawFilePathFromConvertedFilePath(convertedFilePath) + fileURL, err := r.fileStore.GetFileURL(ctx, rawFilePath) + if err != nil { + return nil, fmt.Errorf("failed to get presigned URL for raw file: %w", err) + } + chunker = &unstructured.DoclingHybridChunker{ + DoclingClient: doclingClient, + Options: &docling.HybridChunkingOptions{ + Tokenizer: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.Tokenizer, + MaxTokens: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MaxTokens, + MergePeers: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MergePeers, + }, + } + chunkingTool = unstructured.DoclingChunkingTool + input = fileURL default: return nil, fmt.Errorf("invalid strategy: %s", chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy) } - chunks, err := chunker.Chunk(convertedFile.ConvertedDocument.Content.Markdown) + chunks, err := chunker.Chunk(ctx, input) if err != nil { return nil, err } @@ -337,7 +402,7 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile ConvertedDocument: convertedFile.ConvertedDocument, ChunksDocument: &unstructured.ChunksDocument{ Metadata: &unstructured.ChunksFileMetadata{ - ChunkingTool: unstructured.LangchainChunkingTool, + ChunkingTool: chunkingTool, ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, ConvertedFileMetadata: convertedFile.ConvertedDocument.Metadata, }, @@ -348,6 +413,129 @@ func (r *ChunksGeneratorReconciler) chunkFile(ctx context.Context, convertedFile }, nil } +func (r *ChunksGeneratorReconciler) processRawFile(ctx context.Context, rawFilePath string, chunksGeneratorCR *operatorv1alpha1.ChunksGenerator) error { + logger := log.FromContext(ctx) + logger.Info("processing raw file for Docling chunking", "file", rawFilePath) + + chunksFilePath := unstructured.GetChunksFilePath(rawFilePath) + + needsChunking, err := r.needsChunkingForRawFile(ctx, rawFilePath, chunksGeneratorCR) + if err != nil { + logger.Error(err, "failed to check if file needs chunking") + return err + } + if !needsChunking { + logger.Info("file is already chunked, skipping ...") + return nil + } + + chunksFile, err := r.chunkRawFile(ctx, rawFilePath, chunksGeneratorCR) + if err != nil { + logger.Error(err, "failed to chunk file") + return err + } + + chunksFileBytes, err := json.Marshal(chunksFile) + if err != nil { + logger.Error(err, "failed to marshal chunks file") + return err + } + if err := r.fileStore.Store(ctx, chunksFilePath, chunksFileBytes); err != nil { + logger.Error(err, "failed to store chunks file") + return err + } + return nil +} + +func (r *ChunksGeneratorReconciler) needsChunkingForRawFile(ctx context.Context, rawFilePath string, chunksGeneratorCR *operatorv1alpha1.ChunksGenerator) (bool, error) { + logger := log.FromContext(ctx) + logger.Info("checking if raw file needs chunking", "file", rawFilePath) + + chunksFilePath := unstructured.GetChunksFilePath(rawFilePath) + + chunksFileExists, err := r.fileStore.Exists(ctx, chunksFilePath) + if err != nil { + return false, err + } + if !chunksFileExists { + return true, nil + } + + chunksFileRaw, err := r.fileStore.Retrieve(ctx, chunksFilePath) + if err != nil { + return false, err + } + + chunksFile := unstructured.ChunksFile{} + if err = json.Unmarshal(chunksFileRaw, &chunksFile); err != nil { + return false, err + } + + newChunksFileMetadata := unstructured.ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: unstructured.DoclingChunkingTool, + ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, + } + if !chunksFile.ChunksDocument.Metadata.Equal(&newChunksFileMetadata) { + logger.Info("chunks file configuration differs, re-chunking needed", "file", rawFilePath) + return true, nil + } + + return false, nil +} + +func (r *ChunksGeneratorReconciler) chunkRawFile(ctx context.Context, rawFilePath string, chunksGeneratorCR *operatorv1alpha1.ChunksGenerator) (*unstructured.ChunksFile, error) { + logger := log.FromContext(ctx) + logger.Info("chunking raw file via Docling", "file", rawFilePath) + + fileURL, err := r.fileStore.GetFileURL(ctx, rawFilePath) + if err != nil { + return nil, fmt.Errorf("failed to get presigned URL for raw file: %w", err) + } + + var chunker unstructured.Chunker + + switch chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy { + case operatorv1alpha1.ChunkingStrategyDoclingHierarchical: + chunker = &unstructured.DoclingHierarchicalChunker{ + DoclingClient: doclingClient, + Options: &docling.HierarchicalChunkingOptions{ + MergeListItems: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHierarchicalChunkerConfig.MergeListItems, + }, + } + case operatorv1alpha1.ChunkingStrategyDoclingHybrid: + chunker = &unstructured.DoclingHybridChunker{ + DoclingClient: doclingClient, + Options: &docling.HybridChunkingOptions{ + Tokenizer: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.Tokenizer, + MaxTokens: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MaxTokens, + MergePeers: chunksGeneratorCR.Spec.ChunksGeneratorConfig.DoclingHybridChunkerConfig.MergePeers, + }, + } + default: + return nil, fmt.Errorf("chunkRawFile called with non-Docling strategy: %s", chunksGeneratorCR.Spec.ChunksGeneratorConfig.Strategy) + } + + chunks, err := chunker.Chunk(ctx, fileURL) + if err != nil { + return nil, err + } + + return &unstructured.ChunksFile{ + ConvertedDocument: nil, + ChunksDocument: &unstructured.ChunksDocument{ + Metadata: &unstructured.ChunksFileMetadata{ + ChunkingTool: unstructured.DoclingChunkingTool, + ChunksGeneratorConfig: chunksGeneratorCR.Spec.ChunksGeneratorConfig, + ConvertedFileMetadata: nil, + }, + Chunks: &unstructured.Chunks{ + Text: chunks, + }, + }, + }, nil +} + // SetupWithManager sets up the controller with the Manager. func (r *ChunksGeneratorReconciler) SetupWithManager(mgr ctrl.Manager) error { labelPredicate := controllerutils.ForceReconcilePredicate() diff --git a/internal/controller/unstructureddataproduct_controller.go b/internal/controller/unstructureddataproduct_controller.go index afdb0b55..f91cd2d2 100644 --- a/internal/controller/unstructureddataproduct_controller.go +++ b/internal/controller/unstructureddataproduct_controller.go @@ -68,6 +68,11 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c } dataProductName := unstructuredDataProductCR.Name + if err := unstructuredDataProductCR.Spec.ValidateSpec(); err != nil { + logger.Error(err, "spec validation failed") + return r.handleError(ctx, unstructuredDataProductCR, err) + } + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { latest := &operatorv1alpha1.UnstructuredDataProduct{} if err := r.Get(ctx, req.NamespacedName, latest); err != nil { @@ -88,30 +93,31 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c } r.sf = sf - // first, let's create (or update) the DocumentProcessor CR for this data product - documentProcessorCR := &operatorv1alpha1.DocumentProcessor{ - ObjectMeta: metav1.ObjectMeta{ - Name: dataProductName, - Namespace: unstructuredDataProductCR.Namespace, - }, - Spec: operatorv1alpha1.DocumentProcessorSpec{ - DataProduct: dataProductName, - DocumentProcessorConfig: unstructuredDataProductCR.Spec.DocumentProcessorConfig, - }, - } - // result, err := kubecontrollerutil.CreateOrUpdate(ctx, r.Client, documentProcessorCR, func() error { return nil }) - result, err := controllerutil.CreateOrUpdate(ctx, r.Client, documentProcessorCR, func() error { - documentProcessorCR.Spec = operatorv1alpha1.DocumentProcessorSpec{ - DataProduct: dataProductName, - DocumentProcessorConfig: unstructuredDataProductCR.Spec.DocumentProcessorConfig, + // create (or update) the DocumentProcessor CR only if DocumentProcessorConfig is specified + if unstructuredDataProductCR.Spec.DocumentProcessorConfig != nil { + documentProcessorCR := &operatorv1alpha1.DocumentProcessor{ + ObjectMeta: metav1.ObjectMeta{ + Name: dataProductName, + Namespace: unstructuredDataProductCR.Namespace, + }, + Spec: operatorv1alpha1.DocumentProcessorSpec{ + DataProduct: dataProductName, + DocumentProcessorConfig: *unstructuredDataProductCR.Spec.DocumentProcessorConfig, + }, } - return nil - }) - if err != nil { - logger.Error(err, "failed to create/update DocumentProcessor CR") - return r.handleError(ctx, unstructuredDataProductCR, err) + result, err := controllerutil.CreateOrUpdate(ctx, r.Client, documentProcessorCR, func() error { + documentProcessorCR.Spec = operatorv1alpha1.DocumentProcessorSpec{ + DataProduct: dataProductName, + DocumentProcessorConfig: *unstructuredDataProductCR.Spec.DocumentProcessorConfig, + } + return nil + }) + if err != nil { + logger.Error(err, "failed to create/update DocumentProcessor CR") + return r.handleError(ctx, unstructuredDataProductCR, err) + } + logger.Info("DocumentProcessor CR created/updated", "result", result) } - logger.Info("DocumentProcessor CR created/updated", "result", result) // create ChunksGenerator CR for this data product here chunksGeneratorCR := &operatorv1alpha1.ChunksGenerator{ @@ -124,12 +130,12 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c ChunksGeneratorConfig: unstructuredDataProductCR.Spec.ChunksGeneratorConfig, }, } - result, err = controllerutil.CreateOrUpdate(ctx, r.Client, chunksGeneratorCR, func() error { return nil }) + chunksResult, err := controllerutil.CreateOrUpdate(ctx, r.Client, chunksGeneratorCR, func() error { return nil }) if err != nil { logger.Error(err, "failed to create/update ChunksGenerator CR") return r.handleError(ctx, unstructuredDataProductCR, err) } - logger.Info("ChunksGenerator CR created/updated", "result", result) + logger.Info("ChunksGenerator CR created/updated", "result", chunksResult) var source unstructured.DataSource switch unstructuredDataProductCR.Spec.SourceConfig.Type { @@ -185,20 +191,39 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c return r.handleError(ctx, unstructuredDataProductCR, err) } - // add force reconcile label to the DocumentProcessor CR - documentProcessorKey := client.ObjectKey{ - Namespace: unstructuredDataProductCR.Namespace, - Name: dataProductName, - } - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - latestDocumentProcessorCR := &operatorv1alpha1.DocumentProcessor{} - if err := r.Get(ctx, documentProcessorKey, latestDocumentProcessorCR); err != nil { - return err + if unstructuredDataProductCR.Spec.DocumentProcessorConfig != nil { + // add force reconcile label to the DocumentProcessor CR + // DocumentProcessor will in turn trigger ChunksGenerator when conversion is done + documentProcessorKey := client.ObjectKey{ + Namespace: unstructuredDataProductCR.Namespace, + Name: dataProductName, + } + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latestDocumentProcessorCR := &operatorv1alpha1.DocumentProcessor{} + if err := r.Get(ctx, documentProcessorKey, latestDocumentProcessorCR); err != nil { + return err + } + return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestDocumentProcessorCR) + }); err != nil { + logger.Error(err, "failed to add force reconcile label to DocumentProcessor CR") + return r.handleError(ctx, unstructuredDataProductCR, err) + } + } else { + // No document processing needed — trigger ChunksGenerator directly + chunksGeneratorKey := client.ObjectKey{ + Namespace: unstructuredDataProductCR.Namespace, + Name: dataProductName, + } + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latestChunksGeneratorCR := &operatorv1alpha1.ChunksGenerator{} + if err := r.Get(ctx, chunksGeneratorKey, latestChunksGeneratorCR); err != nil { + return err + } + return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestChunksGeneratorCR) + }); err != nil { + logger.Error(err, "failed to add force reconcile label to ChunksGenerator CR") + return r.handleError(ctx, unstructuredDataProductCR, err) } - return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestDocumentProcessorCR) - }); err != nil { - logger.Error(err, "failed to add force reconcile label to DocumentProcessor CR") - return r.handleError(ctx, unstructuredDataProductCR, err) } // Setup destination diff --git a/pkg/docling/chunk.go b/pkg/docling/chunk.go new file mode 100644 index 00000000..e3d4086f --- /dev/null +++ b/pkg/docling/chunk.go @@ -0,0 +1,201 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package docling + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/url" + "time" + + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// ChunkType represents the type of Docling chunker to use. +type ChunkType string + +const ( + ChunkTypeHierarchical ChunkType = "hierarchical" + ChunkTypeHybrid ChunkType = "hybrid" + + chunkPollInterval = 2 * time.Second +) + +// HierarchicalChunkingOptions are options for the Docling HierarchicalChunker. +type HierarchicalChunkingOptions struct { + MergeListItems *bool `json:"merge_list_items,omitempty"` +} + +// HybridChunkingOptions are options for the Docling HybridChunker. +type HybridChunkingOptions struct { + Tokenizer string `json:"tokenizer,omitempty"` + MaxTokens int `json:"max_tokens,omitempty"` + MergePeers *bool `json:"merge_peers,omitempty"` +} + +// DoclingChunkRequestPayload is the request body for the Docling chunk endpoint. +type DoclingChunkRequestPayload struct { + Sources []DoclingSource `json:"sources"` + Options *DoclingConfig `json:"options,omitempty"` + ChunkingOptions any `json:"chunking_options,omitempty"` +} + +// ChunkedDocumentResultItem represents a single chunk returned by the Docling chunk API. +type ChunkedDocumentResultItem struct { + Text string `json:"text"` +} + +// ChunkDocumentResponse is the response from GET /v1/result/{task_id} for chunk tasks. +type ChunkDocumentResponse struct { + Chunks []ChunkedDocumentResultItem `json:"chunks"` + ProcessingTime float64 `json:"processing_time"` +} + +func (c *Client) chunkSourceAsyncEndpoint(chunkType ChunkType) (string, error) { + return url.JoinPath(c.ClientConfig.URL, "/v1/chunk", string(chunkType), "source", "async") +} + +// ChunkFile sends a document to the Docling async chunk endpoint and returns the chunk texts. +// It submits the task, polls for completion, then fetches the result. +// It uses TryAcquire on the semaphore to avoid blocking the reconciliation loop. +func (c *Client) ChunkFile( + ctx context.Context, + fileURL string, + chunkType ChunkType, + chunkingOptions any, +) ([]string, error) { + logger := log.FromContext(ctx) + + acquired := c.ClientConfig.sem.TryAcquire(1) + if !acquired { + return nil, errors.New(SemaphoreAcquireError) + } + defer c.ClientConfig.sem.Release(1) + + // Step 1: Submit async chunk request + endpoint, err := c.chunkSourceAsyncEndpoint(chunkType) + if err != nil { + return nil, fmt.Errorf("failed to get async chunk endpoint: %w", err) + } + + payload, err := json.Marshal(DoclingChunkRequestPayload{ + Sources: []DoclingSource{ + {URL: fileURL, Kind: "http"}, + }, + ChunkingOptions: chunkingOptions, + }) + if err != nil { + return nil, fmt.Errorf("failed to marshal chunk request payload: %w", err) + } + + logger.Info("submitting async chunk request to docling", "url", endpoint, "source", fileURL, "chunkType", string(chunkType)) + + responseBody, err := c.createDoclingRequest(ctx, "POST", endpoint, payload) + if err != nil { + return nil, fmt.Errorf("failed to submit async chunk request: %w", err) + } + + body, err := io.ReadAll(responseBody) + if err != nil { + responseBody.Close() + return nil, fmt.Errorf("failed to read async chunk response body: %w", err) + } + responseBody.Close() + + var taskStatus TaskStatusResponse + if err = json.Unmarshal(body, &taskStatus); err != nil { + return nil, fmt.Errorf("failed to decode async chunk response: %w", err) + } + + taskID := taskStatus.TaskID + logger.Info("async chunk task submitted", "taskID", taskID, "status", taskStatus.TaskStatus) + + // Step 2: Poll for completion + if err := c.pollUntilComplete(ctx, taskID); err != nil { + return nil, fmt.Errorf("chunk task failed: %w", err) + } + + // Step 3: Fetch result + return c.getChunkResult(ctx, taskID) +} + +func (c *Client) pollUntilComplete(ctx context.Context, taskID string) error { + logger := log.FromContext(ctx) + + for { + _, taskStatus, err := c.getTaskStatus(ctx, taskID) + if err != nil { + return fmt.Errorf("failed to poll task status: %w", err) + } + + switch taskStatus.TaskStatus { + case TaskStatusSuccess: + logger.Info("chunk task completed successfully", "taskID", taskID) + return nil + case TaskStatusFailure: + return fmt.Errorf("task failed: task id: %s", taskID) + case TaskStatusPending, TaskStatusStarted: + logger.Info("chunk task still in progress, polling again", "taskID", taskID, "status", taskStatus.TaskStatus) + default: + return fmt.Errorf("unexpected task status %q for task id: %s", taskStatus.TaskStatus, taskID) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(chunkPollInterval): + } + } +} + +func (c *Client) getChunkResult(ctx context.Context, taskID string) ([]string, error) { + logger := log.FromContext(ctx) + + taskResultURL, err := c.getTaskResultEndpoint(taskID) + if err != nil { + return nil, fmt.Errorf("failed to get task result endpoint: %w", err) + } + + logger.Info("fetching chunk result", "url", taskResultURL) + responseBody, err := c.createDoclingRequest(ctx, "GET", taskResultURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to get chunk result: %w", err) + } + + body, err := io.ReadAll(responseBody) + if err != nil { + responseBody.Close() + return nil, fmt.Errorf("failed to read chunk result body: %w", err) + } + responseBody.Close() + + var chunkResponse ChunkDocumentResponse + if err = json.Unmarshal(body, &chunkResponse); err != nil { + return nil, fmt.Errorf("failed to decode chunk result: %w", err) + } + + chunks := make([]string, 0, len(chunkResponse.Chunks)) + for _, chunk := range chunkResponse.Chunks { + chunks = append(chunks, chunk.Text) + } + + logger.Info("successfully chunked file via docling", "chunkCount", len(chunks)) + return chunks, nil +} diff --git a/pkg/docling/chunk_test.go b/pkg/docling/chunk_test.go new file mode 100644 index 00000000..a297bbd3 --- /dev/null +++ b/pkg/docling/chunk_test.go @@ -0,0 +1,437 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package docling + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "golang.org/x/sync/semaphore" +) + +func newTestClient(serverURL string, maxConcurrent int64) *Client { + return &Client{ + ClientConfig: &ClientConfig{ + URL: serverURL, + MaxConcurrentRequests: maxConcurrent, + sem: semaphore.NewWeighted(maxConcurrent), + }, + } +} + +func TestChunkFile_Hierarchical(t *testing.T) { + taskID := "test-task-123" + chunkResult := ChunkDocumentResponse{ + Chunks: []ChunkedDocumentResultItem{ + {Text: "This is the first chunk."}, + {Text: "This is the second chunk."}, + {Text: "This is the third chunk."}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v1/chunk/hierarchical/source/async": + var payload DoclingChunkRequestPayload + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Errorf("failed to decode request body: %v", err) + } + if len(payload.Sources) != 1 || payload.Sources[0].URL != "https://example.com/doc.pdf" { + t.Errorf("unexpected sources: %+v", payload.Sources) + } + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(chunkResult) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + mergeListItems := true + options := &HierarchicalChunkingOptions{MergeListItems: &mergeListItems} + + chunks, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, options) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 3 { + t.Fatalf("expected 3 chunks, got %d", len(chunks)) + } + if chunks[0] != "This is the first chunk." { + t.Errorf("unexpected first chunk: %s", chunks[0]) + } + if chunks[1] != "This is the second chunk." { + t.Errorf("unexpected second chunk: %s", chunks[1]) + } + if chunks[2] != "This is the third chunk." { + t.Errorf("unexpected third chunk: %s", chunks[2]) + } +} + +func TestChunkFile_Hybrid(t *testing.T) { + taskID := "test-hybrid-456" + chunkResult := ChunkDocumentResponse{ + Chunks: []ChunkedDocumentResultItem{ + {Text: "Hybrid chunk one."}, + {Text: "Hybrid chunk two."}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v1/chunk/hybrid/source/async": + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(chunkResult) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + mergePeers := false + options := &HybridChunkingOptions{ + Tokenizer: "sentence-transformers/all-MiniLM-L6-v2", + MaxTokens: 512, + MergePeers: &mergePeers, + } + + chunks, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHybrid, options) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 2 { + t.Fatalf("expected 2 chunks, got %d", len(chunks)) + } + if chunks[0] != "Hybrid chunk one." { + t.Errorf("unexpected first chunk: %s", chunks[0]) + } +} + +func TestChunkFile_SemaphoreAcquireError(t *testing.T) { + // Create a client with max concurrency of 1 and acquire the only slot + client := newTestClient("http://localhost:9999", 1) + client.ClientConfig.sem.TryAcquire(1) + + _, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err == nil { + t.Fatal("expected error, got nil") + } + if err.Error() != SemaphoreAcquireError { + t.Errorf("expected semaphore acquire error, got: %s", err.Error()) + } + + // Release the slot we acquired + client.ClientConfig.sem.Release(1) +} + +func TestChunkFile_HTTPError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + + _, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err == nil { + t.Fatal("expected error, got nil") + } +} + +func TestChunkFile_AsyncPolling(t *testing.T) { + taskID := "poll-task-789" + pollCount := 0 + chunkResult := ChunkDocumentResponse{ + Chunks: []ChunkedDocumentResultItem{ + {Text: "polled chunk"}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v1/chunk/hierarchical/source/async": + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusPending, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + pollCount++ + status := TaskStatusStarted + if pollCount >= 2 { + status = TaskStatusSuccess + } + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: status, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(chunkResult) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + + chunks, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 1 || chunks[0] != "polled chunk" { + t.Errorf("unexpected chunks: %v", chunks) + } + if pollCount < 2 { + t.Errorf("expected at least 2 poll calls, got %d", pollCount) + } +} + +func TestChunkFile_AsyncTaskFailure(t *testing.T) { + taskID := "fail-task-000" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost && r.URL.Path == "/v1/chunk/hierarchical/source/async": + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusPending, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{ + TaskID: taskID, + TaskStatus: TaskStatusFailure, + }) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + + _, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err == nil { + t.Fatal("expected error for failed task, got nil") + } +} + +func TestChunkSourceAsyncEndpoint(t *testing.T) { + client := newTestClient("http://localhost:5001", 1) + + hierarchicalEndpoint, err := client.chunkSourceAsyncEndpoint(ChunkTypeHierarchical) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if hierarchicalEndpoint != "http://localhost:5001/v1/chunk/hierarchical/source/async" { + t.Errorf("unexpected endpoint: %s", hierarchicalEndpoint) + } + + hybridEndpoint, err := client.chunkSourceAsyncEndpoint(ChunkTypeHybrid) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if hybridEndpoint != "http://localhost:5001/v1/chunk/hybrid/source/async" { + t.Errorf("unexpected endpoint: %s", hybridEndpoint) + } +} + +func TestChunkRequestPayload_Serialization(t *testing.T) { + mergeListItems := true + payload := DoclingChunkRequestPayload{ + Sources: []DoclingSource{ + {URL: "https://example.com/doc.pdf", Kind: "http"}, + }, + ChunkingOptions: &HierarchicalChunkingOptions{ + MergeListItems: &mergeListItems, + }, + } + + data, err := json.Marshal(payload) + if err != nil { + t.Fatalf("failed to marshal: %v", err) + } + + var result map[string]any + if err := json.Unmarshal(data, &result); err != nil { + t.Fatalf("failed to unmarshal: %v", err) + } + + // Verify top-level field names match Docling API + if _, ok := result["sources"]; !ok { + t.Error("expected 'sources' field in JSON") + } + if _, ok := result["chunking_options"]; !ok { + t.Error("expected 'chunking_options' field in JSON") + } + + // Verify chunking_options field names use snake_case + chunkingOpts, ok := result["chunking_options"].(map[string]any) + if !ok { + t.Fatal("chunking_options is not a map") + } + if _, ok := chunkingOpts["merge_list_items"]; !ok { + t.Error("expected 'merge_list_items' field in chunking_options") + } +} + +func TestChunkRequestPayload_HybridSerialization(t *testing.T) { + mergePeers := false + payload := DoclingChunkRequestPayload{ + Sources: []DoclingSource{ + {URL: "https://example.com/doc.pdf", Kind: "http"}, + }, + ChunkingOptions: &HybridChunkingOptions{ + Tokenizer: "sentence-transformers/all-MiniLM-L6-v2", + MaxTokens: 512, + MergePeers: &mergePeers, + }, + } + + data, err := json.Marshal(payload) + if err != nil { + t.Fatalf("failed to marshal: %v", err) + } + + var result map[string]any + if err := json.Unmarshal(data, &result); err != nil { + t.Fatalf("failed to unmarshal: %v", err) + } + + chunkingOpts, ok := result["chunking_options"].(map[string]any) + if !ok { + t.Fatal("chunking_options is not a map") + } + if _, ok := chunkingOpts["tokenizer"]; !ok { + t.Error("expected 'tokenizer' field in chunking_options") + } + if _, ok := chunkingOpts["max_tokens"]; !ok { + t.Error("expected 'max_tokens' field in chunking_options") + } + if _, ok := chunkingOpts["merge_peers"]; !ok { + t.Error("expected 'merge_peers' field in chunking_options") + } +} + +func TestChunkFile_EmptyResponse(t *testing.T) { + taskID := "empty-task" + chunkResult := ChunkDocumentResponse{ + Chunks: []ChunkedDocumentResultItem{}, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{TaskID: taskID, TaskStatus: TaskStatusSuccess}) + case r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{TaskID: taskID, TaskStatus: TaskStatusSuccess}) + case r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(chunkResult) + } + })) + defer server.Close() + + client := newTestClient(server.URL, 5) + + chunks, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 0 { + t.Errorf("expected 0 chunks, got %d", len(chunks)) + } +} + +func TestChunkFile_AuthHeader(t *testing.T) { + taskID := "auth-task" + var receivedAuthHeader string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost { + receivedAuthHeader = r.Header.Get("Authorization") + } + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{TaskID: taskID, TaskStatus: TaskStatusSuccess}) + case r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(TaskStatusResponse{TaskID: taskID, TaskStatus: TaskStatusSuccess}) + case r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(ChunkDocumentResponse{Chunks: []ChunkedDocumentResultItem{}}) + } + })) + defer server.Close() + + client := &Client{ + ClientConfig: &ClientConfig{ + URL: server.URL, + Key: "test-api-key", + MaxConcurrentRequests: 5, + sem: semaphore.NewWeighted(5), + }, + } + + _, err := client.ChunkFile(context.Background(), "https://example.com/doc.pdf", ChunkTypeHierarchical, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if receivedAuthHeader != "Bearer test-api-key" { + t.Errorf("expected 'Bearer test-api-key', got '%s'", receivedAuthHeader) + } +} diff --git a/pkg/unstructured/chunking.go b/pkg/unstructured/chunking.go index 3e2534c9..01f361aa 100644 --- a/pkg/unstructured/chunking.go +++ b/pkg/unstructured/chunking.go @@ -1,12 +1,15 @@ package unstructured import ( + "context" + + "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/docling" "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/langchain" "github.com/tmc/langchaingo/textsplitter" ) type Chunker interface { - Chunk(text string) ([]string, error) + Chunk(ctx context.Context, input string) ([]string, error) } type MarkdownSplitter struct { @@ -24,14 +27,32 @@ type TokenSplitter struct { Config *textsplitter.Options } -func (c *MarkdownSplitter) Chunk(text string) ([]string, error) { +type DoclingHierarchicalChunker struct { + DoclingClient *docling.Client + Options *docling.HierarchicalChunkingOptions +} + +type DoclingHybridChunker struct { + DoclingClient *docling.Client + Options *docling.HybridChunkingOptions +} + +func (c *MarkdownSplitter) Chunk(_ context.Context, text string) ([]string, error) { return c.LangchainClient.SplitTextViaMarkdownTextSplitter(text, c.Config) } -func (c *RecursiveCharacterSplitter) Chunk(text string) ([]string, error) { +func (c *RecursiveCharacterSplitter) Chunk(_ context.Context, text string) ([]string, error) { return c.LangchainClient.SplitTextViaRecursiveCharacterTextSplitter(text, c.Config) } -func (c *TokenSplitter) Chunk(text string) ([]string, error) { +func (c *TokenSplitter) Chunk(_ context.Context, text string) ([]string, error) { return c.LangchainClient.SplitTextViaTokenTextSplitter(text, c.Config) } + +func (c *DoclingHierarchicalChunker) Chunk(ctx context.Context, fileURL string) ([]string, error) { + return c.DoclingClient.ChunkFile(ctx, fileURL, docling.ChunkTypeHierarchical, c.Options) +} + +func (c *DoclingHybridChunker) Chunk(ctx context.Context, fileURL string) ([]string, error) { + return c.DoclingClient.ChunkFile(ctx, fileURL, docling.ChunkTypeHybrid, c.Options) +} diff --git a/pkg/unstructured/chunking_test.go b/pkg/unstructured/chunking_test.go new file mode 100644 index 00000000..24af232d --- /dev/null +++ b/pkg/unstructured/chunking_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package unstructured + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/redhat-data-and-ai/unstructured-data-controller/pkg/docling" +) + +func newTestDoclingClient(serverURL string) *docling.Client { + return docling.NewClientFromURL(&docling.ClientConfig{ + URL: serverURL, + MaxConcurrentRequests: 5, + }) +} + +// asyncChunkServer creates a test server that handles the async chunk flow: +// POST /v1/chunk/{type}/source/async -> TaskStatusResponse +// GET /v1/status/poll/{taskID} -> TaskStatusResponse (success) +// GET /v1/result/{taskID} -> ChunkDocumentResponse +func asyncChunkServer(t *testing.T, expectedPath string, taskID string, chunks []docling.ChunkedDocumentResultItem) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.Method == http.MethodPost && r.URL.Path == expectedPath: + _ = json.NewEncoder(w).Encode(docling.TaskStatusResponse{ + TaskID: taskID, + TaskStatus: docling.TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/status/poll/"+taskID: + _ = json.NewEncoder(w).Encode(docling.TaskStatusResponse{ + TaskID: taskID, + TaskStatus: docling.TaskStatusSuccess, + }) + case r.Method == http.MethodGet && r.URL.Path == "/v1/result/"+taskID: + _ = json.NewEncoder(w).Encode(docling.ChunkDocumentResponse{ + Chunks: chunks, + }) + default: + t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + })) +} + +func TestDoclingHierarchicalChunker_ImplementsChunker(t *testing.T) { + server := asyncChunkServer(t, "/v1/chunk/hierarchical/source/async", "hier-task-1", + []docling.ChunkedDocumentResultItem{ + {Text: "chunk one"}, + {Text: "chunk two"}, + }) + defer server.Close() + + mergeListItems := true + var chunker Chunker = &DoclingHierarchicalChunker{ + DoclingClient: newTestDoclingClient(server.URL), + Options: &docling.HierarchicalChunkingOptions{ + MergeListItems: &mergeListItems, + }, + } + + chunks, err := chunker.Chunk(context.Background(), "https://example.com/doc.pdf") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 2 { + t.Fatalf("expected 2 chunks, got %d", len(chunks)) + } + if chunks[0] != "chunk one" { + t.Errorf("unexpected first chunk: %s", chunks[0]) + } + if chunks[1] != "chunk two" { + t.Errorf("unexpected second chunk: %s", chunks[1]) + } +} + +func TestDoclingHybridChunker_ImplementsChunker(t *testing.T) { + server := asyncChunkServer(t, "/v1/chunk/hybrid/source/async", "hybrid-task-1", + []docling.ChunkedDocumentResultItem{ + {Text: "hybrid one"}, + {Text: "hybrid two"}, + {Text: "hybrid three"}, + }) + defer server.Close() + + mergePeers := false + var chunker Chunker = &DoclingHybridChunker{ + DoclingClient: newTestDoclingClient(server.URL), + Options: &docling.HybridChunkingOptions{ + Tokenizer: "sentence-transformers/all-MiniLM-L6-v2", + MaxTokens: 512, + MergePeers: &mergePeers, + }, + } + + chunks, err := chunker.Chunk(context.Background(), "https://example.com/doc.pdf") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(chunks) != 3 { + t.Fatalf("expected 3 chunks, got %d", len(chunks)) + } + if chunks[0] != "hybrid one" { + t.Errorf("unexpected first chunk: %s", chunks[0]) + } +} diff --git a/pkg/unstructured/chunks_file.go b/pkg/unstructured/chunks_file.go index 5a7f7ee2..fa759bc8 100644 --- a/pkg/unstructured/chunks_file.go +++ b/pkg/unstructured/chunks_file.go @@ -9,6 +9,7 @@ type ChunkingTool string const ( LangchainChunkingTool ChunkingTool = "langchain" + DoclingChunkingTool ChunkingTool = "docling" ) type Chunks struct { @@ -32,7 +33,11 @@ type ChunksFile struct { } func (c *ChunksFileMetadata) Equal(other *ChunksFileMetadata) bool { - if !c.ConvertedFileMetadata.Equal(other.ConvertedFileMetadata) { + if c.ConvertedFileMetadata == nil && other.ConvertedFileMetadata == nil { + // both nil, equal in this regard + } else if c.ConvertedFileMetadata == nil || other.ConvertedFileMetadata == nil { + return false + } else if !c.ConvertedFileMetadata.Equal(other.ConvertedFileMetadata) { return false } if c.ChunkingTool != other.ChunkingTool { diff --git a/pkg/unstructured/chunks_file_test.go b/pkg/unstructured/chunks_file_test.go new file mode 100644 index 00000000..d903fbd3 --- /dev/null +++ b/pkg/unstructured/chunks_file_test.go @@ -0,0 +1,147 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package unstructured + +import ( + "testing" + + "github.com/redhat-data-and-ai/unstructured-data-controller/api/v1alpha1" +) + +func TestChunksFileMetadata_Equal_BothNilConvertedFileMetadata(t *testing.T) { + a := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + + if !a.Equal(b) { + t.Error("expected Equal to return true when both ConvertedFileMetadata are nil and other fields match") + } +} + +func TestChunksFileMetadata_Equal_OneNilConvertedFileMetadata(t *testing.T) { + a := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: &ConvertedFileMetadata{ + RawFilePath: "test/file.pdf", + DocumentConverter: DocumentConverterDocling, + }, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + + if a.Equal(b) { + t.Error("expected Equal to return false when one ConvertedFileMetadata is nil and the other is not") + } + if b.Equal(a) { + t.Error("expected Equal to return false when one ConvertedFileMetadata is nil and the other is not (reversed)") + } +} + +func TestChunksFileMetadata_Equal_BothNonNilSame(t *testing.T) { + meta := &ConvertedFileMetadata{ + RawFilePath: "test/file.pdf", + DocumentConverter: DocumentConverterDocling, + } + a := &ChunksFileMetadata{ + ConvertedFileMetadata: meta, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyRecursiveCharacter, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: &ConvertedFileMetadata{ + RawFilePath: "test/file.pdf", + DocumentConverter: DocumentConverterDocling, + }, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyRecursiveCharacter, + }, + } + + if !a.Equal(b) { + t.Error("expected Equal to return true when both ConvertedFileMetadata are non-nil and match") + } +} + +func TestChunksFileMetadata_Equal_BothNonNilDifferent(t *testing.T) { + a := &ChunksFileMetadata{ + ConvertedFileMetadata: &ConvertedFileMetadata{ + RawFilePath: "test/file1.pdf", + DocumentConverter: DocumentConverterDocling, + }, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyRecursiveCharacter, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: &ConvertedFileMetadata{ + RawFilePath: "test/file2.pdf", + DocumentConverter: DocumentConverterDocling, + }, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyRecursiveCharacter, + }, + } + + if a.Equal(b) { + t.Error("expected Equal to return false when ConvertedFileMetadata differ") + } +} + +func TestChunksFileMetadata_Equal_DifferentChunkingTool(t *testing.T) { + a := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: DoclingChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + b := &ChunksFileMetadata{ + ConvertedFileMetadata: nil, + ChunkingTool: LangchainChunkingTool, + ChunksGeneratorConfig: v1alpha1.ChunksGeneratorConfig{ + Strategy: v1alpha1.ChunkingStrategyDoclingHierarchical, + }, + } + + if a.Equal(b) { + t.Error("expected Equal to return false when ChunkingTool differs") + } +}