Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/v1alpha1/unstructureddataproduct_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,17 @@ type S3Config struct {
Prefix string `json:"prefix,omitempty"`
}

// ArtifactConfig selects one processing-stage artifact to sync to the
// destination, and the destination sub-path it should be written under.
type ArtifactConfig struct {
Type string `json:"type,omitempty"` // artifact source kind; e.g., "stage"
Name string `json:"name,omitempty"` // which stage's output to sync; one of "documentProcessorConfig", "chunksGeneratorConfig", "vectorEmbeddingsGeneratorConfig"
Path string `json:"path,omitempty"` // destination sub-path for that stage's files; e.g., "processed-documents", "chunks", "vector-embeddings"
}

// DestinationConfig defines where to write processed data (e.g. Snowflake internal stage or S3).
// Exactly one of the type-specific configs is expected to be populated,
// selected by Type.
type DestinationConfig struct {
// Type selects the destination backend (Snowflake internal stage or S3).
Type UnstructuredDataType `json:"type,omitempty"`
// Artifacts lists which processing-stage outputs to sync and their destination paths.
Artifacts []ArtifactConfig `json:"artifacts,omitempty"`
// SnowflakeInternalStageConfig is used when Type selects Snowflake.
SnowflakeInternalStageConfig SnowflakeInternalStageConfig `json:"snowflakeInternalStageConfig,omitempty"`
// S3DestinationConfig is used when Type selects S3.
S3DestinationConfig S3Config `json:"s3DestinationConfig,omitempty"`
}
Expand Down
22 changes: 21 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@ spec:
description: DestinationConfig defines where to write processed data
(e.g. Snowflake internal stage or S3).
properties:
artifacts:
items:
description: ArtifactConfig defines which processing stage artifacts
to sync and their destination paths
properties:
name:
type: string
path:
type: string
type:
type: string
type: object
type: array
s3DestinationConfig:
description: S3Config configures an S3 bucket and optional prefix.
properties:
Expand Down
21 changes: 21 additions & 0 deletions internal/controller/chunksgenerator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (r *ChunksGeneratorReconciler) Reconcile(ctx context.Context, req ctrl.Requ
skippedFiles := []string{}

convertedFilePaths := unstructured.FilterConvertedFilePaths(filePaths)
hadFilesToProcess := len(convertedFilePaths) > 0

for _, convertedFilePath := range convertedFilePaths {
logger.Info("processing converted file", "file", convertedFilePath)
Expand Down Expand Up @@ -173,6 +174,26 @@ func (r *ChunksGeneratorReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return r.handleError(ctx, chunksGeneratorCR, err)
}

// trigger UnstructuredDataProduct if:
// Had files to process and successfully processed (no errors/skips)
if hadFilesToProcess && len(chunkingErrors) == 0 && len(skippedFiles) == 0 {
unstructuredDataProductKey := client.ObjectKey{
Namespace: chunksGeneratorCR.Namespace,
Name: chunksGeneratorCR.Spec.DataProduct,
}
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
unstructuredDataProductCR := &operatorv1alpha1.UnstructuredDataProduct{}
if err := r.Get(ctx, unstructuredDataProductKey, unstructuredDataProductCR); err != nil {
return err
}
return controllerutils.AddForceReconcileLabel(ctx, r.Client, unstructuredDataProductCR)
}); err != nil {
logger.Error(err, "failed to add force reconcile label to UnstructuredDataProduct CR")
return r.handleError(ctx, chunksGeneratorCR, err)
}
logger.Info("triggered UnstructuredDataProduct to sync chunks files")
}

