Skip to content

[GLUTEN-12280][VL] Fix Spark 4 Arrow Python UDF stream writer#12345

Open
ReemaAlzaid wants to merge 4 commits into
apache:mainfrom
ReemaAlzaid:fix-pyarrow
Open

[GLUTEN-12280][VL] Fix Spark 4 Arrow Python UDF stream writer#12345
ReemaAlzaid wants to merge 4 commits into
apache:mainfrom
ReemaAlzaid:fix-pyarrow

Conversation

@ReemaAlzaid

Copy link
Copy Markdown
Contributor

What changes are proposed in this pull request?

Fixes #12280.

Fix Spark 4 Arrow Python UDF execution with the Velox backend by keeping the Arrow stream writer alive across input batches instead of reopening the IPC stream per batch.

Also adds a regression test for Arrow Python UDF over Parquet scan

How was this patch tested?

Added ArrowEvalPythonExecSuite coverage.

Verified locally on Spark 4.0.2 / Scala 2.13 / linux aarch64. The repro uses ColumnarArrowPythonRunner, returns max(ship_len) = 7, and no longer fails with Invalid IPC stream

@github-actions github-actions Bot added the VELOX label Jun 23, 2026

@liuneng1994 liuneng1994 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes Spark 4 Arrow Python UDF execution for the Velox backend by changing ColumnarArrowPythonRunner to keep a single Arrow IPC stream writer open across input batches (instead of effectively reopening the stream per batch), and adds a Spark-4-only regression test that exercises an Arrow-batched Python UDF over a Parquet scan.

Changes:

  • Update Spark 4 writeNextInputToStream path to reuse VectorSchemaRoot/VectorLoader/ArrowStreamWriter across batches and close at task completion.
  • Add a regression test for Arrow-batched Python UDF over Parquet scan + aggregation.
  • Add Spark 4 constructor handling in test UDF creation to require explicit return type.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala Keep Arrow IPC stream writer alive across Spark 4 batch writes; close on completion.
backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala Add Spark 4 regression test covering Arrow-batched Python UDF over Parquet scan.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +207 to +216
private def writeNextInputToStreamHelper(dataOut: DataOutputStream): Boolean = {
ensureNextInputWriter(dataOut)
if (!inputIterator.hasNext) {
closeNextInputWriter()
// See https://issues.apache.org/jira/browse/SPARK-44705:
// Starting from Spark 4.0, we should return false once the iterator is drained out,
// otherwise Spark won't stop calling this method repeatedly.
return false
}
val nextBatch = inputIterator.next()
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[VL] pyarrow UDF is broken

3 participants