Guard cached parquet GPU columns during compression #15083

Open
fallintoplace wants to merge 1 commit into NVIDIA:main from fallintoplace:fix-cached-parquet-column-lifecycle

@fallintoplace fallintoplace commented Jun 14, 2026

No issue filed.

Description

Guard the GPU column vectors created during cached-Parquet compression until they are handed off to a ColumnarBatch. safeMap closes partially produced vectors if a later conversion fails, and closeOnExcept protects the handoff into the batch.

Checklists

Documentation

  • Updated for new or modified user-facing features or behaviors
  • No user-facing change

Testing

  • Added or modified tests to cover new code paths
  • Covered by existing tests
    (Please provide the names of the existing tests in the PR description.)
  • Not required

Performance

  • Tests ran and results are added in the PR description
  • Issue filed with a link in the PR description
  • Not required

greptile-apps Bot commented Jun 14, 2026

Contributor

Greptile Summary

This PR adds exception-safety guards for the GPU ColumnVector instances produced inside compressColumnarBatchWithParquet. Previously, if any column conversion failed mid-loop, already-produced GpuColumnVector objects would be leaked; the same was true if ColumnarBatch construction threw after all columns were built.

  • Replaces the plain for-comprehension over column indices with .toArray.safeMap, so that GpuColumnVector objects accumulated before a failing iteration are automatically closed on exception.
  • Wraps the new ColumnarBatch(columns, numRows) call in closeOnExcept(columns), ensuring all column vectors are released if the batch constructor ever throws before it can take ownership.
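The two guards described above can be sketched in isolation. This is a minimal, self-contained illustration rather than the plugin's code: `Tracked`, `safeMapSketch`, and `closeOnExceptSketch` are hypothetical stand-ins for `GpuColumnVector`, `safeMap`, and `closeOnExcept`, and the real helpers may differ in signature and in how they report suppressed exceptions.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a GPU-backed column; records whether it was closed.
final class Tracked(val id: Int) extends AutoCloseable {
  var closed = false
  override def close(): Unit = { closed = true }
}

object GuardSketch {
  // Simplified analogue of safeMap (an assumption, not the project's code):
  // if fn throws part-way through, every resource produced so far is closed
  // before the original error propagates.
  def safeMapSketch[A, B <: AutoCloseable](in: Seq[A])(fn: A => B): Seq[B] = {
    val out = ArrayBuffer.empty[B]
    try {
      in.foreach(a => out += fn(a))
      out.toSeq
    } catch {
      case t: Throwable =>
        out.foreach { r =>
          try r.close() catch { case s: Throwable => t.addSuppressed(s) }
        }
        throw t
    }
  }

  // Simplified analogue of closeOnExcept: close the resources only if the
  // block throws; on success, ownership passes to whatever the block returns.
  def closeOnExceptSketch[R <: AutoCloseable, T](rs: Seq[R])(block: Seq[R] => T): T = {
    try block(rs) catch {
      case t: Throwable =>
        rs.foreach { r =>
          try r.close() catch { case s: Throwable => t.addSuppressed(s) }
        }
        throw t
    }
  }
}
```

In this sketch the success path never closes anything, which mirrors the intent of the PR: once the batch is constructed, it is the batch that owns and eventually closes the columns.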

Confidence Score: 5/5

Safe to merge. The two-line mechanical change is internally consistent: safeMap correctly tracks owned GpuColumnVector outputs, and closeOnExcept correctly guards the handoff into ColumnarBatch.

Both new constructs (safeMap and closeOnExcept on the columns array) are used exactly as documented in the project's Arm.scala and RapidsPluginImplicits, and the type algebra works out cleanly at runtime due to Java array covariance. No incorrect behavior, double-close hazard, or new allocation without a guard is introduced by the diff itself.

No files require special attention.

Important Files Changed

Filename | Overview
sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala | Adds safeMap + closeOnExcept guards around GPU column vector production in compressColumnarBatchWithParquet; import additions are minimal and correct.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant C as compressColumnarBatch
    participant SM as safeMap
    participant CC as ColumnCastUtil
    participant GCV as GpuColumnVector.from
    participant COE as closeOnExcept(columns)
    participant CB as ColumnarBatch
    participant WR as withResource

    C->>SM: (0 until numCols).toArray.safeMap
    loop for each column i
        SM->>CC: ifTrueThenDeepConvertTypeAtoTypeB(...)
        CC-->>SM: v (ColumnVector)
        SM->>GCV: from(v, dataType)
        alt GpuColumnVector.from throws
            SM->>SM: safeClose already-produced vectors
            SM-->>C: rethrows
        else success
            GCV-->>SM: GpuColumnVector
        end
    end
    SM-->>C: columns: Array[SparkColumnVector]
    C->>COE: closeOnExcept(columns)
    COE->>CB: new ColumnarBatch(columns, numRows)
    alt ColumnarBatch ctor throws
        COE->>COE: safeClose all columns
        COE-->>C: rethrows
    else success
        CB-->>COE: newGpuCB
        COE-->>C: newGpuCB
    end
    C->>WR: withResource(newGpuCB)
    WR->>WR: split/write columns
    WR->>CB: close() closes all GpuColumnVectors

Reviews (1): Last reviewed commit: "Guard cached parquet GPU columns during ..."

Comment on lines 368 to 386
@@ -385,7 +385,10 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
)
GpuColumnVector.from(v, schema(i).dataType)

P2 Intermediate v resource still unguarded on exception path

ifTrueThenDeepConvertTypeAtoTypeB always returns an owned ColumnVector — either copyToColumnVector() (new allocation) or incRefCount() (bumped ref). If GpuColumnVector.from(v, schema(i).dataType) throws (e.g., the assert typeConversionAllowed(...) fires when running with -ea), v is leaked because nothing closes it on the failure path. Wrapping the from call with closeOnExcept(v) { v => GpuColumnVector.from(v, schema(i).dataType) } would close v only on exception while correctly leaving ownership with the new GpuColumnVector on success.
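The suggestion can be illustrated with a small self-contained sketch. `Raw`, `Wrapped`, `WrapSketch.from`, and `closeOnExceptOne` are hypothetical stand-ins for the converted ColumnVector `v`, `GpuColumnVector`, `GpuColumnVector.from`, and the project's `closeOnExcept`. The only point is that the intermediate is closed when wrapping fails and left open when it succeeds.

```scala
// Hypothetical stand-in for the owned intermediate ColumnVector `v`.
final class Raw extends AutoCloseable {
  var closed = false
  override def close(): Unit = { closed = true }
}

// Hypothetical stand-in for GpuColumnVector: owns the Raw it wraps.
final class Wrapped(val inner: Raw) extends AutoCloseable {
  override def close(): Unit = inner.close()
}

object WrapSketch {
  // Mimics a factory that validates before taking ownership and may throw,
  // loosely modelled on the assert mentioned in the review comment.
  def from(v: Raw, typeAllowed: Boolean): Wrapped = {
    if (!typeAllowed) throw new AssertionError("type conversion not allowed")
    new Wrapped(v)
  }

  // Simplified single-resource analogue of closeOnExcept (an assumption,
  // not the project's implementation).
  def closeOnExceptOne[R <: AutoCloseable, T](r: R)(block: R => T): T = {
    try block(r) catch {
      case t: Throwable =>
        try r.close() catch { case s: Throwable => t.addSuppressed(s) }
        throw t
    }
  }

  // Guarded wrap: `v` is closed only if `from` throws.
  def wrap(v: Raw, typeAllowed: Boolean): Wrapped =
    closeOnExceptOne(v)(from(_, typeAllowed))
}
```

On the success path the `Raw` stays open and is released later through `Wrapped.close()`, so the guard does not introduce a double close.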

Signed-off-by: Minh Vu <vuhoangminh97@gmail.com>
@fallintoplace fallintoplace force-pushed the fix-cached-parquet-column-lifecycle branch from 5a54cb4 to 881e71d Compare June 14, 2026 10:57