Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions table/arrow_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,32 @@ func groupPosDeletesByFilePath(ctx context.Context, filePathCol, posCol *arrow.C
return results, nil
}

func collectPosDeletePositions(positionalDeletes positionDeletes) (set[int64], error) {
deletes := set[int64]{}
for _, chunk := range positionalDeletes {
if chunk == nil {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the nil-chunk guard meant purely as defensiveness? groupPosDeletesByFilePath only ever stores retained, non-nil chunks into the map, so this branch shouldn't be reachable from the scan path — just want to make sure I'm not missing a caller that can pass a nil chunk in.

return nil, fmt.Errorf("%w: nil pos column chunk in position delete file",
iceberg.ErrInvalidSchema)
}
if chunk.DataType().ID() != arrow.INT64 {
return nil, fmt.Errorf("%w: unsupported pos column type %s in position delete file",
iceberg.ErrInvalidSchema, chunk.DataType())
}
for _, arr := range chunk.Chunks() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inner arr.(*array.Int64) assertion is unreachable here — once the outer chunk.DataType().ID() == arrow.INT64 check at line 401 passes, every chunk of that column is already *array.Int64, so the
!ok branch and its separate "unsupported pos chunk array type" message can't fire. Not a problem to leave as defensive code, but you could drop the , ok check (or fold the two error messages into one) to
keep it tight.

posArr, ok := arr.(*array.Int64)
if !ok {
return nil, fmt.Errorf("%w: unsupported pos chunk array type %T in position delete file",
iceberg.ErrInvalidSchema, arr)
}
for _, v := range posArr.Int64Values() {
deletes[v] = struct{}{}
}
}
}

return deletes, nil
}

func releasePosDeletes(deletes map[string]*arrow.Chunked) {
for _, chunk := range deletes {
if chunk != nil {
Expand Down Expand Up @@ -1060,13 +1086,9 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat
}

if applyPosDeletes {
deletes := set[int64]{}
for _, chunk := range positionalDeletes {
for _, a := range chunk.Chunks() {
for _, v := range a.(*array.Int64).Int64Values() {
deletes[v] = struct{}{}
}
}
deletes, err := collectPosDeletePositions(positionalDeletes)
if err != nil {
return err
}

pipeline = append(pipeline, processPositionalDeletes(ctx, deletes, posSource.cursor()))
Expand Down Expand Up @@ -1151,13 +1173,9 @@ func (as *arrowScan) producePosDeletesFromTask(ctx context.Context, task interna
pipeline := make([]recProcessFn, 0, 2)
pipeline = append(pipeline, enrichRecordsWithPosDeleteFields(ctx, task.Value.File, posSource.cursor()))
if len(positionalDeletes) > 0 {
deletes := set[int64]{}
for _, chunk := range positionalDeletes {
for _, a := range chunk.Chunks() {
for _, v := range a.(*array.Int64).Int64Values() {
deletes[v] = struct{}{}
}
}
deletes, err := collectPosDeletePositions(positionalDeletes)
if err != nil {
return err
}

pipeline = append(pipeline, processPositionalDeletes(ctx, deletes, posSource.cursor()))
Expand Down
15 changes: 15 additions & 0 deletions table/arrow_scanner_posdelete_regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,21 @@ func TestGroupPosDeletesByFilePathRejectsUnsupportedFilePathLayout(t *testing.T)
}
}

func TestCollectPosDeletePositionsRejectsUnsupportedPosType(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)

posArr := int32Array(mem, 1, 2)
defer posArr.Release()

posCol := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{posArr})
defer posCol.Release()

_, err := collectPosDeletePositions(positionDeletes{posCol})
require.ErrorIs(t, err, iceberg.ErrInvalidSchema)
assert.Contains(t, err.Error(), "unsupported pos column type")
}

// TestProcessPositionalDeletesAcrossBatches is the regression net for the
// positional-delete index bug: processPositionalDeletes applies deletes one Arrow
// batch at a time, but the surviving-row indices must index into the *current
Expand Down
Loading