diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go index 26b18d191..02da7fa1d 100644 --- a/table/arrow_scanner.go +++ b/table/arrow_scanner.go @@ -391,6 +391,32 @@ func groupPosDeletesByFilePath(ctx context.Context, filePathCol, posCol *arrow.C return results, nil } +func collectPosDeletePositions(positionalDeletes positionDeletes) (set[int64], error) { + deletes := set[int64]{} + for _, chunk := range positionalDeletes { + if chunk == nil { + return nil, fmt.Errorf("%w: nil pos column chunk in position delete file", + iceberg.ErrInvalidSchema) + } + if chunk.DataType().ID() != arrow.INT64 { + return nil, fmt.Errorf("%w: unsupported pos column type %s in position delete file", + iceberg.ErrInvalidSchema, chunk.DataType()) + } + for _, arr := range chunk.Chunks() { + posArr, ok := arr.(*array.Int64) + if !ok { + return nil, fmt.Errorf("%w: unsupported pos chunk array type %T in position delete file", + iceberg.ErrInvalidSchema, arr) + } + for _, v := range posArr.Int64Values() { + deletes[v] = struct{}{} + } + } + } + + return deletes, nil +} + func releasePosDeletes(deletes map[string]*arrow.Chunked) { for _, chunk := range deletes { if chunk != nil { @@ -1060,13 +1086,9 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat } if applyPosDeletes { - deletes := set[int64]{} - for _, chunk := range positionalDeletes { - for _, a := range chunk.Chunks() { - for _, v := range a.(*array.Int64).Int64Values() { - deletes[v] = struct{}{} - } - } + deletes, err := collectPosDeletePositions(positionalDeletes) + if err != nil { + return err } pipeline = append(pipeline, processPositionalDeletes(ctx, deletes, posSource.cursor())) @@ -1151,13 +1173,9 @@ func (as *arrowScan) producePosDeletesFromTask(ctx context.Context, task interna pipeline := make([]recProcessFn, 0, 2) pipeline = append(pipeline, enrichRecordsWithPosDeleteFields(ctx, task.Value.File, posSource.cursor())) if len(positionalDeletes) > 0 { - deletes := set[int64]{} - for _, chunk := range positionalDeletes { - for _, a := range chunk.Chunks() { - for _, v := range a.(*array.Int64).Int64Values() { - deletes[v] = struct{}{} - } - } + deletes, err := collectPosDeletePositions(positionalDeletes) + if err != nil { + return err } pipeline = append(pipeline, processPositionalDeletes(ctx, deletes, posSource.cursor())) diff --git a/table/arrow_scanner_posdelete_regression_test.go b/table/arrow_scanner_posdelete_regression_test.go index e9bca3e9f..955809bca 100644 --- a/table/arrow_scanner_posdelete_regression_test.go +++ b/table/arrow_scanner_posdelete_regression_test.go @@ -250,6 +250,21 @@ func TestGroupPosDeletesByFilePathRejectsUnsupportedFilePathLayout(t *testing.T) } } +func TestCollectPosDeletePositionsRejectsUnsupportedPosType(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + posArr := int32Array(mem, 1, 2) + defer posArr.Release() + + posCol := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{posArr}) + defer posCol.Release() + + _, err := collectPosDeletePositions(positionDeletes{posCol}) + require.ErrorIs(t, err, iceberg.ErrInvalidSchema) + assert.Contains(t, err.Error(), "unsupported pos column type") +} + // TestProcessPositionalDeletesAcrossBatches is the regression net for the // positional-delete index bug: processPositionalDeletes applies deletes one Arrow // batch at a time, but the surviving-row indices must index into the *current