From ea88c0350b482dacc60de04d18d4387dd9fdace4 Mon Sep 17 00:00:00 2001 From: Puneet Punamiya Date: Mon, 30 Mar 2026 16:22:50 +0530 Subject: [PATCH] feat: add artifacts-based configuration for selective destination syncing Enables selective syncing of file types with custom artifact paths under a stages directory structure Users can configure which file types (converted/chunks/embeddings) to sync and customize their destination paths Example usage in UnstructuredDataProduct spec: destinationConfig: artifacts: - type: stage name: documentProcessorConfig path: processed-documents - type: stage name: chunksGeneratorConfig path: chunks - type: stage name: vectorEmbeddingsGeneratorConfig path: vector-embeddings Signed-off-by: Puneet Punamiya --- api/v1alpha1/unstructureddataproduct_types.go | 8 + api/v1alpha1/zz_generated.deepcopy.go | 22 +- ...e.redhat.com_unstructureddataproducts.yaml | 13 + .../controller/chunksgenerator_controller.go | 21 ++ .../documentprocessor_controller.go | 23 ++ .../unstructureddataproduct_controller.go | 151 +++++++-- pkg/snowflake/stage.go | 4 +- pkg/unstructured/destination.go | 290 ++++++++++++------ test/utils/utils_function.go | 7 + 9 files changed, 414 insertions(+), 125 deletions(-) diff --git a/api/v1alpha1/unstructureddataproduct_types.go b/api/v1alpha1/unstructureddataproduct_types.go index 34feab16..61c79bc4 100644 --- a/api/v1alpha1/unstructureddataproduct_types.go +++ b/api/v1alpha1/unstructureddataproduct_types.go @@ -115,9 +115,17 @@ type S3Config struct { Prefix string `json:"prefix,omitempty"` } +// ArtifactConfig defines which processing stage artifacts to sync and their destination paths +type ArtifactConfig struct { + Type string `json:"type,omitempty"` // e.g., "stage" + Name string `json:"name,omitempty"` // e.g., "documentProcessorConfig", "chunksGeneratorConfig", "vectorEmbeddingsGeneratorConfig" + Path string `json:"path,omitempty"` // e.g., "processed-documents", "chunks", "vector-embeddings" +} + // DestinationConfig defines where to write processed data (e.g. Snowflake internal stage or S3). type DestinationConfig struct { Type UnstructuredDataType `json:"type,omitempty"` + Artifacts []ArtifactConfig `json:"artifacts,omitempty"` SnowflakeInternalStageConfig SnowflakeInternalStageConfig `json:"snowflakeInternalStageConfig,omitempty"` S3DestinationConfig S3Config `json:"s3DestinationConfig,omitempty"` } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index a42858d7..2aa2494d 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,21 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ArtifactConfig) DeepCopyInto(out *ArtifactConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArtifactConfig. +func (in *ArtifactConfig) DeepCopy() *ArtifactConfig { + if in == nil { + return nil + } + out := new(ArtifactConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ChunksGenerator) DeepCopyInto(out *ChunksGenerator) { *out = *in @@ -245,6 +260,11 @@ func (in *ControllerConfigStatus) DeepCopy() *ControllerConfigStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DestinationConfig) DeepCopyInto(out *DestinationConfig) { *out = *in + if in.Artifacts != nil { + in, out := &in.Artifacts, &out.Artifacts + *out = make([]ArtifactConfig, len(*in)) + copy(*out, *in) + } out.SnowflakeInternalStageConfig = in.SnowflakeInternalStageConfig out.S3DestinationConfig = in.S3DestinationConfig } @@ -740,7 +760,7 @@ func (in *UnstructuredDataProductList) DeepCopyObject() runtime.Object { func (in *UnstructuredDataProductSpec) DeepCopyInto(out *UnstructuredDataProductSpec) { *out = *in out.SourceConfig = in.SourceConfig - out.DestinationConfig = in.DestinationConfig + in.DestinationConfig.DeepCopyInto(&out.DestinationConfig) in.DocumentProcessorConfig.DeepCopyInto(&out.DocumentProcessorConfig) in.ChunksGeneratorConfig.DeepCopyInto(&out.ChunksGeneratorConfig) out.VectorEmbeddingsGeneratorConfig = in.VectorEmbeddingsGeneratorConfig diff --git a/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml b/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml index 2712afbc..55231857 100644 --- a/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml +++ b/config/crd/bases/operator.dataverse.redhat.com_unstructureddataproducts.yaml @@ -106,6 +106,19 @@ spec: description: DestinationConfig defines where to write processed data (e.g. Snowflake internal stage or S3). properties: + artifacts: + items: + description: ArtifactConfig defines which processing stage artifacts + to sync and their destination paths + properties: + name: + type: string + path: + type: string + type: + type: string + type: object + type: array s3DestinationConfig: description: S3Config configures an S3 bucket and optional prefix. properties: diff --git a/internal/controller/chunksgenerator_controller.go b/internal/controller/chunksgenerator_controller.go index 7e64942f..2b678304 100644 --- a/internal/controller/chunksgenerator_controller.go +++ b/internal/controller/chunksgenerator_controller.go @@ -130,6 +130,7 @@ func (r *ChunksGeneratorReconciler) Reconcile(ctx context.Context, req ctrl.Requ skippedFiles := []string{} convertedFilePaths := unstructured.FilterConvertedFilePaths(filePaths) + hadFilesToProcess := len(convertedFilePaths) > 0 for _, convertedFilePath := range convertedFilePaths { logger.Info("processing converted file", "file", convertedFilePath) @@ -173,6 +174,26 @@ func (r *ChunksGeneratorReconciler) Reconcile(ctx context.Context, req ctrl.Requ return r.handleError(ctx, chunksGeneratorCR, err) } + // trigger UnstructuredDataProduct if: + // Had files to process and successfully processed (no errors/skips) + if hadFilesToProcess && len(chunkingErrors) == 0 && len(skippedFiles) == 0 { + unstructuredDataProductKey := client.ObjectKey{ + Namespace: chunksGeneratorCR.Namespace, + Name: chunksGeneratorCR.Spec.DataProduct, + } + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + unstructuredDataProductCR := &operatorv1alpha1.UnstructuredDataProduct{} + if err := r.Get(ctx, unstructuredDataProductKey, unstructuredDataProductCR); err != nil { + return err + } + return controllerutils.AddForceReconcileLabel(ctx, r.Client, unstructuredDataProductCR) + }); err != nil { + logger.Error(err, "failed to add force reconcile label to UnstructuredDataProduct CR") + return r.handleError(ctx, chunksGeneratorCR, err) + } + logger.Info("triggered UnstructuredDataProduct to sync chunks files") + } + successMessage := fmt.Sprintf("successfully reconciled chunks generator: %s", chunksGeneratorCR.Name) if err := controllerutils.StatusUpdateWithRetry(ctx, r.Client, chunksGeneratorKey, func() client.Object { return &operatorv1alpha1.ChunksGenerator{} }, func(obj client.Object) { obj.(*operatorv1alpha1.ChunksGenerator).UpdateStatus(successMessage, nil) diff --git a/internal/controller/documentprocessor_controller.go b/internal/controller/documentprocessor_controller.go index 2d6487a0..a3de039f 100644 --- a/internal/controller/documentprocessor_controller.go +++ b/internal/controller/documentprocessor_controller.go @@ -126,6 +126,9 @@ func (r *DocumentProcessorReconciler) Reconcile(ctx context.Context, req ctrl.Re } r.fileStore = fs + // track if jobs existed at the start of reconcile to know if files were created + hadJobsAtStart := len(documentProcessorCR.Status.Jobs) > 0 + // first, let's figure out the jobs that are currently running jobProcessingErrors := []error{} for _, job := range documentProcessorCR.Status.Jobs { @@ -186,6 +189,26 @@ func (r *DocumentProcessorReconciler) Reconcile(ctx context.Context, req ctrl.Re logger.Info("all jobs are completed, no need to requeue") + // trigger UnstructuredDataProduct if: + // Jobs existed at start and are now complete (files were created this reconcile) + if hadJobsAtStart { + unstructuredDataProductKey := client.ObjectKey{ + Namespace: documentProcessorCR.Namespace, + Name: documentProcessorCR.Spec.DataProduct, + } + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + unstructuredDataProductCR := &operatorv1alpha1.UnstructuredDataProduct{} + if err := r.Get(ctx, unstructuredDataProductKey, unstructuredDataProductCR); err != nil { + return err + } + return controllerutils.AddForceReconcileLabel(ctx, r.Client, unstructuredDataProductCR) + }); err != nil { + logger.Error(err, "failed to add force reconcile label to UnstructuredDataProduct CR") + return r.handleError(ctx, documentProcessorCR, err) + } + logger.Info("triggered UnstructuredDataProduct to sync converted files") + } + successMessage := fmt.Sprintf("successfully reconciled document processor: %s", latestDocumentProcessorCR.Name) if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { res := &operatorv1alpha1.DocumentProcessor{} diff --git a/internal/controller/unstructureddataproduct_controller.go b/internal/controller/unstructureddataproduct_controller.go index 6b512621..4539895e 100644 --- a/internal/controller/unstructureddataproduct_controller.go +++ b/internal/controller/unstructureddataproduct_controller.go @@ -40,6 +40,9 @@ import ( const ( UnstructuredDataProductControllerName = "UnstructuredDataProduct" + ArtifactNameDocumentProcessor = "documentProcessorConfig" + ArtifactNameChunksGenerator = "chunksGeneratorConfig" + ArtifactNameVectorEmbeddings = "vectorEmbeddingsGeneratorConfig" ) var ( @@ -227,20 +230,32 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c } logger.Info("successfully stored files to filestore", "files", storedFiles) - // add force reconcile label to the DocumentProcessor CR - documentProcessorKey := client.ObjectKey{ - Namespace: unstructuredDataProductCR.Namespace, - Name: dataProductName, + // list files to check if processing is needed + filePaths, err := r.fileStore.ListFilesInPath(ctx, dataProductName) + if err != nil { + logger.Error(err, "failed to list files in path for checking processing needs") + return r.handleError(ctx, unstructuredDataProductCR, err) } - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - latestDocumentProcessorCR := &operatorv1alpha1.DocumentProcessor{} - if err := r.Get(ctx, documentProcessorKey, latestDocumentProcessorCR); err != nil { - return err + + // conditionally trigger DocumentProcessor only if there are unprocessed raw files + if needsDocumentProcessing(filePaths) { + documentProcessorKey := client.ObjectKey{ + Namespace: unstructuredDataProductCR.Namespace, + Name: dataProductName, } - return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestDocumentProcessorCR) - }); err != nil { - logger.Error(err, "failed to add force reconcile label to DocumentProcessor CR") - return r.handleError(ctx, unstructuredDataProductCR, err) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latestDocumentProcessorCR := &operatorv1alpha1.DocumentProcessor{} + if err := r.Get(ctx, documentProcessorKey, latestDocumentProcessorCR); err != nil { + return err + } + return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestDocumentProcessorCR) + }); err != nil { + logger.Error(err, "failed to add force reconcile label to DocumentProcessor CR") + return r.handleError(ctx, unstructuredDataProductCR, err) + } + logger.Info("triggered DocumentProcessor for unprocessed files") + } else { + logger.Info("no unprocessed files, skipping DocumentProcessor trigger") } // Setup destination @@ -254,7 +269,7 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c } case operatorv1alpha1.TypeS3: var err error - destination, err = setupS3Destination(unstructuredDataProductCR, dataProductName) + destination, err = setupS3Destination(ctx, unstructuredDataProductCR, dataProductName) if err != nil { if IsAWSClientNotInitializedError(err) { logger.Info("ControllerConfig has not initialized destination S3 client yet, will try again in a bit ...") @@ -268,22 +283,23 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c return r.handleError(ctx, unstructuredDataProductCR, err) } - // list all files in the filestore for the data product - filePaths, err := r.fileStore.ListFilesInPath(ctx, dataProductName) - if err != nil { - logger.Error(err, "failed to list files in path") - return r.handleError(ctx, unstructuredDataProductCR, err) - } - // extract the vector embeddings files that are to be ingested to destination - filterEmbeddingsFiles := unstructured.FilterVectorEmbeddingsFilePaths(filePaths) - logger.Info("files to ingest to destination", "files", filterEmbeddingsFiles) + // filePaths already loaded earlier for needsDocumentProcessing check - // ingest the embeddings files to destination - if err := destination.SyncFilesToDestination(ctx, r.fileStore, filterEmbeddingsFiles); err != nil { - logger.Error(err, "failed to ingest embeddings files to destination") - return r.handleError(ctx, unstructuredDataProductCR, err) + // collect files to sync based on syncToDestination flags + filesToSync := buildDestinationSyncFilePaths(ctx, unstructuredDataProductCR, filePaths) + + logger.Info("files to ingest to destination", "files", filesToSync, "total", len(filesToSync)) + + // ingest the files to destination + if len(filesToSync) > 0 { + if err := destination.SyncFilesToDestination(ctx, r.fileStore, filesToSync); err != nil { + logger.Error(err, "failed to ingest files to destination") + return r.handleError(ctx, unstructuredDataProductCR, err) + } + logger.Info("successfully ingested files to destination") + } else { + logger.Info("no files to sync to destination based on syncToDestination flags") } - logger.Info("successfully ingested embeddings files to destination") // all done, let's update the status to ready successMessage := fmt.Sprintf("successfully reconciled unstructured data product: %s", dataProductName) @@ -302,6 +318,25 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c }, nil } +// buildArtifactPathMap builds a map from file suffix to artifact path based on the artifacts configuration +func buildArtifactPathMap(ctx context.Context, artifacts []operatorv1alpha1.ArtifactConfig) map[string]string { + logger := log.FromContext(ctx) + artifactPathMap := make(map[string]string) + for _, artifact := range artifacts { + switch artifact.Name { + case ArtifactNameDocumentProcessor: + artifactPathMap[unstructured.ConvertedFileSuffix] = artifact.Path + case ArtifactNameChunksGenerator: + artifactPathMap[unstructured.ChunksFileSuffix] = artifact.Path + case ArtifactNameVectorEmbeddings: + artifactPathMap[unstructured.VectorEmbeddingsFileSuffix] = artifact.Path + default: + logger.Info("unknown artifact name, skipping", "name", artifact.Name) + } + } + return artifactPathMap +} + // setupSnowflakeDestination returns a Snowflake internal stage destination for the given CR. func (r *UnstructuredDataProductReconciler) setupSnowflakeDestination(ctx context.Context, unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct) (unstructured.Destination, error) { logger := log.FromContext(ctx) @@ -311,30 +346,59 @@ func (r *UnstructuredDataProductReconciler) setupSnowflakeDestination(ctx contex return nil, err } r.sf = sf + return &unstructured.SnowflakeInternalStage{ - Client: sf, - Role: sf.GetRole(), - Stage: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Stage, - Database: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Database, - Schema: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Schema, + Client: sf, + Role: sf.GetRole(), + Stage: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Stage, + Database: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Database, + Schema: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Schema, + ArtifactPathMap: buildArtifactPathMap(ctx, unstructuredDataProductCR.Spec.DestinationConfig.Artifacts), }, nil } // setupS3Destination returns an S3 destination for the given CR -func setupS3Destination(unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct, dataProductName string) (unstructured.Destination, error) { +func setupS3Destination(ctx context.Context, unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct, dataProductName string) (unstructured.Destination, error) { destCfg := unstructuredDataProductCR.Spec.DestinationConfig.S3DestinationConfig destinationS3Client, err := awsclienthandler.GetDestinationS3Client() if err != nil { return nil, err } + return &unstructured.S3Destination{ S3Client: destinationS3Client, Bucket: destCfg.Bucket, Prefix: destCfg.Prefix, DataProductName: dataProductName, + ArtifactPathMap: buildArtifactPathMap(ctx, unstructuredDataProductCR.Spec.DestinationConfig.Artifacts), }, nil } +func buildDestinationSyncFilePaths(ctx context.Context, unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct, filePaths []string) []string { + var filesToSync []string + logger := log.FromContext(ctx) + + // Iterate through artifacts configuration to determine which files to sync + for _, artifact := range unstructuredDataProductCR.Spec.DestinationConfig.Artifacts { + var filteredFiles []string + + switch artifact.Name { + case ArtifactNameDocumentProcessor: + filteredFiles = unstructured.FilterConvertedFilePaths(filePaths) + case ArtifactNameChunksGenerator: + filteredFiles = unstructured.FilterChunksFilePaths(filePaths) + case ArtifactNameVectorEmbeddings: + filteredFiles = unstructured.FilterVectorEmbeddingsFilePaths(filePaths) + default: + logger.Info("unknown artifact name, skipping", "name", artifact.Name) + } + + filesToSync = append(filesToSync, filteredFiles...) + } + + return filesToSync +} + // SetupWithManager sets up the controller with the Manager. func (r *UnstructuredDataProductReconciler) SetupWithManager(mgr ctrl.Manager) error { labelPredicate := controllerutils.ForceReconcilePredicate() @@ -344,6 +408,27 @@ func (r *UnstructuredDataProductReconciler) SetupWithManager(mgr ctrl.Manager) e Complete(r) } +// needsDocumentProcessing checks if there are raw files without corresponding converted files +func needsDocumentProcessing(filePaths []string) bool { + rawFiles := unstructured.FilterRawFilePaths(filePaths) + convertedFiles := unstructured.FilterConvertedFilePaths(filePaths) + + // Build map of converted raw file paths + convertedMap := make(map[string]bool) + for _, convertedPath := range convertedFiles { + rawPath := unstructured.GetRawFilePathFromConvertedFilePath(convertedPath) + convertedMap[rawPath] = true + } + + // Check if any raw file doesn't have a converted file + for _, rawFile := range rawFiles { + if !convertedMap[rawFile] { + return true + } + } + return false +} + func (r *UnstructuredDataProductReconciler) handleError(ctx context.Context, unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct, err error) (ctrl.Result, error) { logger := log.FromContext(ctx) logger.Error(err, "encountered error") diff --git a/pkg/snowflake/stage.go b/pkg/snowflake/stage.go index a1926649..056f0f7a 100644 --- a/pkg/snowflake/stage.go +++ b/pkg/snowflake/stage.go @@ -29,7 +29,9 @@ func (c *Client) Put(ctx context.Context, roleName, filePath, databaseName, func (c *Client) ListFilesFromStage(ctx context.Context, roleName, databaseName, schemaName, stageName string, resources any) error { - query := fmt.Sprintf("SELECT $1 AS data FROM @%s.%s.%s;", databaseName, schemaName, stageName) + query := fmt.Sprintf( + "SELECT METADATA$FILENAME AS filename, $1 AS data FROM @%s.%s.%s;", + databaseName, schemaName, stageName) rows, err := c.query(ctx, query, roleName) if err != nil { return err diff --git a/pkg/unstructured/destination.go b/pkg/unstructured/destination.go index a555016f..7bfc70f5 100644 --- a/pkg/unstructured/destination.go +++ b/pkg/unstructured/destination.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "path/filepath" + "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -17,138 +18,121 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) +const ( + // Default artifact paths for destination sync + DefaultProcessedDocumentsPath = "processed-documents" + DefaultChunksPath = "chunks" + DefaultVectorEmbeddingsPath = "vector-embeddings" +) + type Destination interface { // SyncFilesToDestination will sync the data to the destination SyncFilesToDestination(ctx context.Context, fs *filestore.FileStore, filePaths []string) error } type SnowflakeInternalStage struct { - Client *snowflake.Client - Role string - Database string - Schema string - Stage string + Client *snowflake.Client + Role string + Database string + Schema string + Stage string + ArtifactPathMap map[string]string // map[fileSuffix]artifactPath for stages structure } func (d *SnowflakeInternalStage) SyncFilesToDestination(ctx context.Context, - fs *filestore.FileStore, embeddingsFilePaths []string) error { + fs *filestore.FileStore, filePaths []string) error { logger := log.FromContext(ctx) - logger.Info("ingesting data to snowflake internal stage", "filePaths", embeddingsFilePaths) + logger.Info("ingesting data to snowflake internal stage", "filePaths", filePaths) - // get file name and uid for all files in the stage + // get all files currently in the stage type row struct { - Data string `db:"data"` + Filename string `db:"filename"` + Data string `db:"data"` } rows := []row{} err := d.Client.ListFilesFromStage(ctx, d.Role, d.Database, d.Schema, d.Stage, &rows) if err != nil { - // The SELECT $1 query fails when non-JSON files exist in the stage + // The query fails when non-JSON files exist in the stage // (e.g. manually uploaded test data). Log the error and proceed with // an empty stage — all local files will be re-uploaded (PUT uses // OVERWRITE=TRUE) and no extra-file cleanup will happen this cycle. logger.Error(err, "failed to list files from stage, will re-upload all files") } - // map of raw file path to embeddings file - embeddingsFilesInStage := make(map[string]EmbeddingsFile) - embeddingsFilesList := []string{} + // map of stage path to file data (JSON string) in stage + filesInStage := make(map[string]string) for _, row := range rows { - embeddingsFile := EmbeddingsFile{} - err := json.Unmarshal([]byte(row.Data), &embeddingsFile) - if err != nil { - logger.Info("skipping non-embeddings file in stage", "error", err) - continue - } - if embeddingsFile.ConvertedDocument != nil && - embeddingsFile.ConvertedDocument.Metadata != nil && - embeddingsFile.ConvertedDocument.Metadata.RawFilePath != "" { - embeddingsFilesInStage[embeddingsFile.ConvertedDocument.Metadata.RawFilePath] = embeddingsFile - embeddingsFilesList = append(embeddingsFilesList, embeddingsFile.ConvertedDocument.Metadata.RawFilePath) + // Normalize Snowflake filename to match our stagePath format + // Snowflake returns: path/to/file.json/file.json.gz + // We need: path/to/file.json + if row.Filename != "" { + normalizedPath := normalizeSnowflakeFilename(row.Filename) + filesInStage[normalizedPath] = row.Data } } - logger.Info("files currently in the snowflake internal stage", "files", embeddingsFilesList) - logger.Info("list of files in the local file store to be stored", "files", embeddingsFilePaths) + logger.Info("files currently in the snowflake internal stage", "count", len(filesInStage)) + logger.Info("list of files in the local file store to be stored", "files", filePaths) + errorList := []error{} - for _, embeddingsFilePathInFilestore := range embeddingsFilePaths { + for _, filePathInFilestore := range filePaths { // read the file from filestore - embeddingsFileBytesInFilestore, err := fs.Retrieve(ctx, embeddingsFilePathInFilestore) + fileBytesInFilestore, err := fs.Retrieve(ctx, filePathInFilestore) if err != nil { - logger.Error(err, "failed to retrieve file from filestore", "file", embeddingsFilePathInFilestore) - errorList = append(errorList, err) - } - - embeddingsFileInFilestore := EmbeddingsFile{} - err = json.Unmarshal(embeddingsFileBytesInFilestore, &embeddingsFileInFilestore) - if err != nil { - logger.Error(err, "failed to unmarshal file", "file", embeddingsFilePathInFilestore) + logger.Error(err, "failed to retrieve file from filestore", "file", filePathInFilestore) errorList = append(errorList, err) continue } - // check if embeddings file already exists in the stage - if _, exists := embeddingsFilesInStage[embeddingsFileInFilestore.ConvertedDocument.Metadata.RawFilePath]; exists { - logger.Info("file already exists in the stage", - "file", embeddingsFileInFilestore.ConvertedDocument.Metadata.RawFilePath) + // Calculate stage path for this file + stagePath := d.stagePathForFile(filePathInFilestore) - embeddingsFileInStage := embeddingsFilesInStage[embeddingsFileInFilestore.ConvertedDocument.Metadata.RawFilePath] + // Check if file already exists in the stage + if existingFileData, exists := filesInStage[stagePath]; exists { + logger.Info("file already exists in the stage", "file", stagePath) - // delete the file from the map as we will use this map to delete extra files from the stage - delete(embeddingsFilesInStage, embeddingsFileInFilestore.ConvertedDocument.Metadata.RawFilePath) + // Mark as processed (will be removed from deletion list) + delete(filesInStage, stagePath) - if embeddingsFileInStage.EmbeddingDocument.Metadata.Equal(embeddingsFileInFilestore.EmbeddingDocument.Metadata) { - logger.Info("file is already in the stage and the configuration is the same, skipping ...", - "file", embeddingsFileInFilestore.ConvertedDocument.Metadata.RawFilePath) - // nothing to do, file is already in the stage + // Compare metadata based on file type to see if upload is needed + if filesAreEqual(filePathInFilestore, existingFileData, fileBytesInFilestore) { + logger.Info("file metadata unchanged, skipping upload", "file", stagePath) continue } } - // upload the file to the stage - - // this is needed to pass the file stream to the snowflake client without creating a local temporary file - streamCtx := gosnowflake.WithFileStream(ctx, bytes.NewReader(embeddingsFileBytesInFilestore)) - + // Upload the file to the stage + streamCtx := gosnowflake.WithFileStream(ctx, bytes.NewReader(fileBytesInFilestore)) fileRows := []snowflake.UploadedFileStatus{} if err := d.Client.Put(streamCtx, d.Role, - // this file path does not matter as we are using the stream context to pass the file stream to the snowflake client - embeddingsFilePathInFilestore, - // database name is the database name + filePathInFilestore, d.Database, - // schema name is the data product name d.Schema, - // stage name is the internal stage name d.Stage, - // subpath is the file name - embeddingsFilePathInFilestore, + stagePath, &fileRows); err != nil { - logger.Error(err, "failed to upload file to snowflake internal stage", "file", embeddingsFilePathInFilestore) + logger.Error(err, "failed to upload file to snowflake internal stage", "file", stagePath) errorList = append(errorList, err) continue } - logger.Info("successfully uploaded file to snowflake internal stage", - "file", embeddingsFileInFilestore.ConvertedDocument.Metadata.RawFilePath) + logger.Info("successfully uploaded file to snowflake internal stage", "file", stagePath) if len(fileRows) == 0 { - logger.Error(fmt.Errorf("no file rows returned while uploading file to snowflake internal stage: %s", - embeddingsFileInFilestore.ConvertedDocument.Metadata.RawFilePath), - "file", embeddingsFileInFilestore.ConvertedDocument.Metadata.RawFilePath) + logger.Error(fmt.Errorf("no file rows returned while uploading file to snowflake internal stage: %s", stagePath), + "file", stagePath) errorList = append(errorList, - fmt.Errorf("no file rows returned while uploading file to snowflake internal stage: %s", - embeddingsFileInFilestore.ConvertedDocument.Metadata.RawFilePath)) + fmt.Errorf("no file rows returned while uploading file to snowflake internal stage: %s", stagePath)) continue } } - // delete extra files which are the files in the stage that are not present in the filestore - // at this point, whatever is left in the filesInStage map are the extra files that need to be deleted - - extraFiles := make([]string, 0, len(embeddingsFilesInStage)) - for extraFilePath := range embeddingsFilesInStage { - logger.Info("found extra file in the stage, marking for deletion", "file", extraFilePath) - extraFiles = append(extraFiles, extraFilePath) + // Delete extra files that are in the stage but not in filestore + extraFiles := make([]string, 0, len(filesInStage)) + for stagePath := range filesInStage { + logger.Info("found extra file in the stage, marking for deletion", "file", stagePath) + extraFiles = append(extraFiles, stagePath) } if err := d.Client.DeleteFilesFromStage(ctx, d.Role, d.Database, d.Schema, d.Stage, extraFiles); err != nil { @@ -163,12 +147,130 @@ func (d *SnowflakeInternalStage) SyncFilesToDestination(ctx context.Context, return nil } -// S3Destination syncs chunk files to an S3 bucket (e.g. LocalStack or AWS). +// normalizeSnowflakeFilename converts Snowflake's internal filename format to our expected format. +// Snowflake stores files as: path/to/file.json/file.json.gz +// We need: path/to/file.json +func normalizeSnowflakeFilename(snowflakeFilename string) string { + // Find last slash + lastSlashIdx := strings.LastIndex(snowflakeFilename, "/") + if lastSlashIdx == -1 { + // No slash, return as-is + return snowflakeFilename + } + + // Get the part before the last slash + pathWithoutLast := snowflakeFilename[:lastSlashIdx] + + // Get the last part (should be basename.gz) + lastPart := snowflakeFilename[lastSlashIdx+1:] + + // Remove .gz suffix if present + expectedBasename := strings.TrimSuffix(lastPart, ".gz") + + // Verify that pathWithoutLast ends with expectedBasename + if strings.HasSuffix(pathWithoutLast, expectedBasename) { + return pathWithoutLast + } + + // If it doesn't match expected pattern, return original + return snowflakeFilename +} + +// getArtifactPathForFile determines the artifact path for a file based on its suffix +// Returns the configured path from artifactPathMap or a default path +func getArtifactPathForFile(filePathInFilestore string, artifactPathMap map[string]string) string { + var suffix, defaultPath string + + if strings.HasSuffix(filePathInFilestore, ConvertedFileSuffix) { + suffix = ConvertedFileSuffix + defaultPath = DefaultProcessedDocumentsPath + } else if strings.HasSuffix(filePathInFilestore, ChunksFileSuffix) { + suffix = ChunksFileSuffix + defaultPath = DefaultChunksPath + } else if strings.HasSuffix(filePathInFilestore, VectorEmbeddingsFileSuffix) { + suffix = VectorEmbeddingsFileSuffix + defaultPath = DefaultVectorEmbeddingsPath + } else { + return "" + } + + if path, ok := artifactPathMap[suffix]; ok && path != "" { + return path + } + return defaultPath +} + +// filesAreEqual compares files based on their type and returns true if metadata is unchanged +func filesAreEqual(filePath string, existingData string, newData []byte) bool { + if strings.HasSuffix(filePath, VectorEmbeddingsFileSuffix) { + var existing, incoming EmbeddingsFile + if json.Unmarshal([]byte(existingData), &existing) == nil && + json.Unmarshal(newData, &incoming) == nil && + existing.EmbeddingDocument != nil && incoming.EmbeddingDocument != nil && + existing.EmbeddingDocument.Metadata != nil && incoming.EmbeddingDocument.Metadata != nil { + return existing.EmbeddingDocument.Metadata.Equal(incoming.EmbeddingDocument.Metadata) + } + } else if strings.HasSuffix(filePath, ChunksFileSuffix) { + var existing, incoming ChunksFile + if json.Unmarshal([]byte(existingData), &existing) == nil && + json.Unmarshal(newData, &incoming) == nil && + existing.ChunksDocument != nil && incoming.ChunksDocument != nil && + existing.ChunksDocument.Metadata != nil && incoming.ChunksDocument.Metadata != nil { + return existing.ChunksDocument.Metadata.Equal(incoming.ChunksDocument.Metadata) + } + } else if strings.HasSuffix(filePath, ConvertedFileSuffix) { + var existing, incoming ConvertedFile + if json.Unmarshal([]byte(existingData), &existing) == nil && + json.Unmarshal(newData, &incoming) == nil && + existing.ConvertedDocument != nil && incoming.ConvertedDocument != nil && + existing.ConvertedDocument.Metadata != nil && incoming.ConvertedDocument.Metadata != nil { + return existing.ConvertedDocument.Metadata.Equal(incoming.ConvertedDocument.Metadata) + } + } + return false +} + +// stagePathForFile builds the stage path with stages subdirectory for Snowflake +// Format: {dataProductPrefix}/stages/{artifact-path}/{filename} +// Example: testproduct/stages/chunks/test.pdf-chunks.json +func (d *SnowflakeInternalStage) stagePathForFile(filePathInFilestore string) string { + baseName := filepath.Base(filePathInFilestore) + + // Extract prefix from filePathInFilestore (everything before the last /) + // filePathInFilestore example: "testunstructureddataproduct/12.pdf-converted.json" + // We want to preserve: "testunstructureddataproduct" + var dataProductPrefix string + if idx := strings.LastIndex(filePathInFilestore, "/"); idx != -1 { + dataProductPrefix = filePathInFilestore[:idx] + } + + // Determine artifact path based on file suffix + artifactPath := getArtifactPathForFile(filePathInFilestore, d.ArtifactPathMap) + + // Build path: {dataProductPrefix}/stages/{artifact-path}/{filename} + var stagePath string + if dataProductPrefix != "" && artifactPath != "" { + stagePath = filepath.Join(dataProductPrefix, "stages", artifactPath, baseName) + } else if artifactPath != "" { + stagePath = filepath.Join("stages", artifactPath, baseName) + } else { + stagePath = baseName + } + + if filepath.Separator != '/' { + stagePath = filepath.ToSlash(stagePath) + } + return stagePath +} + +// S3Destination syncs processed JSON artifacts to an S3 bucket (converted, chunks, +// and vector embeddings — whichever paths the controller passes, based on artifacts configuration). type S3Destination struct { S3Client *s3.Client Bucket string Prefix string - DataProductName string // used as default prefix when Prefix is empty (CR name) + DataProductName string // used as default prefix when Prefix is empty (CR name) + ArtifactPathMap map[string]string // map[fileSuffix]artifactPath for stages structure } func (d *S3Destination) getPrefix() string { @@ -178,15 +280,23 @@ func (d *S3Destination) getPrefix() string { return d.DataProductName } -// s3KeyForChunksFile returns the S3 object key for a chunks file path -// When Prefix is not set, uses DataProductName/file_name as default. -func (d *S3Destination) s3KeyForChunksFile(chunksFilePath string) string { - baseName := filepath.Base(chunksFilePath) +// s3KeyForFile maps a filestore object key to an S3 object key. +// Format: {prefix}/stages/{artifact-path}/{filename} +func (d *S3Destination) s3KeyForFile(filePathInFilestore string) string { prefix := d.getPrefix() - key := baseName + baseName := filepath.Base(filePathInFilestore) + + // Determine artifact path based on file suffix + artifactPath := getArtifactPathForFile(filePathInFilestore, d.ArtifactPathMap) + + // Build path: prefix/stages/{artifact-path}/{filename} + var key string if prefix != "" { - key = filepath.Join(prefix, baseName) + key = filepath.Join(prefix, "stages", artifactPath, baseName) + } else { + key = filepath.Join("stages", artifactPath, baseName) } + if filepath.Separator != '/' { key = filepath.ToSlash(key) } @@ -194,10 +304,10 @@ func (d *S3Destination) s3KeyForChunksFile(chunksFilePath string) string { } func (d *S3Destination) SyncFilesToDestination(ctx context.Context, fs *filestore.FileStore, - chunksFilePaths []string) error { + filePaths []string) error { logger := log.FromContext(ctx) logger.Info("syncing data to S3 destination", - "bucket", d.Bucket, "prefix", d.getPrefix(), "filePaths", chunksFilePaths) + "bucket", d.Bucket, "prefix", d.getPrefix(), "filePaths", filePaths) s3Client := d.S3Client if s3Client == nil { @@ -226,14 +336,14 @@ func (d *S3Destination) SyncFilesToDestination(ctx context.Context, fs *filestor } } - for _, chunksFilePath := range chunksFilePaths { - data, err := fs.Retrieve(ctx, chunksFilePath) + for _, filePath := range filePaths { + data, err := fs.Retrieve(ctx, filePath) if err != nil { - logger.Error(err, "failed to retrieve file from filestore", "file", chunksFilePath) - return fmt.Errorf("retrieve %s: %w", chunksFilePath, err) + logger.Error(err, "failed to retrieve file from filestore", "file", filePath) + return fmt.Errorf("retrieve %s: %w", filePath, err) } - key := d.s3KeyForChunksFile(chunksFilePath) + key := d.s3KeyForFile(filePath) delete(keysInDestination, key) // Check if file exists and compare ETag (MD5) to skip unchanged files @@ -246,7 +356,7 @@ func (d *S3Destination) SyncFilesToDestination(ctx context.Context, fs *filestor localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(data)) if localMD5 == *headResp.ETag { logger.Info("file unchanged, skipping upload", - "file", chunksFilePath, "key", key) + "file", filePath, "key", key) continue } } diff --git a/test/utils/utils_function.go b/test/utils/utils_function.go index 510f92ac..c1acd33a 100644 --- a/test/utils/utils_function.go +++ b/test/utils/utils_function.go @@ -87,6 +87,13 @@ func GetUnstructuredDataProductResource(name, namespace string) v1alpha1.Unstruc }, DestinationConfig: v1alpha1.DestinationConfig{ Type: v1alpha1.DestinationTypeInternalStage, + Artifacts: []v1alpha1.ArtifactConfig{ + { + Type: "stage", + Name: "chunksGeneratorConfig", + Path: "chunks", + }, + }, SnowflakeInternalStageConfig: v1alpha1.SnowflakeInternalStageConfig{ Database: "unstructured_db", Schema: "unstructured",