68 changes: 34 additions & 34 deletions README.md
@@ -60,29 +60,29 @@ $ cd iceberg-go/cmd/iceberg && go build .

### Catalog Support

| Operation                   | REST | Hive | Glue | SQL |
|:----------------------------|:----:|:----:|:----:|:---:|
| Load Table                  |  X   |      |  X   |  X  |
| List Tables                 |  X   |      |  X   |  X  |
| Create Table                |  X   |      |  X   |  X  |
| Register Table              |  X   |      |  X   |     |
| Update Current Snapshot     |  X   |      |  X   |  X  |
| Create New Snapshot         |  X   |      |  X   |  X  |
| Rename Table                |  X   |      |  X   |  X  |
| Drop Table                  |  X   |      |  X   |  X  |
| Alter Table                 |  X   |      |  X   |  X  |
| Check Table Exists          |  X   |      |  X   |  X  |
| Set Table Properties        |  X   |      |  X   |  X  |
| List Namespaces             |  X   |      |  X   |  X  |
| Create Namespace            |  X   |      |  X   |  X  |
| Check Namespace Exists      |  X   |      |  X   |  X  |
| Drop Namespace              |  X   |      |  X   |  X  |
| Update Namespace Properties |  X   |      |  X   |  X  |
| Create View                 |  X   |      |      |  X  |
| Load View                   |      |      |      |  X  |
| List View                   |  X   |      |      |  X  |
| Drop View                   |  X   |      |      |  X  |
| Check View Exists           |  X   |      |      |  X  |

### Read/Write Data Support

@@ -93,17 +93,17 @@ $ cd iceberg-go/cmd/iceberg && go build .
As long as the FileSystem is supported and the Catalog supports altering
the table, the following table tracks the current write support:

| Operation | Supported |
|:-----------------------:|:---------:|
| Append Stream | X |
| Append Data Files | X |
| Rewrite Files | |
| Rewrite manifests | |
| Overwrite Files | X |
| Copy-On-Write Delete | X |
| Write Pos Delete | |
| Write Eq Delete | |
| Row Delta | |
| Operation | Supported |
|:---------------------|:---------:|
| Append Stream | X |
| Append Data Files | X |
| Rewrite Files | |
| Rewrite manifests | |
| Overwrite Files | X |
| Copy-On-Write Delete | X |
| Write Pos Delete | X |
| Write Eq Delete | |
| Row Delta | |


### CLI Usage
8 changes: 4 additions & 4 deletions manifest.go
@@ -1136,7 +1136,7 @@ func (w *ManifestWriter) Close() error {
return w.writer.Close()
}

func (w *ManifestWriter) ToManifestFile(location string, length int64) (ManifestFile, error) {
func (w *ManifestWriter) ToManifestFile(location string, length int64, content ManifestContent) (ManifestFile, error) {
Contributor:
I think the table/internal.DataFileStatistics.ToDataFile signature changes are acceptable (it's internal, so who cares?), but ManifestWriter.ToManifestFile is externally visible and now requires a new content arg.

This is a backward-incompatible change, so two questions:

  1. Is it really needed?
  2. If it is needed, we need to state it in the changelog.

if err := w.Close(); err != nil {
return nil, err
}
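For callers outside the module, the comment above amounts to a one-argument migration. A hedged sketch of the caller-side change (finishManifest and its arguments are placeholders; only ToManifestFile and ManifestContentData come from the package):

```go
package example

import "github.com/apache/iceberg-go"

// finishManifest shows the caller-side migration: the manifest content that was
// previously implied (data) must now be passed explicitly as the third argument.
func finishManifest(w *iceberg.ManifestWriter, outPath string, writtenLen int64) (iceberg.ManifestFile, error) {
	// before this change: w.ToManifestFile(outPath, writtenLen)
	return w.ToManifestFile(outPath, writtenLen, iceberg.ManifestContentData)
}
```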
@@ -1155,7 +1155,7 @@ func (w *ManifestWriter) ToManifestFile(location string, length int64) (Manifest
Path: location,
Len: length,
SpecID: int32(w.spec.id),
Content: ManifestContentData,
Content: content,
SeqNumber: -1,
MinSeqNumber: w.minSeqNum,
AddedSnapshotID: w.snapshotID,
@@ -1500,7 +1500,7 @@ func WriteManifest(
return nil, err
}

return w.ToManifestFile(filename, cnt.Count)
return w.ToManifestFile(filename, cnt.Count, ManifestContentData)
}

// ManifestEntryStatus defines constants for the entry status of
@@ -2314,5 +2314,5 @@ type ManifestEntry interface {

var PositionalDeleteSchema = NewSchema(0,
NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: "file_path", Required: true},
NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos", Required: true},
NestedField{ID: 2147483545, Type: PrimitiveTypes.Int64, Name: "pos", Required: true},
Contributor (author):
The spec says that pos is a long, so I updated this to be Int64 rather than Int32.

)
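The Int64 choice also matters downstream: the scan path reads positional-delete positions back with a plain *array.Int64 type assertion (see producePosDeletesFromTask in table/arrow_scanner.go below), so an Int32 field here would break that cast. A minimal sketch of that consumption pattern, assuming the arrow-go v18 imports the module already uses (collectDeletePositions is illustrative, not part of the codebase):

```go
package example

import (
	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
)

// collectDeletePositions flattens the int64 "pos" values out of a chunked
// positional-delete column, mirroring the type assertion the scanner performs.
func collectDeletePositions(chunk *arrow.Chunked) []int64 {
	out := make([]int64, 0, chunk.Len())
	for _, a := range chunk.Chunks() {
		out = append(out, a.(*array.Int64).Int64Values()...)
	}
	return out
}
```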
123 changes: 123 additions & 0 deletions table/arrow_scanner.go
@@ -19,6 +19,7 @@ package table

import (
"context"
"errors"
"io"
"iter"
"strconv"
@@ -42,6 +43,8 @@ const (
ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
)

var PositionalDeleteArrowSchema, _ = SchemaToArrowSchema(iceberg.PositionalDeleteSchema, nil, true, false)

type (
positionDeletes = []*arrow.Chunked
perFilePosDeletes = map[string]positionDeletes
@@ -189,6 +192,54 @@ func processPositionalDeletes(ctx context.Context, deletes set[int64]) recProces
}
}

// enrichRecordsWithPosDeleteFields enriches a RecordBatch with the columns declared in PositionalDeleteArrowSchema
// so that, even after pipeline filtering stages shed filtered-out records, we still have a way to
// preserve the original position of each surviving record.
func enrichRecordsWithPosDeleteFields(ctx context.Context, filePath iceberg.DataFile) recProcessFn {
Contributor:
Please add focused unit tests for enrichRecordsWithPosDeleteFields:

  • monotonically increasing pos across multiple input batches
  • exact appended schema/column ordering
  • empty batch behavior
  • failure mode if required fields are missing from PositionalDeleteArrowSchema.

nextIdx, mem := int64(0), compute.GetAllocator(ctx)

return func(inData arrow.RecordBatch) (outData arrow.RecordBatch, err error) {
defer inData.Release()

schema := inData.Schema()
fieldIdx := schema.NumFields()
filePathField, ok := PositionalDeleteArrowSchema.FieldsByName("file_path")
if !ok {
return nil, errors.New("position delete schema should have required field 'file_path'")
}
posField, ok := PositionalDeleteArrowSchema.FieldsByName("pos")
if !ok {
return nil, errors.New("position delete schema should have required field 'pos'")
}
schema, err = schema.AddField(fieldIdx, filePathField[0])
if err != nil {
return nil, err
}
schema, err = schema.AddField(fieldIdx+1, posField[0])
if err != nil {
return nil, err
}

filePathBuilder := array.NewStringBuilder(mem)
defer filePathBuilder.Release()
posBuilder := array.NewInt64Builder(mem)
defer posBuilder.Release()

startPos := nextIdx
nextIdx += inData.NumRows()

for i := startPos; i < nextIdx; i++ {
filePathBuilder.Append(filePath.FilePath())
posBuilder.Append(i)
}

columns := append(inData.Columns(), filePathBuilder.NewArray(), posBuilder.NewArray())
outData = array.NewRecordBatch(schema, columns, inData.NumRows())

return outData, err
}
}
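Responding to the test request in the inline comment above, a hypothetical sketch of the first case (monotonically increasing pos across batches); mkDataFile and mkBatch are assumed test helpers that build an iceberg.DataFile stub and a fresh input batch, not existing code:

```go
package table

import (
	"context"
	"testing"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/stretchr/testify/require"
)

// TestEnrichRecordsPosIsMonotonic checks that pos keeps counting across batches
// and that file_path/pos are appended as the last two columns.
func TestEnrichRecordsPosIsMonotonic(t *testing.T) {
	fn := enrichRecordsWithPosDeleteFields(context.Background(), mkDataFile("data/f.parquet"))

	out1, err := fn(mkBatch(3)) // rows 0..2
	require.NoError(t, err)
	defer out1.Release()

	out2, err := fn(mkBatch(2)) // rows 3..4
	require.NoError(t, err)
	defer out2.Release()

	n := out2.Schema().NumFields()
	require.Equal(t, "file_path", out2.Schema().Field(n-2).Name)
	require.Equal(t, "pos", out2.Schema().Field(n-1).Name)
	require.Equal(t, []int64{0, 1, 2}, out1.Column(n-1).(*array.Int64).Int64Values())
	require.Equal(t, []int64{3, 4}, out2.Column(n-1).(*array.Int64).Int64Values())
}
```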

func filterRecords(ctx context.Context, recordFilter expr.Expression) recProcessFn {
return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
defer rec.Release()
Expand Down Expand Up @@ -454,6 +505,78 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat
return err
}

func (as *arrowScan) producePosDeletesFromTask(ctx context.Context, task internal.Enumerated[FileScanTask], positionalDeletes positionDeletes, out chan<- enumeratedRecord) (err error) {
defer func() {
if err != nil {
out <- enumeratedRecord{Task: task, Err: err}
}
}()

var (
rdr internal.FileReader
iceSchema *iceberg.Schema
colIndices []int
filterFunc recProcessFn
dropFile bool
)

iceSchema, colIndices, rdr, err = as.prepareToRead(ctx, task.Value.File)
if err != nil {
return err
}
defer iceinternal.CheckedClose(rdr, &err)

fields := append(iceSchema.Fields(), iceberg.PositionalDeleteSchema.Fields()...)
enrichedIcebergSchema := iceberg.NewSchema(iceSchema.ID+1, fields...)

pipeline := make([]recProcessFn, 0, 2)
pipeline = append(pipeline, enrichRecordsWithPosDeleteFields(ctx, task.Value.File))
if len(positionalDeletes) > 0 {
deletes := set[int64]{}
for _, chunk := range positionalDeletes {
for _, a := range chunk.Chunks() {
for _, v := range a.(*array.Int64).Int64Values() {
deletes[v] = struct{}{}
}
}
}

pipeline = append(pipeline, processPositionalDeletes(ctx, deletes))
}

filterFunc, dropFile, err = as.getRecordFilter(ctx, iceSchema)
if err != nil {
return err
}

// Nothing to delete in a dropped file
if dropFile {
var emptySchema *arrow.Schema
emptySchema, err = SchemaToArrowSchema(as.projectedSchema, nil, false, as.useLargeTypes)
if err != nil {
return err
}
out <- enumeratedRecord{Task: task, Record: internal.Enumerated[arrow.RecordBatch]{
Value: array.NewRecordBatch(emptySchema, nil, 0), Index: 0, Last: true,
}}

return err
}

if filterFunc != nil {
pipeline = append(pipeline, filterFunc)
}
pipeline = append(pipeline, func(r arrow.RecordBatch) (arrow.RecordBatch, error) {
defer r.Release()

return ToRequestedSchema(ctx, iceberg.PositionalDeleteSchema, enrichedIcebergSchema, r, false, true, as.useLargeTypes)
})

err = as.processRecords(ctx, task, iceSchema, rdr, colIndices, pipeline, out)

return err
}

func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.RecordBatch, error] {
isBeforeAny := func(batch enumeratedRecord) bool {
return batch.Task.Index < 0
87 changes: 79 additions & 8 deletions table/arrow_utils.go
@@ -1258,7 +1258,7 @@ func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilde
}
}

df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, rdr.SourceFileSize(), partitionValues)
df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), partitionValues)
if !yield(df, nil) {
return
}
@@ -1335,6 +1335,9 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
panic(fmt.Errorf("%w: cannot write files without a current spec", err))
}

cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
return newDataFileWriter(rootLocation, fs, meta, props, opts...)
})
nextCount, stopCount := iter.Pull(args.counter)
if currentSpec.IsUnpartitioned() {
tasks := func(yield func(WriteTask) bool) {
@@ -1358,13 +1361,81 @@
}
}

return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
} else {
partitionWriter := newPartitionedFanoutWriter(*currentSpec, meta.CurrentSchema(), args.itr)
rollingDataWriters := NewWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize)
partitionWriter.writers = &rollingDataWriters
workers := config.EnvConfig.MaxWorkers
return cw.writeFiles(ctx, rootLocation, args.fs, meta, meta.props, nil, tasks)
}

partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw, meta.CurrentSchema(), args.itr)
partitionWriter.writers = newWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize)
workers := config.EnvConfig.MaxWorkers

return partitionWriter.Write(ctx, workers)
}

func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string, meta *MetadataBuilder, partitionDataPerFile map[string]map[int]any, args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
if args.counter == nil {
args.counter = internal.Counter(0)
}

defer func() {
if r := recover(); r != nil {
var err error
switch e := r.(type) {
case string:
err = fmt.Errorf("error encountered during position delete file writing %s", e)
case error:
err = fmt.Errorf("error encountered during position delete file writing: %w", e)
}
ret = func(yield func(iceberg.DataFile, error) bool) {
yield(nil, err)
}
}
}()

return partitionWriter.Write(ctx, workers)
if args.writeUUID == nil {
u := uuid.Must(uuid.NewRandom())
args.writeUUID = &u
}

targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
WriteTargetFileSizeBytesDefault))

currentSpec, err := meta.CurrentSpec()
if err != nil || currentSpec == nil {
panic(fmt.Errorf("%w: cannot write files without a current spec", err))
Contributor:
Can we avoid panic/recover for expected failures here?
CurrentSpec() and schema conversion errors are regular error cases and returning an error iterator directly would be clearer/safer than panicking. Also, the recover block only handles string and error; any other panic type could produce yield(nil, nil), which masks failure.

}

cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
return newPositionDeleteWriter(rootLocation, fs, meta, props, opts...)
}, withSchemaSanitization(false))
nextCount, stopCount := iter.Pull(args.counter)
if currentSpec.IsUnpartitioned() {
tasks := func(yield func(WriteTask) bool) {
defer stopCount()

fileCount := 0
for batch := range binPackRecords(args.itr, defaultBinPackLookback, targetFileSize) {
cnt, _ := nextCount()
fileCount++
t := WriteTask{
Uuid: *args.writeUUID,
ID: cnt,
PartitionID: iceberg.UnpartitionedSpec.ID(),
FileCount: fileCount,
Schema: iceberg.PositionalDeleteSchema,
Batches: batch,
}
if !yield(t) {
return
}
}
}

return cw.writeFiles(ctx, rootLocation, args.fs, meta, meta.props, nil, tasks)
}

partitionWriter := newPositionDeletePartitionedFanoutWriter(*currentSpec, cw, partitionDataPerFile, args.itr)
partitionWriter.writers = newWriterFactory(rootLocation, args, meta, iceberg.PositionalDeleteSchema, targetFileSize)
workers := config.EnvConfig.MaxWorkers

return partitionWriter.Write(ctx, workers)
}
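A small helper in the direction the review comment above suggests, sketched under the assumption that returning the failure through the iterator is acceptable to callers (errIter is a hypothetical name; the iter.Seq2 shape and the yield pattern come from the surrounding code):

```go
package table

import (
	"iter"

	"github.com/apache/iceberg-go"
)

// errIter surfaces an expected failure as a single-element result iterator,
// avoiding panic/recover for errors such as a missing current spec.
func errIter(err error) iter.Seq2[iceberg.DataFile, error] {
	return func(yield func(iceberg.DataFile, error) bool) {
		yield(nil, err)
	}
}
```

With such a helper, the CurrentSpec and schema-conversion failures could be returned directly (for example, errIter(fmt.Errorf("cannot write files without a current spec: %w", err))), leaving recover to guard only truly unexpected panics.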
2 changes: 1 addition & 1 deletion table/arrow_utils_internal_test.go
@@ -200,7 +200,7 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta iceberg.Properties, writeSt
stats := format.DataFileStatsFromMeta(fileMeta, collector, mapping)

return stats.ToDataFile(tableMeta.CurrentSchema(), tableMeta.PartitionSpec(), "fake-path.parquet",
iceberg.ParquetFile, fileMeta.GetSourceFileSize(), nil)
iceberg.ParquetFile, iceberg.EntryContentData, fileMeta.GetSourceFileSize(), nil)
}

func (suite *FileStatsMetricsSuite) TestRecordCount() {
1 change: 1 addition & 0 deletions table/internal/interfaces.go
@@ -105,4 +105,5 @@ type WriteFileInfo struct {
FileName string
StatsCols map[int]StatisticsCollector
WriteProps any
Content iceberg.ManifestEntryContent
}
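A sketch of how a writer might populate the new field; only EntryContentData appears in this diff, so EntryContentPosDeletes is an assumed constant name (mirroring the spec's data/position-delete content values), and the field set is trimmed to what this hunk shows:

```go
// Data files keep the existing content kind; a position-delete writer would tag
// its files with the position-delete value instead.
dataInfo := internal.WriteFileInfo{
	FileName: "00000-0-data.parquet",
	Content:  iceberg.EntryContentData,
}
posDeleteInfo := internal.WriteFileInfo{
	FileName: "00000-0-deletes.parquet",
	Content:  iceberg.EntryContentPosDeletes, // assumed name
}
_, _ = dataInfo, posDeleteInfo
```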
2 changes: 1 addition & 1 deletion table/internal/parquet_files.go
@@ -282,7 +282,7 @@ func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO,
}

return p.DataFileStatsFromMeta(filemeta, info.StatsCols, colMapping).
ToDataFile(info.FileSchema, info.Spec, info.FileName, iceberg.ParquetFile, cntWriter.Count, partitionValues), nil
ToDataFile(info.FileSchema, info.Spec, info.FileName, iceberg.ParquetFile, info.Content, cntWriter.Count, partitionValues), nil
}

type decAsIntAgg[T int32 | int64] struct {