[GLUTEN-12280][VL] Fix Spark 4 Arrow Python UDF stream writer#12345
Open
ReemaAlzaid wants to merge 4 commits into
Open
[GLUTEN-12280][VL] Fix Spark 4 Arrow Python UDF stream writer#12345ReemaAlzaid wants to merge 4 commits into
ReemaAlzaid wants to merge 4 commits into
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
Fixes Spark 4 Arrow Python UDF execution for the Velox backend by changing ColumnarArrowPythonRunner to keep a single Arrow IPC stream writer open across input batches (instead of effectively reopening the stream per batch), and adds a Spark-4-only regression test that exercises an Arrow-batched Python UDF over a Parquet scan.
Changes:
- Update Spark 4
writeNextInputToStreampath to reuseVectorSchemaRoot/VectorLoader/ArrowStreamWriteracross batches and close at task completion. - Add a regression test for Arrow-batched Python UDF over Parquet scan + aggregation.
- Add Spark 4 constructor handling in test UDF creation to require explicit return type.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala | Keep Arrow IPC stream writer alive across Spark 4 batch writes; close on completion. |
| backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala | Add Spark 4 regression test covering Arrow-batched Python UDF over Parquet scan. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+207
to
+216
| private def writeNextInputToStreamHelper(dataOut: DataOutputStream): Boolean = { | ||
| ensureNextInputWriter(dataOut) | ||
| if (!inputIterator.hasNext) { | ||
| closeNextInputWriter() | ||
| // See https://issues.apache.org/jira/browse/SPARK-44705: | ||
| // Starting from Spark 4.0, we should return false once the iterator is drained out, | ||
| // otherwise Spark won't stop calling this method repeatedly. | ||
| return false | ||
| } | ||
| val nextBatch = inputIterator.next() |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes are proposed in this pull request?
Fixes #12280.
Fix Spark 4 Arrow Python UDF execution with the Velox backend by keeping the Arrow stream writer alive across input batches instead of reopening the IPC stream per batch.
Also adds a regression test for Arrow Python UDF over Parquet scan
How was this patch tested?
Added
ArrowEvalPythonExecSuitecoverage.Verified locally on Spark 4.0.2 / Scala 2.13 / linux aarch64. The repro uses
ColumnarArrowPythonRunner, returnsmax(ship_len) = 7, and no longer fails withInvalid IPC stream