Spark 4.1: Set data file sort_order_id in manifest for writes from Spark #15150
Changes from all commits: f85f358, 2dc028c, cb1136e, 17d2d72, 9b29946, 21c4cdd, 803a6da, 67252a3, ed0356a, 6fc1157, 45ed1bc, e04a498, 2ed7f54, bc52b9e, cc03355
```diff
@@ -42,6 +42,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableUtil;
```
```diff
@@ -171,6 +172,25 @@ public int outputSpecId() {
     return outputSpecId;
   }

+  public int outputSortOrderId(SparkWriteRequirements writeRequirements) {
+    Integer explicitId =
+        confParser.intConf().option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID).parseOptional();
+
+    if (explicitId != null) {
+      Preconditions.checkArgument(
+          table.sortOrders().containsKey(explicitId),
+          "Cannot use output sort order id %s because the table does not contain a sort order with that id",
+          explicitId);
+      return explicitId;
+    }
+
+    if (writeRequirements.hasOrdering()) {
+      return table.sortOrder().orderId();
```
|
Contributor:
Why are we using the table's sort order and not the ordering from the
Contributor (author):
There's a lot of good discussion outlining this above, but essentially an Iceberg sort order is not the same thing as a Spark sort order, which is why this PR has a bit of complexity. The logic above is essentially saying:
- if an explicit sort order id is passed as a write option, validate that the table has a sort order with that id and use it;
- otherwise, if the Spark write has an ordering requirement, record the table's current sort order id;
- otherwise, record the unsorted order id.

I do agree that it's a bit clever, and I had originally opted for a more explicit approach, as agreed upon with the maintainers above.
Contributor:
Thanks for elaborating on the approach. It would be good to be as explicit as possible.
I'm missing that part. We are making the assumption that Iceberg's SortOrder matches Spark's SortOrder and just blindly returning the current table order. Would it make sense to build a Spark SortOrder from the Iceberg SortOrder and verify whether it matches the one from WriteRequirements?
Contributor (author):
Does the thread in #15150 (comment) answer your question, @mxm?
Contributor (author):
But essentially they often don't match in practice, which is fine; various write configs and table configs can change this. Additionally, using partitioning together with ordering produces Spark sort orders that have the partition keys as the prefix of the Spark sort order, followed by the actual Iceberg sort order as the suffix.
Contributor (author):
And if you're interested in following things back in time, you can revisit the original PR, which has a fair bit of commentary about this as well: #14683 (comment)
Contributor:
Thanks! I'll do some more digging. It looks like Spark performs its own sort optimizations, which may differ from the original table sort order.
```diff
+    }
+
+    return SortOrder.unsorted().orderId();
+  }
+
   public FileFormat dataFileFormat() {
     String valueAsString =
         confParser
```
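The three-way resolution in `outputSortOrderId` can be sketched in isolation. The following is a simplified stand-in, not the real class: `resolve` takes plain parameters in place of the table, conf parser, and `SparkWriteRequirements` objects, and it assumes that the unsorted order id is 0 (which is what `SortOrder.unsorted().orderId()` returns in Iceberg).

```java
import java.util.Map;

public class SortOrderIdResolution {
  // SortOrder.unsorted().orderId() is 0 in Iceberg.
  static final int UNSORTED_ORDER_ID = 0;

  // Mirrors outputSortOrderId: an explicit option wins after validation,
  // then the table's current sort order id if Spark requested an ordering,
  // otherwise the unsorted order id.
  static int resolve(
      Integer explicitId,
      Map<Integer, String> tableSortOrders, // stand-in for table.sortOrders()
      int tableSortOrderId,                 // stand-in for table.sortOrder().orderId()
      boolean hasOrdering) {                // stand-in for writeRequirements.hasOrdering()
    if (explicitId != null) {
      if (!tableSortOrders.containsKey(explicitId)) {
        throw new IllegalArgumentException(
            "Cannot use output sort order id " + explicitId
                + " because the table does not contain a sort order with that id");
      }
      return explicitId;
    }

    if (hasOrdering) {
      return tableSortOrderId;
    }

    return UNSORTED_ORDER_ID;
  }

  public static void main(String[] args) {
    Map<Integer, String> orders = Map.of(0, "unsorted", 1, "id ASC");
    System.out.println(resolve(1, orders, 1, false));    // explicit id wins: 1
    System.out.println(resolve(null, orders, 1, true));  // table order: 1
    System.out.println(resolve(null, orders, 1, false)); // unsorted: 0
  }
}
```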
|
|
Contributor:
This makes sense to me as a good fix for now. We already do this for a bunch of other things (like specs), and it should be relatively lightweight.
I'd ideally also still like the ability to pass the ID directly into the factory, but that could be a follow-up issue someone can work on.