feat(table): add support for merge-on-read delete #721
base: main
```diff
@@ -1136,7 +1136,7 @@ func (w *ManifestWriter) Close() error {
     return w.writer.Close()
 }
 
-func (w *ManifestWriter) ToManifestFile(location string, length int64) (ManifestFile, error) {
+func (w *ManifestWriter) ToManifestFile(location string, length int64, content ManifestContent) (ManifestFile, error) {
     if err := w.Close(); err != nil {
         return nil, err
     }
@@ -1155,7 +1155,7 @@ func (w *ManifestWriter) ToManifestFile(location string, length int64) (Manifest
         Path:            location,
         Len:             length,
         SpecID:          int32(w.spec.id),
-        Content:         ManifestContentData,
+        Content:         content,
         SeqNumber:       -1,
         MinSeqNumber:    w.minSeqNum,
         AddedSnapshotID: w.snapshotID,
@@ -1500,7 +1500,7 @@ func WriteManifest(
         return nil, err
     }
 
-    return w.ToManifestFile(filename, cnt.Count)
+    return w.ToManifestFile(filename, cnt.Count, ManifestContentData)
 }
 
 // ManifestEntryStatus defines constants for the entry status of
@@ -2314,5 +2314,5 @@ type ManifestEntry interface {
 
 var PositionalDeleteSchema = NewSchema(0,
     NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: "file_path", Required: true},
-    NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos", Required: true},
+    NestedField{ID: 2147483545, Type: PrimitiveTypes.Int64, Name: "pos", Required: true},
 )
```

Contributor (author), on the `pos` field: The spec says that `pos` is a `long`, so the field type here should be Int64 rather than Int32.
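With `ToManifestFile` now taking the manifest content explicitly, a caller finishing a delete manifest passes that value instead of relying on the previously hard-coded `ManifestContentData`. A minimal usage sketch (the helper name is hypothetical, and `ManifestContentDeletes` is assumed to be the delete counterpart of `ManifestContentData`):

```go
package sketch

import (
    "github.com/apache/iceberg-go" // package iceberg
)

// finishDeleteManifest is a hypothetical helper showing the new call shape:
// the caller now chooses the manifest content rather than it being hard-coded.
func finishDeleteManifest(w *iceberg.ManifestWriter, path string, length int64) (iceberg.ManifestFile, error) {
    // ManifestContentDeletes (assumed constant) marks the manifest as tracking
    // delete files rather than data files.
    return w.ToManifestFile(path, length, iceberg.ManifestContentDeletes)
}
```

Callers that only write data manifests keep passing `ManifestContentData`, as the updated `WriteManifest` above does.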
```diff
@@ -19,6 +19,7 @@ package table
 
 import (
     "context"
+    "errors"
     "io"
     "iter"
     "strconv"
@@ -42,6 +43,8 @@ const (
     ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
 )
 
+var PositionalDeleteArrowSchema, _ = SchemaToArrowSchema(iceberg.PositionalDeleteSchema, nil, true, false)
+
 type (
     positionDeletes   = []*arrow.Chunked
     perFilePosDeletes = map[string]positionDeletes
```
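For reference, the Arrow schema this produces looks roughly like the following (a sketch: field-id metadata and the exact `SchemaToArrowSchema` output are omitted, and the arrow module path is assumed):

```go
package sketch

import "github.com/apache/arrow-go/v18/arrow" // module path assumed

// posDeleteArrowSchemaSketch approximates PositionalDeleteArrowSchema: both
// columns are required (non-nullable), matching iceberg.PositionalDeleteSchema.
var posDeleteArrowSchemaSketch = arrow.NewSchema([]arrow.Field{
    {Name: "file_path", Type: arrow.BinaryTypes.String, Nullable: false},
    {Name: "pos", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
}, nil)
```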
```diff
@@ -189,6 +192,54 @@ func processPositionalDeletes(ctx context.Context, deletes set[int64]) recProcessFn
     }
 }
 
+// enrichRecordsWithPosDeleteFields enriches a RecordBatch with the columns declared in the PositionalDeleteArrowSchema
+// so that during the pipeline filtering stages that sheds filtered out records, we still have a way to
+// preserve the original position of those records.
+func enrichRecordsWithPosDeleteFields(ctx context.Context, filePath iceberg.DataFile) recProcessFn {
+    nextIdx, mem := int64(0), compute.GetAllocator(ctx)
+
+    return func(inData arrow.RecordBatch) (outData arrow.RecordBatch, err error) {
+        defer inData.Release()
+
+        schema := inData.Schema()
+        fieldIdx := schema.NumFields()
+        filePathField, ok := PositionalDeleteArrowSchema.FieldsByName("file_path")
+        if !ok {
+            return nil, errors.New("position delete schema should have required field 'file_path'")
+        }
+        posField, ok := PositionalDeleteArrowSchema.FieldsByName("pos")
+        if !ok {
+            return nil, errors.New("position delete schema should have required field 'pos'")
+        }
+        schema, err = schema.AddField(fieldIdx, filePathField[0])
+        if err != nil {
+            return nil, err
+        }
+        schema, err = schema.AddField(fieldIdx+1, posField[0])
+        if err != nil {
+            return nil, err
+        }
+
+        filePathBuilder := array.NewStringBuilder(mem)
+        defer filePathBuilder.Release()
+        posBuilder := array.NewInt64Builder(mem)
+        defer posBuilder.Release()
+
+        startPos := nextIdx
+        nextIdx += inData.NumRows()
+
+        for i := startPos; i < nextIdx; i++ {
+            filePathBuilder.Append(filePath.FilePath())
+            posBuilder.Append(i)
+        }
+
+        columns := append(inData.Columns(), filePathBuilder.NewArray(), posBuilder.NewArray())
+        outData = array.NewRecordBatch(schema, columns, inData.NumRows())
+
+        return outData, err
+    }
+}
+
 func filterRecords(ctx context.Context, recordFilter expr.Expression) recProcessFn {
     return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
         defer rec.Release()
```

Contributor: Please add focused unit tests for `enrichRecordsWithPosDeleteFields`.
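Not the requested tests, but for intuition, here is a standalone sketch of the enrichment idea: append a constant `file_path` column and a `pos` column holding the running row index. It uses only the Arrow APIs; the module path/version and the literal file path are assumptions, and this is an illustration rather than code from the PR:

```go
package main

import (
    "fmt"

    "github.com/apache/arrow-go/v18/arrow"
    "github.com/apache/arrow-go/v18/arrow/array"
    "github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
    mem := memory.NewGoAllocator()

    // A one-column batch standing in for a batch scanned from a data file.
    idBldr := array.NewInt64Builder(mem)
    defer idBldr.Release()
    idBldr.AppendValues([]int64{10, 20, 30}, nil)
    ids := idBldr.NewArray()
    defer ids.Release()

    // Enrichment: file_path is constant per data file, pos is the running row
    // index within that file, mirroring what enrichRecordsWithPosDeleteFields
    // does for each incoming batch.
    pathBldr := array.NewStringBuilder(mem)
    defer pathBldr.Release()
    posBldr := array.NewInt64Builder(mem)
    defer posBldr.Release()
    for i := 0; i < ids.Len(); i++ {
        pathBldr.Append("s3://bucket/data/00000-0.parquet") // hypothetical path
        posBldr.Append(int64(i))
    }
    paths, positions := pathBldr.NewArray(), posBldr.NewArray()
    defer paths.Release()
    defer positions.Release()

    schema := arrow.NewSchema([]arrow.Field{
        {Name: "id", Type: arrow.PrimitiveTypes.Int64},
        {Name: "file_path", Type: arrow.BinaryTypes.String},
        {Name: "pos", Type: arrow.PrimitiveTypes.Int64},
    }, nil)
    rec := array.NewRecordBatch(schema, []arrow.Array{ids, paths, positions}, int64(ids.Len()))
    defer rec.Release()
    fmt.Println(rec)
}
```

Running it prints a three-column batch (`id`, `file_path`, `pos`), which is the shape later pipeline stages rely on to recover each surviving row's original position.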
```diff
@@ -454,6 +505,78 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat
     return err
 }
 
+func (as *arrowScan) producePosDeletesFromTask(ctx context.Context, task internal.Enumerated[FileScanTask], positionalDeletes positionDeletes, out chan<- enumeratedRecord) (err error) {
+    defer func() {
+        if err != nil {
+            out <- enumeratedRecord{Task: task, Err: err}
+        }
+    }()
+
+    var (
+        rdr        internal.FileReader
+        iceSchema  *iceberg.Schema
+        colIndices []int
+        filterFunc recProcessFn
+        dropFile   bool
+    )
+
+    iceSchema, colIndices, rdr, err = as.prepareToRead(ctx, task.Value.File)
+    if err != nil {
+        return err
+    }
+    defer iceinternal.CheckedClose(rdr, &err)
+
+    fields := append(iceSchema.Fields(), iceberg.PositionalDeleteSchema.Fields()...)
+    enrichedIcebergSchema := iceberg.NewSchema(iceSchema.ID+1, fields...)
+
+    pipeline := make([]recProcessFn, 0, 2)
+    pipeline = append(pipeline, enrichRecordsWithPosDeleteFields(ctx, task.Value.File))
+    if len(positionalDeletes) > 0 {
+        deletes := set[int64]{}
+        for _, chunk := range positionalDeletes {
+            for _, a := range chunk.Chunks() {
+                for _, v := range a.(*array.Int64).Int64Values() {
+                    deletes[v] = struct{}{}
+                }
+            }
+        }
+
+        pipeline = append(pipeline, processPositionalDeletes(ctx, deletes))
+    }
+
+    filterFunc, dropFile, err = as.getRecordFilter(ctx, iceSchema)
+    if err != nil {
+        return err
+    }
+
+    // Nothing to delete in a dropped file
+    if dropFile {
+        var emptySchema *arrow.Schema
+        emptySchema, err = SchemaToArrowSchema(as.projectedSchema, nil, false, as.useLargeTypes)
+        if err != nil {
+            return err
+        }
+        out <- enumeratedRecord{Task: task, Record: internal.Enumerated[arrow.RecordBatch]{
+            Value: array.NewRecordBatch(emptySchema, nil, 0), Index: 0, Last: true,
+        }}
+
+        return err
+    }
+
+    if filterFunc != nil {
+        pipeline = append(pipeline, filterFunc)
+    }
+    pipeline = append(pipeline, func(r arrow.RecordBatch) (arrow.RecordBatch, error) {
+        defer r.Release()
+
+        return ToRequestedSchema(ctx, iceberg.PositionalDeleteSchema, enrichedIcebergSchema, r, false, true, as.useLargeTypes)
+    })
+
+    err = as.processRecords(ctx, task, iceSchema, rdr, colIndices, pipeline, out)
+
+    return err
+}
+
 func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.RecordBatch, error] {
     isBeforeAny := func(batch enumeratedRecord) bool {
         return batch.Task.Index < 0
```
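The function above assembles its work as a list of `recProcessFn` stages that `processRecords` applies in order. A minimal sketch of that chaining idea (the type alias mirrors the PR's unexported `recProcessFn`, assumed to be `func(arrow.RecordBatch) (arrow.RecordBatch, error)`):

```go
package sketch

import "github.com/apache/arrow-go/v18/arrow" // module path assumed

// recProcessFn mirrors the PR's unexported per-batch processing stage type.
type recProcessFn = func(arrow.RecordBatch) (arrow.RecordBatch, error)

// runPipeline shows how such stages compose: each stage takes ownership of its
// input batch (the PR's stages Release it) and yields a new batch or an error.
func runPipeline(rec arrow.RecordBatch, stages []recProcessFn) (arrow.RecordBatch, error) {
    var err error
    for _, stage := range stages {
        if rec, err = stage(rec); err != nil {
            return nil, err
        }
    }

    return rec, nil
}
```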
```diff
@@ -1258,7 +1258,7 @@ func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilde
         }
     }
 
-    df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, rdr.SourceFileSize(), partitionValues)
+    df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), partitionValues)
     if !yield(df, nil) {
         return
     }
```
```diff
@@ -1335,6 +1335,9 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
         panic(fmt.Errorf("%w: cannot write files without a current spec", err))
     }
 
+    cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
+        return newDataFileWriter(rootLocation, fs, meta, props, opts...)
+    })
     nextCount, stopCount := iter.Pull(args.counter)
     if currentSpec.IsUnpartitioned() {
         tasks := func(yield func(WriteTask) bool) {
```
```diff
@@ -1358,13 +1361,81 @@
             }
         }
 
-        return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
-    } else {
-        partitionWriter := newPartitionedFanoutWriter(*currentSpec, meta.CurrentSchema(), args.itr)
-        rollingDataWriters := NewWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize)
-        partitionWriter.writers = &rollingDataWriters
-        workers := config.EnvConfig.MaxWorkers
-
-        return partitionWriter.Write(ctx, workers)
-    }
-}
+        return cw.writeFiles(ctx, rootLocation, args.fs, meta, meta.props, nil, tasks)
+    }
+
+    partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw, meta.CurrentSchema(), args.itr)
+    partitionWriter.writers = newWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize)
+    workers := config.EnvConfig.MaxWorkers
+
+    return partitionWriter.Write(ctx, workers)
+}
+
+func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string, meta *MetadataBuilder, partitionDataPerFile map[string]map[int]any, args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
+    if args.counter == nil {
+        args.counter = internal.Counter(0)
+    }
+
+    defer func() {
+        if r := recover(); r != nil {
+            var err error
+            switch e := r.(type) {
+            case string:
+                err = fmt.Errorf("error encountered during position delete file writing %s", e)
+            case error:
+                err = fmt.Errorf("error encountered during position delete file writing: %w", e)
+            }
+            ret = func(yield func(iceberg.DataFile, error) bool) {
+                yield(nil, err)
+            }
+        }
+    }()
+
+    if args.writeUUID == nil {
+        u := uuid.Must(uuid.NewRandom())
+        args.writeUUID = &u
+    }
+
+    targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
+        WriteTargetFileSizeBytesDefault))
+
+    currentSpec, err := meta.CurrentSpec()
+    if err != nil || currentSpec == nil {
+        panic(fmt.Errorf("%w: cannot write files without a current spec", err))
+    }
+
+    cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
+        return newPositionDeleteWriter(rootLocation, fs, meta, props, opts...)
+    }, withSchemaSanitization(false))
+    nextCount, stopCount := iter.Pull(args.counter)
+    if currentSpec.IsUnpartitioned() {
+        tasks := func(yield func(WriteTask) bool) {
+            defer stopCount()
+
+            fileCount := 0
+            for batch := range binPackRecords(args.itr, defaultBinPackLookback, targetFileSize) {
+                cnt, _ := nextCount()
+                fileCount++
+                t := WriteTask{
+                    Uuid:        *args.writeUUID,
+                    ID:          cnt,
+                    PartitionID: iceberg.UnpartitionedSpec.ID(),
+                    FileCount:   fileCount,
+                    Schema:      iceberg.PositionalDeleteSchema,
+                    Batches:     batch,
+                }
+                if !yield(t) {
+                    return
+                }
+            }
+        }
+
+        return cw.writeFiles(ctx, rootLocation, args.fs, meta, meta.props, nil, tasks)
+    }
+
+    partitionWriter := newPositionDeletePartitionedFanoutWriter(*currentSpec, cw, partitionDataPerFile, args.itr)
+    partitionWriter.writers = newWriterFactory(rootLocation, args, meta, iceberg.PositionalDeleteSchema, targetFileSize)
+    workers := config.EnvConfig.MaxWorkers
+
+    return partitionWriter.Write(ctx, workers)
+}
```

Contributor, on the `panic` call in `positionDeleteRecordsToDataFiles`: Can we avoid panic/recover for expected failures here?
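On the panic/recover question, one panic-free shape (a sketch only, not something in the PR; `errSeq` and `missingSpec` are hypothetical helpers) is to surface the failure through the returned `iter.Seq2` itself:

```go
package sketch

import (
    "fmt"
    "iter"

    "github.com/apache/iceberg-go" // package iceberg; import path assumed
)

// errSeq wraps an error in a single-element sequence so the caller sees the
// failure when it ranges over the result, with no panic/recover round trip.
func errSeq(err error) iter.Seq2[iceberg.DataFile, error] {
    return func(yield func(iceberg.DataFile, error) bool) {
        yield(nil, err)
    }
}

// missingSpec sketches how the current-spec check could report its error.
func missingSpec(cause error) iter.Seq2[iceberg.DataFile, error] {
    return errSeq(fmt.Errorf("%w: cannot write files without a current spec", cause))
}
```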
I think the `table/internal.DataFileStatistics.ToDataFile` signature change is acceptable (it's internal, so who cares?), but `ManifestWriter.ToManifestFile` is externally visible and now requires a new content argument. This is a backward-incompatible change, so two questions:
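Independent of those questions, one backward-compatible shape for this kind of API change, sketched with stand-in types (not something proposed in the thread, and `ToManifestFileForContent` is a hypothetical name), is to keep the old two-argument signature defaulting to data content and add an explicit-content variant:

```go
package sketch

// Stand-ins so the sketch compiles; the real types live in the iceberg package.
type (
    ManifestContent int32
    ManifestFile    any
    ManifestWriter  struct{ /* fields elided */ }
)

const ManifestContentData ManifestContent = 0

// ToManifestFile keeps the pre-existing two-argument form, so existing callers
// keep compiling; it defaults to data-manifest content.
func (w *ManifestWriter) ToManifestFile(location string, length int64) (ManifestFile, error) {
    return w.ToManifestFileForContent(location, length, ManifestContentData)
}

// ToManifestFileForContent is the hypothetical explicit-content variant that a
// delete-manifest writer would call.
func (w *ManifestWriter) ToManifestFileForContent(location string, length int64, content ManifestContent) (ManifestFile, error) {
    _ = content // the real implementation would record this on the ManifestFile
    return nil, nil
}
```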