From 6d41651052e30a70f18f7a050fd9c81b679f674a Mon Sep 17 00:00:00 2001 From: Alex Normand Date: Fri, 6 Feb 2026 14:13:30 -0800 Subject: [PATCH] feat(table) add support for merge-on-read delete --- README.md | 68 ++--- manifest.go | 8 +- table/arrow_scanner.go | 123 +++++++++ table/arrow_utils.go | 87 +++++- table/arrow_utils_internal_test.go | 2 +- table/internal/interfaces.go | 1 + table/internal/parquet_files.go | 2 +- table/internal/parquet_files_test.go | 4 +- table/internal/utils.go | 4 +- table/partitioned_fanout_writer.go | 34 ++- table/partitioned_fanout_writer_test.go | 9 +- table/pos_delete_partitioned_fanout_writer.go | 116 ++++++++ table/rolling_data_writer.go | 54 ++-- table/scanner.go | 20 +- table/snapshot_producers.go | 129 +++++---- table/table_test.go | 5 +- table/transaction.go | 248 +++++++++++++++--- table/transaction_test.go | 114 ++++++++ table/writer.go | 133 +++++++--- 19 files changed, 935 insertions(+), 226 deletions(-) create mode 100644 table/pos_delete_partitioned_fanout_writer.go diff --git a/README.md b/README.md index 0d3f2dea0..0c445ab63 100644 --- a/README.md +++ b/README.md @@ -60,29 +60,29 @@ $ cd iceberg-go/cmd/iceberg && go build . ### Catalog Support -| Operation | REST | Hive | Glue | SQL | -|:----------------------------|:----:| :--: |:------:|:----:| -| Load Table | X | | X | X | -| List Tables | X | | X | X | -| Create Table | X | | X | X | -| Register Table | X | | X | | -| Update Current Snapshot | X | | X | X | -| Create New Snapshot | X | | X | X | -| Rename Table | X | | X | X | -| Drop Table | X | | X | X | -| Alter Table | X | | X | X | -| Check Table Exists | X | | X | X | -| Set Table Properties | X | | X | X | -| List Namespaces | X | | X | X | -| Create Namespace | X | | X | X | -| Check Namespace Exists | X | | X | X | -| Drop Namespace | X | | X | X | -| Update Namespace Properties | X | | X | X | -| Create View | X | | | X | -| Load View | | | | X | -| List View | X | | | X | -| Drop View | X | | | X | -| Check View Exists | X | | | X | +| Operation | REST | Hive | Glue | SQL | +|:----------------------------|:----:| :--: |:------:|:---:| +| Load Table | X | | X | X | +| List Tables | X | | X | X | +| Create Table | X | | X | X | +| Register Table | X | | X | | +| Update Current Snapshot | X | | X | X | +| Create New Snapshot | X | | X | X | +| Rename Table | X | | X | X | +| Drop Table | X | | X | X | +| Alter Table | X | | X | X | +| Check Table Exists | X | | X | X | +| Set Table Properties | X | | X | X | +| List Namespaces | X | | X | X | +| Create Namespace | X | | X | X | +| Check Namespace Exists | X | | X | X | +| Drop Namespace | X | | X | X | +| Update Namespace Properties | X | | X | X | +| Create View | X | | | X | +| Load View | | | | X | +| List View | X | | | X | +| Drop View | X | | | X | +| Check View Exists | X | | | X | ### Read/Write Data Support @@ -93,17 +93,17 @@ $ cd iceberg-go/cmd/iceberg && go build . 
As long as the FileSystem is supported and the Catalog supports altering the table, the following tracks the current write support: -| Operation | Supported | -|:-----------------------:|:---------:| -| Append Stream | X | -| Append Data Files | X | -| Rewrite Files | | -| Rewrite manifests | | -| Overwrite Files | X | -| Copy-On-Write Delete | X | -| Write Pos Delete | | -| Write Eq Delete | | -| Row Delta | | +| Operation | Supported | +|:---------------------|:---------:| +| Append Stream | X | +| Append Data Files | X | +| Rewrite Files | | +| Rewrite manifests | | +| Overwrite Files | X | +| Copy-On-Write Delete | X | +| Write Pos Delete | X | +| Write Eq Delete | | +| Row Delta | | ### CLI Usage diff --git a/manifest.go b/manifest.go index 48bae7a62..3084f8196 100644 --- a/manifest.go +++ b/manifest.go @@ -1136,7 +1136,7 @@ func (w *ManifestWriter) Close() error { return w.writer.Close() } -func (w *ManifestWriter) ToManifestFile(location string, length int64) (ManifestFile, error) { +func (w *ManifestWriter) ToManifestFile(location string, length int64, content ManifestContent) (ManifestFile, error) { if err := w.Close(); err != nil { return nil, err } @@ -1155,7 +1155,7 @@ func (w *ManifestWriter) ToManifestFile(location string, length int64) (Manifest Path: location, Len: length, SpecID: int32(w.spec.id), - Content: ManifestContentData, + Content: content, SeqNumber: -1, MinSeqNumber: w.minSeqNum, AddedSnapshotID: w.snapshotID, @@ -1500,7 +1500,7 @@ func WriteManifest( return nil, err } - return w.ToManifestFile(filename, cnt.Count) + return w.ToManifestFile(filename, cnt.Count, ManifestContentData) } // ManifestEntryStatus defines constants for the entry status of @@ -2314,5 +2314,5 @@ type ManifestEntry interface { var PositionalDeleteSchema = NewSchema(0, NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: "file_path", Required: true}, - NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos", Required: true}, + NestedField{ID: 2147483545, Type: PrimitiveTypes.Int64, Name: "pos", Required: true}, ) diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go index 9063f48b4..08dcf87d6 100644 --- a/table/arrow_scanner.go +++ b/table/arrow_scanner.go @@ -19,6 +19,7 @@ package table import ( "context" + "errors" "io" "iter" "strconv" @@ -42,6 +43,8 @@ const ( ScanOptionArrowUseLargeTypes = "arrow.use_large_types" ) +var PositionalDeleteArrowSchema, _ = SchemaToArrowSchema(iceberg.PositionalDeleteSchema, nil, true, false) + type ( positionDeletes = []*arrow.Chunked perFilePosDeletes = map[string]positionDeletes @@ -189,6 +192,54 @@ func processPositionalDeletes(ctx context.Context, deletes set[int64]) recProces } } +// enrichRecordsWithPosDeleteFields enriches a RecordBatch with the columns declared in the PositionalDeleteArrowSchema +// so that during the pipeline filtering stages that sheds filtered out records, we still have a way to +// preserve the original position of those records. 
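+// The two appended columns follow iceberg.PositionalDeleteSchema: file_path carries the data file's path and
+// pos carries the row's ordinal position within that file, counted continuously across all batches read from it.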
+func enrichRecordsWithPosDeleteFields(ctx context.Context, filePath iceberg.DataFile) recProcessFn { + nextIdx, mem := int64(0), compute.GetAllocator(ctx) + + return func(inData arrow.RecordBatch) (outData arrow.RecordBatch, err error) { + defer inData.Release() + + schema := inData.Schema() + fieldIdx := schema.NumFields() + filePathField, ok := PositionalDeleteArrowSchema.FieldsByName("file_path") + if !ok { + return nil, errors.New("position delete schema should have required field 'file_path'") + } + posField, ok := PositionalDeleteArrowSchema.FieldsByName("pos") + if !ok { + return nil, errors.New("position delete schema should have required field 'pos'") + } + schema, err = schema.AddField(fieldIdx, filePathField[0]) + if err != nil { + return nil, err + } + schema, err = schema.AddField(fieldIdx+1, posField[0]) + if err != nil { + return nil, err + } + + filePathBuilder := array.NewStringBuilder(mem) + defer filePathBuilder.Release() + posBuilder := array.NewInt64Builder(mem) + defer posBuilder.Release() + + startPos := nextIdx + nextIdx += inData.NumRows() + + for i := startPos; i < nextIdx; i++ { + filePathBuilder.Append(filePath.FilePath()) + posBuilder.Append(i) + } + + columns := append(inData.Columns(), filePathBuilder.NewArray(), posBuilder.NewArray()) + outData = array.NewRecordBatch(schema, columns, inData.NumRows()) + + return outData, err + } +} + func filterRecords(ctx context.Context, recordFilter expr.Expression) recProcessFn { return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) { defer rec.Release() @@ -454,6 +505,78 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat return err } +func (as *arrowScan) producePosDeletesFromTask(ctx context.Context, task internal.Enumerated[FileScanTask], positionalDeletes positionDeletes, out chan<- enumeratedRecord) (err error) { + defer func() { + if err != nil { + out <- enumeratedRecord{Task: task, Err: err} + } + }() + + var ( + rdr internal.FileReader + iceSchema *iceberg.Schema + colIndices []int + filterFunc recProcessFn + dropFile bool + ) + + iceSchema, colIndices, rdr, err = as.prepareToRead(ctx, task.Value.File) + if err != nil { + return err + } + defer iceinternal.CheckedClose(rdr, &err) + + fields := append(iceSchema.Fields(), iceberg.PositionalDeleteSchema.Fields()...) + enrichedIcebergSchema := iceberg.NewSchema(iceSchema.ID+1, fields...) 
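+	// enrichedIcebergSchema mirrors the records produced by enrichRecordsWithPosDeleteFields below: the data
+	// file's own fields followed by the positional delete fields, so the final pipeline stage can project the
+	// output down to just file_path and pos.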
+ + pipeline := make([]recProcessFn, 0, 2) + pipeline = append(pipeline, enrichRecordsWithPosDeleteFields(ctx, task.Value.File)) + if len(positionalDeletes) > 0 { + deletes := set[int64]{} + for _, chunk := range positionalDeletes { + for _, a := range chunk.Chunks() { + for _, v := range a.(*array.Int64).Int64Values() { + deletes[v] = struct{}{} + } + } + } + + pipeline = append(pipeline, processPositionalDeletes(ctx, deletes)) + } + + filterFunc, dropFile, err = as.getRecordFilter(ctx, iceSchema) + if err != nil { + return err + } + + // Nothing to delete in a dropped file + if dropFile { + var emptySchema *arrow.Schema + emptySchema, err = SchemaToArrowSchema(as.projectedSchema, nil, false, as.useLargeTypes) + if err != nil { + return err + } + out <- enumeratedRecord{Task: task, Record: internal.Enumerated[arrow.RecordBatch]{ + Value: array.NewRecordBatch(emptySchema, nil, 0), Index: 0, Last: true, + }} + + return err + } + + if filterFunc != nil { + pipeline = append(pipeline, filterFunc) + } + pipeline = append(pipeline, func(r arrow.RecordBatch) (arrow.RecordBatch, error) { + defer r.Release() + + return ToRequestedSchema(ctx, iceberg.PositionalDeleteSchema, enrichedIcebergSchema, r, false, true, as.useLargeTypes) + }) + + err = as.processRecords(ctx, task, iceSchema, rdr, colIndices, pipeline, out) + + return err +} + func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.RecordBatch, error] { isBeforeAny := func(batch enumeratedRecord) bool { return batch.Task.Index < 0 diff --git a/table/arrow_utils.go b/table/arrow_utils.go index 35d3b5385..5f56975bd 100644 --- a/table/arrow_utils.go +++ b/table/arrow_utils.go @@ -1258,7 +1258,7 @@ func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilde } } - df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, rdr.SourceFileSize(), partitionValues) + df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), partitionValues) if !yield(df, nil) { return } @@ -1335,6 +1335,9 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata panic(fmt.Errorf("%w: cannot write files without a current spec", err)) } + cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) { + return newDataFileWriter(rootLocation, fs, meta, props, opts...) 
+ }) nextCount, stopCount := iter.Pull(args.counter) if currentSpec.IsUnpartitioned() { tasks := func(yield func(WriteTask) bool) { @@ -1358,13 +1361,81 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata } } - return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks) - } else { - partitionWriter := newPartitionedFanoutWriter(*currentSpec, meta.CurrentSchema(), args.itr) - rollingDataWriters := NewWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize) - partitionWriter.writers = &rollingDataWriters - workers := config.EnvConfig.MaxWorkers + return cw.writeFiles(ctx, rootLocation, args.fs, meta, meta.props, nil, tasks) + } + + partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw, meta.CurrentSchema(), args.itr) + partitionWriter.writers = newWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize) + workers := config.EnvConfig.MaxWorkers + + return partitionWriter.Write(ctx, workers) +} + +func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string, meta *MetadataBuilder, partitionDataPerFile map[string]map[int]any, args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) { + if args.counter == nil { + args.counter = internal.Counter(0) + } + + defer func() { + if r := recover(); r != nil { + var err error + switch e := r.(type) { + case string: + err = fmt.Errorf("error encountered during position delete file writing %s", e) + case error: + err = fmt.Errorf("error encountered during position delete file writing: %w", e) + } + ret = func(yield func(iceberg.DataFile, error) bool) { + yield(nil, err) + } + } + }() - return partitionWriter.Write(ctx, workers) + if args.writeUUID == nil { + u := uuid.Must(uuid.NewRandom()) + args.writeUUID = &u } + + targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey, + WriteTargetFileSizeBytesDefault)) + + currentSpec, err := meta.CurrentSpec() + if err != nil || currentSpec == nil { + panic(fmt.Errorf("%w: cannot write files without a current spec", err)) + } + + cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) { + return newPositionDeleteWriter(rootLocation, fs, meta, props, opts...) 
+ }, withSchemaSanitization(false)) + nextCount, stopCount := iter.Pull(args.counter) + if currentSpec.IsUnpartitioned() { + tasks := func(yield func(WriteTask) bool) { + defer stopCount() + + fileCount := 0 + for batch := range binPackRecords(args.itr, defaultBinPackLookback, targetFileSize) { + cnt, _ := nextCount() + fileCount++ + t := WriteTask{ + Uuid: *args.writeUUID, + ID: cnt, + PartitionID: iceberg.UnpartitionedSpec.ID(), + FileCount: fileCount, + Schema: iceberg.PositionalDeleteSchema, + Batches: batch, + } + if !yield(t) { + return + } + } + } + + return cw.writeFiles(ctx, rootLocation, args.fs, meta, meta.props, nil, tasks) + } + + partitionWriter := newPositionDeletePartitionedFanoutWriter(*currentSpec, cw, partitionDataPerFile, args.itr) + partitionWriter.writers = newWriterFactory(rootLocation, args, meta, iceberg.PositionalDeleteSchema, targetFileSize) + workers := config.EnvConfig.MaxWorkers + + return partitionWriter.Write(ctx, workers) } diff --git a/table/arrow_utils_internal_test.go b/table/arrow_utils_internal_test.go index d32298225..e463c748f 100644 --- a/table/arrow_utils_internal_test.go +++ b/table/arrow_utils_internal_test.go @@ -200,7 +200,7 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta iceberg.Properties, writeSt stats := format.DataFileStatsFromMeta(fileMeta, collector, mapping) return stats.ToDataFile(tableMeta.CurrentSchema(), tableMeta.PartitionSpec(), "fake-path.parquet", - iceberg.ParquetFile, fileMeta.GetSourceFileSize(), nil) + iceberg.ParquetFile, iceberg.EntryContentData, fileMeta.GetSourceFileSize(), nil) } func (suite *FileStatsMetricsSuite) TestRecordCount() { diff --git a/table/internal/interfaces.go b/table/internal/interfaces.go index 0fdb8097b..48830154c 100644 --- a/table/internal/interfaces.go +++ b/table/internal/interfaces.go @@ -105,4 +105,5 @@ type WriteFileInfo struct { FileName string StatsCols map[int]StatisticsCollector WriteProps any + Content iceberg.ManifestEntryContent } diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go index 8e7cf252b..2d0334c52 100644 --- a/table/internal/parquet_files.go +++ b/table/internal/parquet_files.go @@ -282,7 +282,7 @@ func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, } return p.DataFileStatsFromMeta(filemeta, info.StatsCols, colMapping). 
- ToDataFile(info.FileSchema, info.Spec, info.FileName, iceberg.ParquetFile, cntWriter.Count, partitionValues), nil + ToDataFile(info.FileSchema, info.Spec, info.FileName, iceberg.ParquetFile, info.Content, cntWriter.Count, partitionValues), nil } type decAsIntAgg[T int32 | int64] struct { diff --git a/table/internal/parquet_files_test.go b/table/internal/parquet_files_test.go index 85480da08..42adc22cb 100644 --- a/table/internal/parquet_files_test.go +++ b/table/internal/parquet_files_test.go @@ -261,7 +261,7 @@ func TestMetricsPrimitiveTypes(t *testing.T) { stats := format.DataFileStatsFromMeta(internal.Metadata(meta), getCollector(), mapping) df := stats.ToDataFile(tblMeta.CurrentSchema(), tblMeta.PartitionSpec(), "fake-path.parquet", - iceberg.ParquetFile, meta.GetSourceFileSize(), nil) + iceberg.ParquetFile, iceberg.EntryContentData, meta.GetSourceFileSize(), nil) assert.Len(t, df.ValueCounts(), 15) assert.Len(t, df.NullValueCounts(), 15) @@ -463,7 +463,7 @@ func TestDecimalPhysicalTypes(t *testing.T) { require.NotNil(t, stats) df := stats.ToDataFile(tableMeta.CurrentSchema(), tableMeta.PartitionSpec(), "test.parquet", - iceberg.ParquetFile, meta.GetSourceFileSize(), nil) + iceberg.ParquetFile, iceberg.EntryContentData, meta.GetSourceFileSize(), nil) // Verify bounds are correctly extracted require.Contains(t, df.LowerBoundValues(), 1) diff --git a/table/internal/utils.go b/table/internal/utils.go index 6227d7463..4e9a5a055 100644 --- a/table/internal/utils.go +++ b/table/internal/utils.go @@ -234,7 +234,7 @@ func (d *DataFileStatistics) PartitionValue(field iceberg.PartitionField, sc *ic return lowerT.Val.Any() } -func (d *DataFileStatistics) ToDataFile(schema *iceberg.Schema, spec iceberg.PartitionSpec, path string, format iceberg.FileFormat, filesize int64, partitionValues map[int]any) iceberg.DataFile { +func (d *DataFileStatistics) ToDataFile(schema *iceberg.Schema, spec iceberg.PartitionSpec, path string, format iceberg.FileFormat, content iceberg.ManifestEntryContent, filesize int64, partitionValues map[int]any) iceberg.DataFile { var fieldIDToPartitionData map[int]any fieldIDToLogicalType := make(map[int]avro.LogicalType) fieldIDToFixedSize := make(map[int]int) @@ -276,7 +276,7 @@ func (d *DataFileStatistics) ToDataFile(schema *iceberg.Schema, spec iceberg.Par } } - bldr, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData, + bldr, err := iceberg.NewDataFileBuilder(spec, content, path, format, fieldIDToPartitionData, fieldIDToLogicalType, fieldIDToFixedSize, d.RecordCount, filesize) if err != nil { panic(err) diff --git a/table/partitioned_fanout_writer.go b/table/partitioned_fanout_writer.go index 838513f5d..2cef8ce9e 100644 --- a/table/partitioned_fanout_writer.go +++ b/table/partitioned_fanout_writer.go @@ -32,14 +32,15 @@ import ( "golang.org/x/sync/errgroup" ) -// PartitionedFanoutWriter distributes Arrow records across multiple partitions based on +// partitionedFanoutWriter distributes Arrow records across multiple partitions based on // a partition specification, writing data to separate files for each partition using // a fanout pattern with configurable parallelism. 
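+// The concurrentDataFileWriter field abstracts the underlying file writer, so the rolling writers created per
+// partition do not need to know whether they are producing data files or positional delete files.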
type partitionedFanoutWriter struct { - partitionSpec iceberg.PartitionSpec - schema *iceberg.Schema - itr iter.Seq2[arrow.RecordBatch, error] - writers *writerFactory + partitionSpec iceberg.PartitionSpec + schema *iceberg.Schema + itr iter.Seq2[arrow.RecordBatch, error] + writers *writerFactory + concurrentDataFileWriter *concurrentDataFileWriter } // PartitionInfo holds the row indices and partition values for a specific partition, @@ -52,11 +53,12 @@ type partitionInfo struct { // NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified // partition specification, schema, and record iterator. -func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error]) *partitionedFanoutWriter { +func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, concurrentWriter *concurrentDataFileWriter, schema *iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error]) *partitionedFanoutWriter { return &partitionedFanoutWriter{ - partitionSpec: partitionSpec, - schema: schema, - itr: itr, + partitionSpec: partitionSpec, + schema: schema, + itr: itr, + concurrentDataFileWriter: concurrentWriter, } } @@ -72,7 +74,7 @@ func (p *partitionedFanoutWriter) Write(ctx context.Context, workers int) iter.S outputDataFilesCh := make(chan iceberg.DataFile, workers) fanoutWorkers, ctx := errgroup.WithContext(ctx) - p.startRecordFeeder(ctx, fanoutWorkers, inputRecordsCh) + startRecordFeeder(ctx, p.itr, fanoutWorkers, inputRecordsCh) for range workers { fanoutWorkers.Go(func() error { @@ -83,11 +85,11 @@ func (p *partitionedFanoutWriter) Write(ctx context.Context, workers int) iter.S return p.yieldDataFiles(fanoutWorkers, outputDataFilesCh) } -func (p *partitionedFanoutWriter) startRecordFeeder(ctx context.Context, fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.RecordBatch) { +func startRecordFeeder(ctx context.Context, itr iter.Seq2[arrow.RecordBatch, error], fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.RecordBatch) { fanoutWorkers.Go(func() error { defer close(inputRecordsCh) - for record, err := range p.itr { + for record, err := range itr { if err != nil { return err } @@ -136,7 +138,7 @@ func (p *partitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-c } partitionPath := p.partitionPath(val.partitionRec) - rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(ctx, partitionPath, val.partitionValues, dataFilesChannel) + rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(ctx, p.concurrentDataFileWriter, partitionPath, val.partitionValues, dataFilesChannel) if err != nil { return err } @@ -151,13 +153,17 @@ func (p *partitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-c } func (p *partitionedFanoutWriter) yieldDataFiles(fanoutWorkers *errgroup.Group, outputDataFilesCh chan iceberg.DataFile) iter.Seq2[iceberg.DataFile, error] { + return yieldDataFiles(p.writers, fanoutWorkers, outputDataFilesCh) +} + +func yieldDataFiles(writers *writerFactory, fanoutWorkers *errgroup.Group, outputDataFilesCh chan iceberg.DataFile) iter.Seq2[iceberg.DataFile, error] { // Use a channel to safely communicate the error from the goroutine // to avoid a data race between writing err in the goroutine and reading it in the iterator. 
errCh := make(chan error, 1) go func() { defer close(outputDataFilesCh) err := fanoutWorkers.Wait() - err = errors.Join(err, p.writers.closeAll()) + err = errors.Join(err, writers.closeAll()) errCh <- err close(errCh) }() diff --git a/table/partitioned_fanout_writer_test.go b/table/partitioned_fanout_writer_test.go index b7e0f6d8b..b0782bffb 100644 --- a/table/partitioned_fanout_writer_test.go +++ b/table/partitioned_fanout_writer_test.go @@ -134,10 +134,11 @@ func (s *FanoutWriterTestSuite) testTransformPartition(transform iceberg.Transfo taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping) s.Require().NoError(err) - partitionWriter := newPartitionedFanoutWriter(spec, taskSchema, args.itr) - rollingDataWriters := NewWriterFactory(loc, args, metaBuilder, icebergSchema, 1024*1024) - - partitionWriter.writers = &rollingDataWriters + cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) { + return newDataFileWriter(rootLocation, fs, meta, props, opts...) + }) + partitionWriter := newPartitionedFanoutWriter(spec, cw, taskSchema, args.itr) + partitionWriter.writers = newWriterFactory(loc, args, metaBuilder, icebergSchema, 1024*1024) workers := config.EnvConfig.MaxWorkers dataFiles := partitionWriter.Write(s.ctx, workers) diff --git a/table/pos_delete_partitioned_fanout_writer.go b/table/pos_delete_partitioned_fanout_writer.go new file mode 100644 index 000000000..70f49dc99 --- /dev/null +++ b/table/pos_delete_partitioned_fanout_writer.go @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "iter" + "path" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/iceberg-go" + "golang.org/x/sync/errgroup" +) + +// positionDeletePartitionedFanoutWriter distributes Arrow position delete records across multiple partitions based on +// a partition specification, writing data to separate delete files for each partition using +// a fanout pattern with configurable parallelism. +type positionDeletePartitionedFanoutWriter struct { + partitionSpec iceberg.PartitionSpec + partitionDataByFilePath map[string]map[int]any + schema *iceberg.Schema + itr iter.Seq2[arrow.RecordBatch, error] + writers *writerFactory + concurrentDataFileWriter *concurrentDataFileWriter +} + +// newPositionDeletePartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified +// partition specification, schema, and record iterator. 
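+// Records are routed by looking up partition values in partitionDataByFilePath, keyed by the directory portion
+// of each record's file_path column, so deletes land in the same partition as the data file they reference.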
+func newPositionDeletePartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, concurrentWriter *concurrentDataFileWriter, partitionDataByFilePath map[string]map[int]any, itr iter.Seq2[arrow.RecordBatch, error]) *positionDeletePartitionedFanoutWriter { + return &positionDeletePartitionedFanoutWriter{ + partitionSpec: partitionSpec, + partitionDataByFilePath: partitionDataByFilePath, + schema: iceberg.PositionalDeleteSchema, + itr: itr, + concurrentDataFileWriter: concurrentWriter, + } +} + +// Write writes the Arrow records to the specified location using a fanout pattern with +// the specified number of workers. The returned iterator yields the data files written +// by the fanout process. +func (p *positionDeletePartitionedFanoutWriter) Write(ctx context.Context, workers int) iter.Seq2[iceberg.DataFile, error] { + inputRecordsCh := make(chan arrow.RecordBatch, workers) + outputDataFilesCh := make(chan iceberg.DataFile, workers) + + fanoutWorkers, ctx := errgroup.WithContext(ctx) + startRecordFeeder(ctx, p.itr, fanoutWorkers, inputRecordsCh) + + for range workers { + fanoutWorkers.Go(func() error { + return p.fanout(ctx, inputRecordsCh, outputDataFilesCh) + }) + } + + return p.yieldDataFiles(fanoutWorkers, outputDataFilesCh) +} + +func (p *positionDeletePartitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-chan arrow.RecordBatch, dataFilesChannel chan<- iceberg.DataFile) error { + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + + case record, ok := <-inputRecordsCh: + if !ok { + return nil + } + defer record.Release() + + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + + if record.NumRows() == 0 { + continue + } + + columns := record.Columns() + filePaths := columns[0].(*array.String) + partitionPath := path.Dir(filePaths.Value(0)) + + partitionValues := p.partitionDataByFilePath[partitionPath] + rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(ctx, p.concurrentDataFileWriter, partitionPath, partitionValues, dataFilesChannel) + if err != nil { + return err + } + + err = rollingDataWriter.Add(record) + if err != nil { + return err + } + } + } +} + +func (p *positionDeletePartitionedFanoutWriter) yieldDataFiles(fanoutWorkers *errgroup.Group, outputDataFilesCh chan iceberg.DataFile) iter.Seq2[iceberg.DataFile, error] { + return yieldDataFiles(p.writers, fanoutWorkers, outputDataFilesCh) +} diff --git a/table/rolling_data_writer.go b/table/rolling_data_writer.go index 020effddd..337315c92 100644 --- a/table/rolling_data_writer.go +++ b/table/rolling_data_writer.go @@ -45,12 +45,12 @@ type writerFactory struct { mu sync.Mutex } -// NewWriterFactory creates a new WriterFactory with the specified configuration +// newWriterFactory creates a new WriterFactory with the specified configuration // for managing rolling data writers across partitions. 
-func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) writerFactory { +func newWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) *writerFactory { nextCount, stopCount := iter.Pull(args.counter) - return writerFactory{ + return &writerFactory{ rootLocation: rootLocation, args: args, meta: meta, @@ -65,32 +65,34 @@ func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *Metadat // them to data files when the target file size is reached, implementing a rolling // file strategy to manage file sizes. type RollingDataWriter struct { - partitionKey string - partitionID int // unique ID for this partition - fileCount atomic.Int64 // counter for files in this partition - recordCh chan arrow.RecordBatch - errorCh chan error - factory *writerFactory - partitionValues map[int]any - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + partitionKey string + partitionID int // unique ID for this partition + fileCount atomic.Int64 // counter for files in this partition + recordCh chan arrow.RecordBatch + errorCh chan error + factory *writerFactory + partitionValues map[int]any + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + concurrentWriter *concurrentDataFileWriter } // NewRollingDataWriter creates a new RollingDataWriter for the specified partition // with the given partition values. -func (w *writerFactory) NewRollingDataWriter(ctx context.Context, partition string, partitionValues map[int]any, outputDataFilesCh chan<- iceberg.DataFile) *RollingDataWriter { +func (w *writerFactory) NewRollingDataWriter(ctx context.Context, concurrentWriter *concurrentDataFileWriter, partition string, partitionValues map[int]any, outputDataFilesCh chan<- iceberg.DataFile) *RollingDataWriter { ctx, cancel := context.WithCancel(ctx) partitionID := int(w.partitionIDCounter.Add(1) - 1) writer := &RollingDataWriter{ - partitionKey: partition, - partitionID: partitionID, - recordCh: make(chan arrow.RecordBatch, 64), - errorCh: make(chan error, 1), - factory: w, - partitionValues: partitionValues, - ctx: ctx, - cancel: cancel, + partitionKey: partition, + partitionID: partitionID, + recordCh: make(chan arrow.RecordBatch, 64), + errorCh: make(chan error, 1), + factory: w, + partitionValues: partitionValues, + ctx: ctx, + concurrentWriter: concurrentWriter, + cancel: cancel, } writer.wg.Add(1) @@ -99,7 +101,7 @@ func (w *writerFactory) NewRollingDataWriter(ctx context.Context, partition stri return writer } -func (w *writerFactory) getOrCreateRollingDataWriter(ctx context.Context, partition string, partitionValues map[int]any, outputDataFilesCh chan<- iceberg.DataFile) (*RollingDataWriter, error) { +func (w *writerFactory) getOrCreateRollingDataWriter(ctx context.Context, concurrentWriter *concurrentDataFileWriter, partition string, partitionValues map[int]any, outputDataFilesCh chan<- iceberg.DataFile) (*RollingDataWriter, error) { w.mu.Lock() defer w.mu.Unlock() @@ -111,7 +113,7 @@ func (w *writerFactory) getOrCreateRollingDataWriter(ctx context.Context, partit return nil, fmt.Errorf("invalid writer type for partition: %s", partition) } - writer := w.NewRollingDataWriter(ctx, partition, partitionValues, outputDataFilesCh) + writer := w.NewRollingDataWriter(ctx, concurrentWriter, partition, partitionValues, outputDataFilesCh) w.writers.Store(partition, writer) return writer, nil @@ -165,7 +167,7 @@ func 
(r *RollingDataWriter) flushToDataFile(batch []arrow.RecordBatch, outputDat return nil } - task := iter.Seq[WriteTask](func(yield func(WriteTask) bool) { + tasks := iter.Seq[WriteTask](func(yield func(WriteTask) bool) { cnt, _ := r.factory.nextCount() fileCount := int(r.fileCount.Add(1)) @@ -190,7 +192,7 @@ func (r *RollingDataWriter) flushToDataFile(batch []arrow.RecordBatch, outputDat } partitionMeta.props[WriteDataPathKey] = parseDataLoc.JoinPath("data").JoinPath(r.partitionKey).String() - outputDataFiles := writeFiles(r.ctx, r.factory.rootLocation, r.factory.args.fs, &partitionMeta, r.partitionValues, task) + outputDataFiles := r.concurrentWriter.writeFiles(r.ctx, r.factory.rootLocation, r.factory.args.fs, &partitionMeta, partitionMeta.props, r.partitionValues, tasks) for dataFile, err := range outputDataFiles { if err != nil { return err diff --git a/table/scanner.go b/table/scanner.go index 1bc9d005f..bc3c09027 100644 --- a/table/scanner.go +++ b/table/scanner.go @@ -242,23 +242,31 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) { } func (scan *Scan) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) { - spec := scan.metadata.PartitionSpecByID(specID) + return buildPartitionProjection(specID, scan.metadata, scan.rowFilter, scan.caseSensitive) +} + +func buildPartitionProjection(specID int, meta Metadata, rowFilter iceberg.BooleanExpression, caseSensitive bool) (iceberg.BooleanExpression, error) { + spec := meta.PartitionSpecByID(specID) if spec == nil { return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound, specID) } - project := newInclusiveProjection(scan.metadata.CurrentSchema(), *spec, true) + project := newInclusiveProjection(meta.CurrentSchema(), *spec, caseSensitive) - return project(scan.rowFilter) + return project(rowFilter) } func (scan *Scan) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile) (bool, error), error) { - spec := scan.metadata.PartitionSpecByID(specID) + return buildManifestEvaluator(specID, scan.metadata, scan.partitionFilters, scan.caseSensitive) +} + +func buildManifestEvaluator(specID int, metadata Metadata, partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression], caseSensitive bool) (func(iceberg.ManifestFile) (bool, error), error) { + spec := metadata.PartitionSpecByID(specID) if spec == nil { return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound, specID) } - return newManifestEvaluator(*spec, scan.metadata.CurrentSchema(), - scan.partitionFilters.Get(specID), scan.caseSensitive) + return newManifestEvaluator(*spec, metadata.CurrentSchema(), + partitionFilters.Get(specID), caseSensitive) } func (scan *Scan) buildPartitionEvaluator(specID int) (func(iceberg.DataFile) (bool, error), error) { diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index 8022f5a48..333fe6175 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -182,7 +182,7 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) { return nil, err } - return wr.ToManifestFile(path, counter.Count) + return wr.ToManifestFile(path, counter.Count, m.ManifestContent()) } mf, err := rewriteManifest(m, notDeleted) @@ -305,7 +305,7 @@ func (m *manifestMergeManager) createManifest(specID int, bin []iceberg.Manifest return nil, err } - return wr.ToManifestFile(path, counter.Count) + return wr.ToManifestFile(path, counter.Count, iceberg.ManifestContentData) } func (m *manifestMergeManager) mergeGroup(firstManifest iceberg.ManifestFile, specID int, manifests 
[]iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { @@ -427,16 +427,17 @@ func (m *mergeAppendFiles) processManifests(manifests []iceberg.ManifestFile) ([ type snapshotProducer struct { producerImpl - commitUuid uuid.UUID - io iceio.WriteFileIO - txn *Transaction - op Operation - snapshotID int64 - parentSnapshotID int64 - addedFiles []iceberg.DataFile - manifestCount atomic.Int32 - deletedFiles map[string]iceberg.DataFile - snapshotProps iceberg.Properties + commitUuid uuid.UUID + io iceio.WriteFileIO + txn *Transaction + op Operation + snapshotID int64 + parentSnapshotID int64 + addedFiles []iceberg.DataFile + positionDeleteFiles []iceberg.DataFile + manifestCount atomic.Int32 + deletedFiles map[string]iceberg.DataFile + snapshotProps iceberg.Properties } func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { @@ -482,6 +483,12 @@ func (sp *snapshotProducer) appendDataFile(df iceberg.DataFile) *snapshotProduce return sp } +func (sp *snapshotProducer) appendPositionDeleteFile(df iceberg.DataFile) *snapshotProducer { + sp.positionDeleteFiles = append(sp.positionDeleteFiles, df) + + return sp +} + func (sp *snapshotProducer) deleteDataFile(df iceberg.DataFile) *snapshotProducer { sp.deletedFiles[df.FilePath()] = df @@ -531,49 +538,17 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) { var g errgroup.Group - results := [...][]iceberg.ManifestFile{nil, nil, nil} + addedManifests := make([]iceberg.ManifestFile, 0) + positionDeleteManifests := make([]iceberg.ManifestFile, 0) + var deletedFilesManifests []iceberg.ManifestFile + var existingManifests []iceberg.ManifestFile if len(sp.addedFiles) > 0 { - g.Go(func() (err error) { - out, path, err := sp.newManifestOutput() - if err != nil { - return err - } - defer internal.CheckedClose(out, &err) - - counter := &internal.CountingWriter{W: out} - currentSpec, err := sp.txn.meta.CurrentSpec() - if err != nil || currentSpec == nil { - return fmt.Errorf("could not get current partition spec: %w", err) - } - wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter, - *currentSpec, sp.txn.meta.CurrentSchema(), - sp.snapshotID) - if err != nil { - return err - } - defer internal.CheckedClose(wr, &err) - - for _, df := range sp.addedFiles { - err := wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID, - nil, nil, df)) - if err != nil { - return err - } - } - - // close the writer to force a flush and ensure counter.Count is accurate - if err := wr.Close(); err != nil { - return err - } - - mf, err := wr.ToManifestFile(path, counter.Count) - if err == nil { - results[0] = append(results[0], mf) - } + g.Go(sp.manifestProducer(iceberg.ManifestContentData, sp.addedFiles, &addedManifests)) + } - return err - }) + if len(sp.positionDeleteFiles) > 0 { + g.Go(sp.manifestProducer(iceberg.ManifestContentDeletes, sp.positionDeleteFiles, &positionDeleteManifests)) } if len(deleted) > 0 { @@ -607,7 +582,7 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) { if err != nil { return err } - results[1] = append(results[1], mf) + deletedFilesManifests = append(deletedFilesManifests, mf) } return nil @@ -619,7 +594,7 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) { if err != nil { return err } - results[2] = m + existingManifests = m return nil }) @@ -628,11 +603,55 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err 
error) { return nil, err } - manifests := slices.Concat(results[0], results[1], results[2]) + manifests := slices.Concat(addedManifests, positionDeleteManifests, deletedFilesManifests, existingManifests) return sp.processManifests(manifests) } +func (sp *snapshotProducer) manifestProducer(content iceberg.ManifestContent, files []iceberg.DataFile, output *[]iceberg.ManifestFile) func() (err error) { + return func() (err error) { + out, path, err := sp.newManifestOutput() + if err != nil { + return err + } + defer internal.CheckedClose(out, &err) + + counter := &internal.CountingWriter{W: out} + currentSpec, err := sp.txn.meta.CurrentSpec() + if err != nil || currentSpec == nil { + return fmt.Errorf("could not get current partition spec: %w", err) + } + wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter, + *currentSpec, sp.txn.meta.CurrentSchema(), + sp.snapshotID) + if err != nil { + return err + } + defer internal.CheckedClose(wr, &err) + + for _, df := range files { + err := wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID, + nil, nil, df)) + if err != nil { + return err + } + } + + // close the writer to force a flush and ensure counter.Count is accurate + if err := wr.Close(); err != nil { + return err + } + + mf, err := wr.ToManifestFile(path, counter.Count, content) + if err != nil { + return err + } + *output = []iceberg.ManifestFile{mf} + + return nil + } +} + func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error) { var ssc SnapshotSummaryCollector partitionSummaryLimit := sp.txn.meta.props. diff --git a/table/table_test.go b/table/table_test.go index ae1bbddee..228b58d3f 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -22,7 +22,6 @@ import ( "compress/gzip" "context" "encoding/json" - "errors" "fmt" "io" "io/fs" @@ -1682,7 +1681,7 @@ func (t *TableWritingTestSuite) TestDelete() { expectedErr: nil, }, { - name: "abort on merge-on-read", + name: "success with merge-on-read", table: t.createTableWithProps( table.Identifier{"default", "overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)}, map[string]string{ @@ -1691,7 +1690,7 @@ func (t *TableWritingTestSuite) TestDelete() { }, t.tableSchema, ), - expectedErr: errors.New("only 'copy-on-write' is currently supported"), + expectedErr: nil, }, } diff --git a/table/transaction.go b/table/transaction.go index 06b016312..4c7438a5d 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -23,6 +23,8 @@ import ( "encoding/json" "errors" "fmt" + "iter" + "path" "runtime" "slices" "sync" @@ -30,8 +32,11 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/compute/exprs" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/table/internal" + "github.com/apache/iceberg-go/table/substrait" "github.com/google/uuid" "golang.org/x/sync/errgroup" ) @@ -649,7 +654,7 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation commitUUID := uuid.New() updater := t.updateSnapshot(fs, snapshotProps, operation).mergeOverwrite(&commitUUID) - filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, fs, filter, caseSensitive, concurrency) + filesToDelete, filesToRewrite, err := t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency) if err != nil { return nil, err } @@ -667,6 +672,45 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation return updater, nil } 
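+// performMergeOnReadDeletion implements merge-on-read deletes: data files whose rows all match the filter are
+// deleted outright, while files that only partially match have positional delete files written for the matching
+// rows instead of being rewritten.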
+func (t *Transaction) performMergeOnReadDeletion(ctx context.Context, snapshotProps iceberg.Properties, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) (*snapshotProducer, error) { + fs, err := t.tbl.fsF(ctx) + if err != nil { + return nil, err + } + + if t.meta.NameMapping() == nil { + nameMapping := t.meta.CurrentSchema().NameMapping() + mappingJson, err := json.Marshal(nameMapping) + if err != nil { + return nil, err + } + err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)}) + if err != nil { + return nil, err + } + } + + commitUUID := uuid.New() + updater := t.updateSnapshot(fs, snapshotProps, OpDelete).mergeOverwrite(&commitUUID) + + filesToDelete, withPartialDeletions, err := t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency) + if err != nil { + return nil, err + } + + for _, df := range filesToDelete { + updater.deleteDataFile(df) + } + + if len(withPartialDeletions) > 0 { + if err := t.writePositionDeletesForFiles(ctx, fs, updater, withPartialDeletions, filter, caseSensitive, concurrency); err != nil { + return nil, err + } + } + + return updater, nil +} + type DeleteOption func(deleteOp *deleteOperation) type deleteOperation struct { @@ -710,7 +754,7 @@ func WithDeleteCaseInsensitive() DeleteOption { // // The concurrency parameter controls the level of parallelism for manifest processing and file rewriting and // can be overridden using the WithOverwriteConcurrency option. Defaults to runtime.GOMAXPROCS(0). -func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts ...DeleteOption) error { +func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts ...DeleteOption) (err error) { deleteOp := deleteOperation{ concurrency: runtime.GOMAXPROCS(0), caseSensitive: true, @@ -719,14 +763,23 @@ func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpressi apply(&deleteOp) } + var updater *snapshotProducer writeDeleteMode := t.meta.props.Get(WriteDeleteModeKey, WriteDeleteModeDefault) - if writeDeleteMode != WriteModeCopyOnWrite { - return fmt.Errorf("'%s' is set to '%s' but only '%s' is currently supported", WriteDeleteModeKey, writeDeleteMode, WriteModeCopyOnWrite) - } - updater, err := t.performCopyOnWriteDeletion(ctx, OpDelete, snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency) - if err != nil { - return err + switch writeDeleteMode { + case WriteModeCopyOnWrite: + updater, err = t.performCopyOnWriteDeletion(ctx, OpDelete, snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency) + if err != nil { + return err + } + case WriteModeMergeOnRead: + updater, err = t.performMergeOnReadDeletion(ctx, snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency) + if err != nil { + return err + } + default: + return fmt.Errorf("unsupported write mode: '%s'", writeDeleteMode) } + updates, reqs, err := updater.commit() if err != nil { return err @@ -735,9 +788,9 @@ func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpressi return t.apply(updates, reqs) } -// classifyFilesForOverwrite classifies existing data files based on the provided filter. +// classifyFilesForDeletions classifies existing data files based on the provided filter. // Returns files to delete completely, files to rewrite partially, and any error. 
-func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) { +func (t *Transaction) classifyFilesForDeletions(ctx context.Context, fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) (filesToDelete, filesWithPartialDeletions []iceberg.DataFile, err error) { s := t.meta.currentSnapshot() if s == nil { return nil, nil, nil @@ -753,16 +806,46 @@ func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO, f } } - return filesToDelete, filesToRewrite, nil + return filesToDelete, filesWithPartialDeletions, nil } - return t.classifyFilesForFilteredOverwrite(ctx, fs, filter, caseSensitive, concurrency) + return t.classifyFilesForFilteredDeletions(ctx, fs, filter, caseSensitive, concurrency) } -// classifyFilesForFilteredOverwrite classifies files for filtered overwrite operations. +type fileClassificationTask struct { + meta Metadata + partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression] + caseSensitive bool + rowFilter iceberg.BooleanExpression +} + +func newFileClassificationTask(meta Metadata, rowFilter iceberg.BooleanExpression, caseSensitive bool) *fileClassificationTask { + classificationTask := &fileClassificationTask{ + meta: meta, + caseSensitive: caseSensitive, + rowFilter: rowFilter, + } + classificationTask.partitionFilters = newKeyDefaultMapWrapErr(classificationTask.buildPartitionProjection) + + return classificationTask +} + +func (t *fileClassificationTask) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile) (bool, error), error) { + return buildManifestEvaluator(specID, t.meta, t.partitionFilters, t.caseSensitive) +} + +func (t *fileClassificationTask) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) { + return buildPartitionProjection(specID, t.meta, t.rowFilter, t.caseSensitive) +} + +// classifyFilesForFilteredDeletions classifies files for filtered overwrite operations. // Returns files to delete completely, files to rewrite partially, and any error. 
-func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) { +func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context, fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) (filesToDelete, filesWithPartialDeletes []iceberg.DataFile, err error) { schema := t.meta.CurrentSchema() + meta, err := t.meta.Build() + if err != nil { + return nil, nil, err + } inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter, caseSensitive, false) if err != nil { @@ -774,18 +857,8 @@ func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, fs return nil, nil, fmt.Errorf("failed to create strict metrics evaluator: %w", err) } - var manifestEval func(iceberg.ManifestFile) (bool, error) - meta, err := t.meta.Build() - if err != nil { - return nil, nil, fmt.Errorf("failed to build metadata: %w", err) - } - spec := meta.PartitionSpec() - if !spec.IsUnpartitioned() { - manifestEval, err = newManifestEvaluator(spec, schema, filter, caseSensitive) - if err != nil { - return nil, nil, fmt.Errorf("failed to create manifest evaluator: %w", err) - } - } + classificationTask := newFileClassificationTask(meta, filter, caseSensitive) + manifestEvaluators := newKeyDefaultMapWrapErr(classificationTask.buildManifestEvaluator) s := t.meta.currentSnapshot() var manifests []iceberg.ManifestFile @@ -796,11 +869,7 @@ func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, fs } } - var ( - mu sync.Mutex - allFilesToDel []iceberg.DataFile - allFilesToRewr []iceberg.DataFile - ) + var mu sync.Mutex g, _ := errgroup.WithContext(ctx) g.SetLimit(min(concurrency, len(manifests))) @@ -808,6 +877,7 @@ func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, fs for _, manifest := range manifests { manifest := manifest // capture loop variable g.Go(func() error { + manifestEval := manifestEvaluators.Get(int(manifest.PartitionSpecID())) if manifestEval != nil { match, err := manifestEval(manifest) if err != nil { @@ -859,8 +929,8 @@ func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, fs if len(localDelete) > 0 || len(localRewrite) > 0 { mu.Lock() - allFilesToDel = append(allFilesToDel, localDelete...) - allFilesToRewr = append(allFilesToRewr, localRewrite...) + filesToDelete = append(filesToDelete, localDelete...) + filesWithPartialDeletes = append(filesWithPartialDeletes, localRewrite...) 
mu.Unlock() } @@ -872,7 +942,7 @@ func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context, fs return nil, nil, err } - return allFilesToDel, allFilesToRewr, nil + return filesToDelete, filesWithPartialDeletes, nil } // rewriteFilesWithFilter rewrites data files by preserving only rows that do NOT match the filter @@ -964,6 +1034,116 @@ func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalF return result, nil } +// writePositionDeletesForFiles rewrites data files by preserving only rows that do NOT match the filter +func (t *Transaction) writePositionDeletesForFiles(ctx context.Context, fs io.IO, updater *snapshotProducer, files []iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) error { + posDeleteRecIter, err := t.makePositionDeleteRecordsForFilter(ctx, fs, files, filter, caseSensitive, concurrency) + if err != nil { + return err + } + + partitionDataPerFile := make(map[string]map[int]any, len(files)) + for _, df := range files { + partitionDataPerFile[path.Dir(df.FilePath())] = df.Partition() + } + + posDeleteFiles := positionDeleteRecordsToDataFiles(ctx, t.tbl.Location(), t.meta, partitionDataPerFile, recordWritingArgs{ + sc: PositionalDeleteArrowSchema, + itr: posDeleteRecIter, + fs: fs.(io.WriteFileIO), + }) + + for f, err := range posDeleteFiles { + if err != nil { + return err + } + updater.appendPositionDeleteFile(f) + } + + return nil +} + +func (t *Transaction) makePositionDeleteRecordsForFilter(ctx context.Context, fs io.IO, files []iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) (seq2 iter.Seq2[arrow.RecordBatch, error], err error) { + tasks := make([]FileScanTask, 0, len(files)) + for _, f := range files { + tasks = append(tasks, FileScanTask{ + File: f, + Start: 0, + Length: f.FileSizeBytes(), + }) + } + + boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter, caseSensitive) + if err != nil { + return nil, fmt.Errorf("failed to bind filter: %w", err) + } + + meta, err := t.meta.Build() + if err != nil { + return nil, fmt.Errorf("failed to build metadata: %w", err) + } + + scanner := &arrowScan{ + metadata: meta, + fs: fs, + projectedSchema: t.meta.CurrentSchema(), + boundRowFilter: boundFilter, + caseSensitive: caseSensitive, + rowLimit: -1, // No limit + concurrency: concurrency, + } + + deletesPerFile, err := readAllDeleteFiles(ctx, fs, tasks, concurrency) + if err != nil { + return nil, err + } + + extSet := substrait.NewExtensionSet() + + ctx, cancel := context.WithCancelCause(exprs.WithExtensionIDSet(ctx, extSet)) + taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks)) + + numWorkers := min(concurrency, len(tasks)) + records := make(chan enumeratedRecord, numWorkers) + + var wg sync.WaitGroup + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case task, ok := <-taskChan: + if !ok { + return + } + + if err := scanner.producePosDeletesFromTask(ctx, task, deletesPerFile[task.Value.File.FilePath()], records); err != nil { + cancel(err) + + return + } + } + } + }() + } + + go func() { + for i, t := range tasks { + taskChan <- internal.Enumerated[FileScanTask]{ + Value: t, Index: i, Last: i == len(tasks)-1, + } + } + close(taskChan) + + wg.Wait() + close(records) + }() + + return createIterator(ctx, uint(numWorkers), records, deletesPerFile, cancel, scanner.rowLimit), nil +} + func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) 
{ updatedMeta, err := t.meta.Build() if err != nil { diff --git a/table/transaction_test.go b/table/transaction_test.go index 1f64adf1c..4bf076760 100644 --- a/table/transaction_test.go +++ b/table/transaction_test.go @@ -535,6 +535,120 @@ func (s *SparkIntegrationTestSuite) TestDeleteInsensitive() { +----------+---------+---+`) } +func (s *SparkIntegrationTestSuite) TestDeleteMergeOnReadUnpartitioned() { + icebergSchema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "first_name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 2, Name: "last_name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 3, Name: "age", Type: iceberg.PrimitiveTypes.Int32}, + ) + + tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_merge_on_read_delete"), icebergSchema, + catalog.WithProperties( + map[string]string{ + table.WriteDeleteModeKey: table.WriteModeMergeOnRead, + }, + ), + ) + s.Require().NoError(err) + + arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false) + s.Require().NoError(err) + + initialTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{ + `[ + {"first_name": "alan", "last_name": "gopher", "age": 7}, + {"first_name": "steve", "last_name": "gopher", "age": 5}, + {"first_name": "dead", "last_name": "gopher", "age": 97} + ]`, + }) + s.Require().NoError(err) + defer initialTable.Release() + + tx := tbl.NewTransaction() + err = tx.AppendTable(s.ctx, initialTable, 3, nil) + s.Require().NoError(err) + tbl, err = tx.Commit(s.ctx) + s.Require().NoError(err) + + // Delete the dead gopher and confirm that alan and steve are still present + filter := iceberg.EqualTo(iceberg.Reference("first_name"), "dead") + tx = tbl.NewTransaction() + err = tx.Delete(s.ctx, filter, nil) + s.Require().NoError(err) + _, err = tx.Commit(s.ctx) + s.Require().NoError(err) + + output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "SELECT * FROM default.go_test_merge_on_read_delete ORDER BY age") + s.Require().NoError(err) + s.Require().Contains(output, `|first_name|last_name|age| ++----------+---------+---+ +|steve |gopher |5 | +|alan |gopher |7 | ++----------+---------+---+`) +} + +func (s *SparkIntegrationTestSuite) TestDeleteMergeOnReadPartitioned() { + icebergSchema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "first_name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 2, Name: "last_name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 3, Name: "age", Type: iceberg.PrimitiveTypes.Int32}, + ) + + spec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 3, + Name: "age_bucket", + Transform: iceberg.BucketTransform{ + NumBuckets: 2, + }, + }) + tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_merge_on_read_delete_partitioned"), icebergSchema, + catalog.WithProperties( + map[string]string{ + table.WriteDeleteModeKey: table.WriteModeMergeOnRead, + }, + ), + catalog.WithPartitionSpec(&spec), + ) + s.Require().NoError(err) + + arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false) + s.Require().NoError(err) + + initialTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{ + `[ + {"first_name": "alan", "last_name": "gopher", "age": 7}, + {"first_name": "steve", "last_name": "gopher", "age": 5}, + {"first_name": "dead", "last_name": "gopher", "age": 97} + {"first_name": "uncle", "last_name": "gopher", "age": 90} + ]`, + }) + s.Require().NoError(err) + 
+	defer initialTable.Release()
+
+	tx := tbl.NewTransaction()
+	err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+	s.Require().NoError(err)
+	tbl, err = tx.Commit(s.ctx)
+	s.Require().NoError(err)
+
+	// Delete the dead gopher and confirm that alan, steve and uncle are still present
+	filter := iceberg.NewAnd(iceberg.GreaterThan(iceberg.Reference("age"), "50"), iceberg.EqualTo(iceberg.Reference("first_name"), "dead"))
+	tx = tbl.NewTransaction()
+	err = tx.Delete(s.ctx, filter, nil)
+	s.Require().NoError(err)
+	_, err = tx.Commit(s.ctx)
+	s.Require().NoError(err)
+
+	output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql", "SELECT * FROM default.go_test_merge_on_read_delete_partitioned ORDER BY age")
+	s.Require().NoError(err)
+	s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve     |gopher   |5  |
+|alan      |gopher   |7  |
+|uncle     |gopher   |90 |
++----------+---------+---+`)
+}
+
 func TestSparkIntegration(t *testing.T) {
 	suite.Run(t, new(SparkIntegrationTestSuite))
 }
diff --git a/table/writer.go b/table/writer.go
index 8952b6c62..cdda03578 100644
--- a/table/writer.go
+++ b/table/writer.go
@@ -47,16 +47,58 @@ func (w WriteTask) GenerateDataFileName(extension string) string {
 	return fmt.Sprintf("%05d-%d-%s-%05d.%s", w.PartitionID, w.ID, w.Uuid, w.FileCount, extension)
 }
 
-type writer struct {
+type defaultDataFileWriter struct {
 	loc        LocationProvider
 	fs         io.WriteFileIO
 	fileSchema *iceberg.Schema
 	format     internal.FileFormat
-	props      any
+	props      iceberg.Properties
+	content    iceberg.ManifestEntryContent
 	meta       *MetadataBuilder
 }
 
-func (w *writer) writeFile(ctx context.Context, partitionValues map[int]any, task WriteTask) (iceberg.DataFile, error) {
+type dataFileWriterOption func(writer *defaultDataFileWriter)
+
+func withFormat(format internal.FileFormat) dataFileWriterOption {
+	return func(writer *defaultDataFileWriter) {
+		writer.format = format
+	}
+}
+
+func withFileSchema(schema *iceberg.Schema) dataFileWriterOption {
+	return func(writer *defaultDataFileWriter) {
+		writer.fileSchema = schema
+	}
+}
+
+func withContent(content iceberg.ManifestEntryContent) dataFileWriterOption {
+	return func(writer *defaultDataFileWriter) {
+		writer.content = content
+	}
+}
+
+func newDataFileWriter(rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (*defaultDataFileWriter, error) {
+	locProvider, err := LoadLocationProvider(rootLocation, props)
+	if err != nil {
+		return nil, err
+	}
+	w := defaultDataFileWriter{
+		loc:        locProvider,
+		fs:         fs,
+		fileSchema: meta.CurrentSchema(),
+		format:     internal.GetFileFormat(iceberg.ParquetFile),
+		content:    iceberg.EntryContentData,
+		props:      props,
+		meta:       meta,
+	}
+	for _, apply := range opts {
+		apply(&w)
+	}
+
+	return &w, nil
+}
+
+func (w *defaultDataFileWriter) writeFile(ctx context.Context, partitionValues map[int]any, task WriteTask) (iceberg.DataFile, error) {
 	defer func() {
 		for _, b := range task.Batches {
 			b.Release()
@@ -89,49 +131,76 @@ func (w *writer) writeFile(ctx context.Context, partitionValues map[int]any, tas
 	return w.format.WriteDataFile(ctx, w.fs, partitionValues, internal.WriteFileInfo{
 		FileSchema: w.fileSchema,
+		Content:    w.content,
 		FileName:   filePath,
 		StatsCols:  statsCols,
-		WriteProps: w.props,
+		WriteProps: w.format.GetWriteProperties(w.props),
 		Spec:       *currentSpec,
 	}, batches)
 }
 
-func writeFiles(ctx context.Context, rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, partitionValues map[int]any, tasks iter.Seq[WriteTask]) iter.Seq2[iceberg.DataFile, error] {
-	locProvider, err := LoadLocationProvider(rootLocation, meta.props)
-	if err != nil {
-		return func(yield func(iceberg.DataFile, error) bool) {
-			yield(nil, err)
-		}
+type dataFileWriter interface {
+	writeFile(ctx context.Context, partitionValues map[int]any, task WriteTask) (iceberg.DataFile, error)
+}
+
+func newPositionDeleteWriter(rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (*defaultDataFileWriter, error) {
+	// Always enforce the file schema to be the Positional Delete Schema by appending the option at the very end
+	return newDataFileWriter(rootLocation, fs, meta, props, append(opts, withFileSchema(iceberg.PositionalDeleteSchema), withContent(iceberg.EntryContentPosDeletes))...)
+}
+
+type dataFileWriterMaker func(rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error)
+
+type concurrentDataFileWriter struct {
+	newDataFileWriter dataFileWriterMaker
+	sanitizeSchema    bool
+}
+
+type concurrentDataFileWriterOption func(w *concurrentDataFileWriter)
+
+func withSchemaSanitization(enabled bool) concurrentDataFileWriterOption {
+	return func(w *concurrentDataFileWriter) {
+		w.sanitizeSchema = enabled
 	}
+}
 
-	format := internal.GetFileFormat(iceberg.ParquetFile)
+func newConcurrentDataFileWriter(newDataFileWriter dataFileWriterMaker, opts ...concurrentDataFileWriterOption) *concurrentDataFileWriter {
+	w := concurrentDataFileWriter{
+		newDataFileWriter: newDataFileWriter,
+		sanitizeSchema:    true,
+	}
+	for _, apply := range opts {
+		apply(&w)
+	}
+
+	return &w
+}
+
+func (w *concurrentDataFileWriter) writeFiles(ctx context.Context, rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, partitionValues map[int]any, tasks iter.Seq[WriteTask]) iter.Seq2[iceberg.DataFile, error] {
 	fileSchema := meta.CurrentSchema()
-	sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
-	if err != nil {
-		return func(yield func(iceberg.DataFile, error) bool) {
-			yield(nil, err)
+	if w.sanitizeSchema {
+		sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
+		if err != nil {
+			return func(yield func(iceberg.DataFile, error) bool) {
+				yield(nil, err)
+			}
 		}
-	}
 
-	// if the schema needs to be transformed, use the transformed schema
-	// and adjust the arrow schema appropriately. otherwise we just
-	// use the original schema.
-	if !sanitized.Equals(fileSchema) {
-		fileSchema = sanitized
+		// if the schema needs to be transformed, use the transformed schema
+		// and adjust the arrow schema appropriately. otherwise we just
+		// use the original schema.
+		if !sanitized.Equals(fileSchema) {
+			fileSchema = sanitized
+		}
 	}
 
-	w := &writer{
-		loc:        locProvider,
-		fs:         fs,
-		fileSchema: fileSchema,
-		format:     format,
-		props:      format.GetWriteProperties(meta.props),
-		meta:       meta,
+	fw, err := w.newDataFileWriter(rootLocation, fs, meta, props, withFileSchema(fileSchema))
+	if err != nil {
+		return func(yield func(iceberg.DataFile, error) bool) {
+			yield(nil, err)
+		}
 	}
 
-	nworkers := config.EnvConfig.MaxWorkers
-
-	return internal.MapExec(nworkers, tasks, func(t WriteTask) (iceberg.DataFile, error) {
-		return w.writeFile(ctx, partitionValues, t)
+	return internal.MapExec(config.EnvConfig.MaxWorkers, tasks, func(t WriteTask) (iceberg.DataFile, error) {
+		return fw.writeFile(ctx, partitionValues, t)
 	})
 }
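
The integration tests above double as usage documentation. The sketch below condenses them into a minimal client flow under stated assumptions: the catalog, schema, and table identifier ("default.gophers") are placeholders supplied by the caller, and the helper function name is hypothetical; only the property key/value and the Delete call come from this patch.

package example

import (
	"context"

	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/catalog"
	"github.com/apache/iceberg-go/table"
)

// deleteDeadGophers is a usage sketch: create a table whose write.delete.mode
// is merge-on-read, then delete the matching rows. The catalog, schema, and
// table name are assumptions for illustration.
func deleteDeadGophers(ctx context.Context, cat catalog.Catalog, schema *iceberg.Schema) error {
	tbl, err := cat.CreateTable(ctx, catalog.ToIdentifier("default", "gophers"), schema,
		catalog.WithProperties(map[string]string{
			table.WriteDeleteModeKey: table.WriteModeMergeOnRead,
		}),
	)
	if err != nil {
		return err
	}

	// With merge-on-read enabled, Delete emits position delete files that mark
	// the matching rows as removed instead of rewriting the affected data files.
	tx := tbl.NewTransaction()
	if err := tx.Delete(ctx, iceberg.EqualTo(iceberg.Reference("first_name"), "dead"), nil); err != nil {
		return err
	}
	_, err = tx.Commit(ctx)

	return err
}

Leaving write.delete.mode at its default keeps the existing copy-on-write behavior, where the same Delete call rewrites the affected data files.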
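
Inside package table, the writer changes separate what gets written (data files vs. position delete files) from the concurrent fan-out. The following package-internal sketch shows how the pieces compose; buildWriters is a hypothetical helper, and withSchemaSanitization(false) is included only to illustrate the option, not something this patch necessarily does.

// buildWriters is a hypothetical, package-internal helper illustrating how the
// dataFileWriterMaker hook selects what the concurrent writer produces:
// regular data files or position delete files with the fixed positional
// delete schema.
func buildWriters() (dataFiles, posDeletes *concurrentDataFileWriter) {
	dataFiles = newConcurrentDataFileWriter(
		func(root string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
			return newDataFileWriter(root, fs, meta, props, opts...)
		},
	)

	posDeletes = newConcurrentDataFileWriter(
		func(root string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
			return newPositionDeleteWriter(root, fs, meta, props, opts...)
		},
		// Shown only to demonstrate the knob: the positional delete schema has
		// fixed column names, so sanitization adds nothing for it.
		withSchemaSanitization(false),
	)

	return dataFiles, posDeletes
}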