successMessage := fmt.Sprintf("successfully reconciled chunks generator: %s", chunksGeneratorCR.Name)
if err := controllerutils.StatusUpdateWithRetry(ctx, r.Client, chunksGeneratorKey, func() client.Object { return &operatorv1alpha1.ChunksGenerator{} }, func(obj client.Object) {
obj.(*operatorv1alpha1.ChunksGenerator).UpdateStatus(successMessage, nil)
Expand Down
23 changes: 23 additions & 0 deletions internal/controller/documentprocessor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ func (r *DocumentProcessorReconciler) Reconcile(ctx context.Context, req ctrl.Re
}
r.fileStore = fs

// track if jobs existed at the start of reconcile to know if files were created
hadJobsAtStart := len(documentProcessorCR.Status.Jobs) > 0

// first, let's figure out the jobs that are currently running
jobProcessingErrors := []error{}
for _, job := range documentProcessorCR.Status.Jobs {
Expand Down Expand Up @@ -186,6 +189,26 @@ func (r *DocumentProcessorReconciler) Reconcile(ctx context.Context, req ctrl.Re

logger.Info("all jobs are completed, no need to requeue")

// trigger UnstructuredDataProduct if:
// Jobs existed at start and are now complete (files were created this reconcile)
if hadJobsAtStart {
unstructuredDataProductKey := client.ObjectKey{
Namespace: documentProcessorCR.Namespace,
Name: documentProcessorCR.Spec.DataProduct,
}
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
unstructuredDataProductCR := &operatorv1alpha1.UnstructuredDataProduct{}
if err := r.Get(ctx, unstructuredDataProductKey, unstructuredDataProductCR); err != nil {
return err
}
return controllerutils.AddForceReconcileLabel(ctx, r.Client, unstructuredDataProductCR)
}); err != nil {
logger.Error(err, "failed to add force reconcile label to UnstructuredDataProduct CR")
return r.handleError(ctx, documentProcessorCR, err)
}
logger.Info("triggered UnstructuredDataProduct to sync converted files")
}

successMessage := fmt.Sprintf("successfully reconciled document processor: %s", latestDocumentProcessorCR.Name)
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
res := &operatorv1alpha1.DocumentProcessor{}
Expand Down
151 changes: 118 additions & 33 deletions internal/controller/unstructureddataproduct_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ import (

const (
UnstructuredDataProductControllerName = "UnstructuredDataProduct"
// Artifact names recognized in DestinationConfig.Artifacts[].Name; each
// identifies one processing stage whose output files can be synced.
ArtifactNameDocumentProcessor = "documentProcessorConfig"
ArtifactNameChunksGenerator = "chunksGeneratorConfig"
ArtifactNameVectorEmbeddings = "vectorEmbeddingsGeneratorConfig"
)

var (
Expand Down Expand Up @@ -227,20 +230,32 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c
}
logger.Info("successfully stored files to filestore", "files", storedFiles)

// add force reconcile label to the DocumentProcessor CR
documentProcessorKey := client.ObjectKey{
Namespace: unstructuredDataProductCR.Namespace,
Name: dataProductName,
// list files to check if processing is needed
filePaths, err := r.fileStore.ListFilesInPath(ctx, dataProductName)
if err != nil {
logger.Error(err, "failed to list files in path for checking processing needs")
return r.handleError(ctx, unstructuredDataProductCR, err)
}
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
latestDocumentProcessorCR := &operatorv1alpha1.DocumentProcessor{}
if err := r.Get(ctx, documentProcessorKey, latestDocumentProcessorCR); err != nil {
return err

// conditionally trigger DocumentProcessor only if there are unprocessed raw files
if needsDocumentProcessing(filePaths) {
documentProcessorKey := client.ObjectKey{
Namespace: unstructuredDataProductCR.Namespace,
Name: dataProductName,
}
return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestDocumentProcessorCR)
}); err != nil {
logger.Error(err, "failed to add force reconcile label to DocumentProcessor CR")
return r.handleError(ctx, unstructuredDataProductCR, err)
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
latestDocumentProcessorCR := &operatorv1alpha1.DocumentProcessor{}
if err := r.Get(ctx, documentProcessorKey, latestDocumentProcessorCR); err != nil {
return err
}
return controllerutils.AddForceReconcileLabel(ctx, r.Client, latestDocumentProcessorCR)
}); err != nil {
logger.Error(err, "failed to add force reconcile label to DocumentProcessor CR")
return r.handleError(ctx, unstructuredDataProductCR, err)
}
logger.Info("triggered DocumentProcessor for unprocessed files")
} else {
logger.Info("no unprocessed files, skipping DocumentProcessor trigger")
}

// Setup destination
Expand All @@ -254,7 +269,7 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c
}
case operatorv1alpha1.TypeS3:
var err error
destination, err = setupS3Destination(unstructuredDataProductCR, dataProductName)
destination, err = setupS3Destination(ctx, unstructuredDataProductCR, dataProductName)
if err != nil {
if IsAWSClientNotInitializedError(err) {
logger.Info("ControllerConfig has not initialized destination S3 client yet, will try again in a bit ...")
Expand All @@ -268,22 +283,23 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c
return r.handleError(ctx, unstructuredDataProductCR, err)
}

// list all files in the filestore for the data product
filePaths, err := r.fileStore.ListFilesInPath(ctx, dataProductName)
if err != nil {
logger.Error(err, "failed to list files in path")
return r.handleError(ctx, unstructuredDataProductCR, err)
}
// extract the vector embeddings files that are to be ingested to destination
filterEmbeddingsFiles := unstructured.FilterVectorEmbeddingsFilePaths(filePaths)
logger.Info("files to ingest to destination", "files", filterEmbeddingsFiles)
// filePaths already loaded earlier for needsDocumentProcessing check

// ingest the embeddings files to destination
if err := destination.SyncFilesToDestination(ctx, r.fileStore, filterEmbeddingsFiles); err != nil {
logger.Error(err, "failed to ingest embeddings files to destination")
return r.handleError(ctx, unstructuredDataProductCR, err)
// collect files to sync based on syncToDestination flags
filesToSync := buildDestinationSyncFilePaths(ctx, unstructuredDataProductCR, filePaths)

logger.Info("files to ingest to destination", "files", filesToSync, "total", len(filesToSync))

// ingest the files to destination
if len(filesToSync) > 0 {
if err := destination.SyncFilesToDestination(ctx, r.fileStore, filesToSync); err != nil {
logger.Error(err, "failed to ingest files to destination")
return r.handleError(ctx, unstructuredDataProductCR, err)
}
logger.Info("successfully ingested files to destination")
} else {
logger.Info("no files to sync to destination based on syncToDestination flags")
}
logger.Info("successfully ingested embeddings files to destination")

// all done, let's update the status to ready
successMessage := fmt.Sprintf("successfully reconciled unstructured data product: %s", dataProductName)
Expand All @@ -302,6 +318,25 @@ func (r *UnstructuredDataProductReconciler) Reconcile(ctx context.Context, req c
}, nil
}

// buildArtifactPathMap builds a map from a processing-stage file suffix to the
// destination path configured for that artifact. An entry with an unknown
// artifact name is logged and skipped rather than failing reconciliation, so a
// misconfigured artifact list degrades gracefully.
func buildArtifactPathMap(ctx context.Context, artifacts []operatorv1alpha1.ArtifactConfig) map[string]string {
	logger := log.FromContext(ctx)
	// Pre-size: at most one entry per configured artifact.
	artifactPathMap := make(map[string]string, len(artifacts))
	for _, artifact := range artifacts {
		switch artifact.Name {
		case ArtifactNameDocumentProcessor:
			artifactPathMap[unstructured.ConvertedFileSuffix] = artifact.Path
		case ArtifactNameChunksGenerator:
			artifactPathMap[unstructured.ChunksFileSuffix] = artifact.Path
		case ArtifactNameVectorEmbeddings:
			artifactPathMap[unstructured.VectorEmbeddingsFileSuffix] = artifact.Path
		default:
			logger.Info("unknown artifact name, skipping", "name", artifact.Name)
		}
	}
	return artifactPathMap
}

// setupSnowflakeDestination returns a Snowflake internal stage destination for the given CR.
func (r *UnstructuredDataProductReconciler) setupSnowflakeDestination(ctx context.Context, unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct) (unstructured.Destination, error) {
logger := log.FromContext(ctx)
Expand All @@ -311,30 +346,59 @@ func (r *UnstructuredDataProductReconciler) setupSnowflakeDestination(ctx contex
return nil, err
}
r.sf = sf

return &unstructured.SnowflakeInternalStage{
Client: sf,
Role: sf.GetRole(),
Stage: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Stage,
Database: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Database,
Schema: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Schema,
Client: sf,
Role: sf.GetRole(),
Stage: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Stage,
Database: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Database,
Schema: unstructuredDataProductCR.Spec.DestinationConfig.SnowflakeInternalStageConfig.Schema,
ArtifactPathMap: buildArtifactPathMap(ctx, unstructuredDataProductCR.Spec.DestinationConfig.Artifacts),
}, nil
}

// setupS3Destination returns an S3 destination for the given CR
func setupS3Destination(unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct, dataProductName string) (unstructured.Destination, error) {
// setupS3Destination builds an S3 destination for the given CR, wiring in the
// shared destination S3 client and the per-artifact path mapping.
func setupS3Destination(ctx context.Context, unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct, dataProductName string) (unstructured.Destination, error) {
	s3Client, err := awsclienthandler.GetDestinationS3Client()
	if err != nil {
		return nil, err
	}

	cfg := unstructuredDataProductCR.Spec.DestinationConfig.S3DestinationConfig
	dest := &unstructured.S3Destination{
		S3Client:        s3Client,
		Bucket:          cfg.Bucket,
		Prefix:          cfg.Prefix,
		DataProductName: dataProductName,
		ArtifactPathMap: buildArtifactPathMap(ctx, unstructuredDataProductCR.Spec.DestinationConfig.Artifacts),
	}
	return dest, nil
}

// buildDestinationSyncFilePaths returns the subset of filePaths that should be
// synced to the destination, as selected by the CR's artifacts configuration.
// Artifacts with unrecognized names are logged and contribute no files.
func buildDestinationSyncFilePaths(ctx context.Context, unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct, filePaths []string) []string {
	logger := log.FromContext(ctx)

	var selected []string
	for _, artifactCfg := range unstructuredDataProductCR.Spec.DestinationConfig.Artifacts {
		switch artifactCfg.Name {
		case ArtifactNameDocumentProcessor:
			selected = append(selected, unstructured.FilterConvertedFilePaths(filePaths)...)
		case ArtifactNameChunksGenerator:
			selected = append(selected, unstructured.FilterChunksFilePaths(filePaths)...)
		case ArtifactNameVectorEmbeddings:
			selected = append(selected, unstructured.FilterVectorEmbeddingsFilePaths(filePaths)...)
		default:
			logger.Info("unknown artifact name, skipping", "name", artifactCfg.Name)
		}
	}
	return selected
}

// SetupWithManager sets up the controller with the Manager.
func (r *UnstructuredDataProductReconciler) SetupWithManager(mgr ctrl.Manager) error {
labelPredicate := controllerutils.ForceReconcilePredicate()
Expand All @@ -344,6 +408,27 @@ func (r *UnstructuredDataProductReconciler) SetupWithManager(mgr ctrl.Manager) e
Complete(r)
}

// needsDocumentProcessing reports whether at least one raw file has no
// corresponding converted file, i.e. the DocumentProcessor still has work to do.
func needsDocumentProcessing(filePaths []string) bool {
	// Record the raw-file path behind each converted file we already have.
	alreadyConverted := make(map[string]bool)
	for _, converted := range unstructured.FilterConvertedFilePaths(filePaths) {
		alreadyConverted[unstructured.GetRawFilePathFromConvertedFilePath(converted)] = true
	}

	// Any raw file missing from that set still needs processing.
	for _, raw := range unstructured.FilterRawFilePaths(filePaths) {
		if !alreadyConverted[raw] {
			return true
		}
	}
	return false
}

func (r *UnstructuredDataProductReconciler) handleError(ctx context.Context, unstructuredDataProductCR *operatorv1alpha1.UnstructuredDataProduct, err error) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Error(err, "encountered error")
Expand Down
4 changes: 3 additions & 1 deletion pkg/snowflake/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ func (c *Client) Put(ctx context.Context, roleName, filePath, databaseName,

func (c *Client) ListFilesFromStage(ctx context.Context, roleName, databaseName,
schemaName, stageName string, resources any) error {
query := fmt.Sprintf("SELECT $1 AS data FROM @%s.%s.%s;", databaseName, schemaName, stageName)
query := fmt.Sprintf(
"SELECT METADATA$FILENAME AS filename, $1 AS data FROM @%s.%s.%s;",
databaseName, schemaName, stageName)
rows, err := c.query(ctx, query, roleName)
if err != nil {
return err
Expand Down
Loading
Loading