Spark 4.1: Set data file sort_order_id in manifest for writes from Spark #15150
RussellSpitzer merged 15 commits into apache:main
Conversation
Thanks @jbewing. @RussellSpitzer - I know the Iceberg summit abstracts are keeping you busy, could you take a look when you get a chance? Thank you!
/** A set of requirements such as distribution and ordering reported to Spark during writes. */
public class SparkWriteRequirements {

  public static final long NO_ADVISORY_PARTITION_SIZE = 0;
How is this change related to the PR?
This isn't directly related, but while refactoring SparkWriteRequirements I stumbled on this unnamed constant (0) and decided to pay it forward into a named constant since I was modifying this class anyway. It took me a minute to figure out what 0 meant without context.
Happy to revert if you'd like, but I believe that:
- extracting this into a named constant, and
- shifting from recomputing the value on every read to computing it once up front

are net-positive changes to make along the way while adding the additional instance field for the Iceberg sort order.
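A minimal sketch of the two refactors described above (the class shape and field names are assumed for illustration; this is not the actual Iceberg source): the bare `0` literal becomes a named constant, and the value is computed once at construction instead of on every read.

```java
// Hypothetical sketch, not the real SparkWriteRequirements class.
class WriteRequirementsSketch {
  // Previously a bare 0 literal scattered at call sites.
  public static final long NO_ADVISORY_PARTITION_SIZE = 0;

  // Computed once here rather than recalculated by every caller.
  private final long advisoryPartitionSize;

  WriteRequirementsSketch(Long configuredSize) {
    this.advisoryPartitionSize =
        configuredSize != null ? configuredSize : NO_ADVISORY_PARTITION_SIZE;
  }

  long advisoryPartitionSize() {
    return advisoryPartitionSize;
  }
}
```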
I'd just pull this out completely into another PR now. I promise I will review and approve it within 24 hours if you do :)
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java (resolved)
    .mode(SaveMode.Append)
    .save(location.toString());

createBranch(table);
Yeah, I copied this test template from the others in this class. I believe most or all of them have this, and it looks to be leftover from #6965. Happy to remove it, but I was following the existing "template" here for consistency. TL;DR: there's no good reason we're branching except that almost all the tests in this class do it and I copied that pattern.
Yeah, this whole file is full of things I'm a little confused about; I don't think it's worth copying them. We can file a follow-up issue to clean up this file.
+1 We should not be branching here (and elsewhere) for no reason.
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, location.toString());
Can't we set the sort order here?
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);

List<DataFile> files = Lists.newArrayList();
Could use table.newScan().planFiles() here instead.
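The suggestion replaces manual bookkeeping of written files with a table scan. A simplified, self-contained sketch of that collection pattern (the nested interfaces below are stand-ins for Iceberg's FileScanTask and CloseableIterable, not the real API; the real planFiles() result should likewise be closed after iteration):

```java
import java.util.ArrayList;
import java.util.List;

class PlanFilesSketch {
  // Stand-in for Iceberg's FileScanTask; file() returns the scanned data file here.
  interface FileScanTask {
    String file();
  }

  // Stand-in for Iceberg's CloseableIterable, which must be closed after planning.
  interface CloseableIterable<T> extends Iterable<T>, AutoCloseable {
    @Override
    void close();
  }

  // Collect every data file reported by a scan, closing the iterable when done.
  static List<String> collectFiles(CloseableIterable<FileScanTask> tasks) {
    List<String> files = new ArrayList<>();
    try (CloseableIterable<FileScanTask> toClose = tasks) {
      for (FileScanTask task : toClose) {
        files.add(task.file());
      }
    }
    return files;
  }
}
```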
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java (resolved)
table.replaceSortOrder().asc("id").commit();

List<SimpleRecord> expected = Lists.newArrayListWithCapacity(4000);
nit: we don't really need 4000 rows for this test (theoretically we only need 1), though using fewer rows probably wouldn't speed this test up much.
Can reduce. Full disclosure: this test is a copy-pasta derivative of our friend testUnpartitionedCreateWithTargetFileSizeViaTableProperties, which is why they share a few similarities 😅
assertThat(files)
    .extracting(fileScanTask -> fileScanTask.file().sortOrderId())
    .containsOnly(sortOrder.orderId());
} catch (IOException e) {
it's OK to just rethrow in the tests; we use that pattern pretty frequently
private void dataFilesShould...() throws IOException {
}
👍 will address this in the upcoming revision. I typically just like to get the checked exception over with, but I'll change it to propagate upward in the test.
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteRequirements.java (resolved)
RussellSpitzer left a comment
I could definitely be convinced otherwise, but it seems odd to me that we are carrying around two different sort objects in the writer. In my opinion, we should separate this so that at a certain point in the code we switch from an Iceberg ordering to a Spark ordering, and after that point we no longer have any Iceberg objects in that path. From then on we would just keep an integer with us to know which sort order id we should be writing to the data file.
I think @jbewing did it this way because Iceberg may request Spark to order by fields outside of just the table sort order (for example, sort by row positions). We need the transform to convert them to Iceberg sort order id metadata. But I agree that carrying both the Spark and Iceberg sort orders is odd.

Would it be too restrictive to add a constraint that we will only sort by the table sort order id? Would it be simpler to always materialize the table sort order id in the metadata? (We don't do this validation currently. For example, we can pass any arbitrary field to RewriteDataFiles, and with this PR it will be persisted in the metadata.)
Addressed your feedback @RussellSpitzer. There are a few open discussion points; let me know your thoughts on how to proceed and I can steer things that way.
This is why I suggest the other approach, since it's more generic. Instead of assuming that what gets passed through is what is set, we can always just look at what Spark has chosen to use as its distribution and ordering and figure out whether that maps to a given sort order id. So the approach I'm describing is essentially: "We look at what Spark actually ended up sorting by, match it against the table's known sort orders, and pass the matched ID to the file builder." The current approach is: "We decide the sort order ID up front when planning the write, then carry it alongside Spark's ordering all the way to the file builder."

I did a full example here with some more tests - RussellSpitzer@c48c6a6 if you want to check it out.

I think we may have a larger disagreement here, and sorry for the amount of time it's taken me in between checks. Let me see if we can get @szehon-ho or @aokolnychyi to also take a look here. They are both very focused on the Spark side and probably have opinions on the right way to pass this through.
No worries @RussellSpitzer. I missed some of your comments inadvertently, so it absolutely goes both ways, sorry!
So this is exactly what I did when I was first designing this PR. I really like the approach; on paper it's way simpler. Your Cursor almost one-shotted the fundamentals, but it missed a few details I outlined in #15150 (comment). I agree it's cleaner and it ought to work that way.

That being said (and not that it should matter in practice, because per the spec position deletes don't have a defined sort order), in the scheme you outlined above you cannot go backwards from the Spark order to the Iceberg order. Your implementation will return the table sort order for MoR position deletes on a sorted table (the Spark ordering is just [POS_DEL]). A lot of this stems from the fact that a Spark ordering may exist even when a table sort order isn't set, e.g. in partitioned write scenarios. Typically the Iceberg sort order fields are a suffix of the Spark ordering, but in many cases the Spark ordering isn't == the Iceberg ordering.

I chose to make the relationship between the Spark ordering and the Iceberg ordering explicit in my PR, to make it impossible for an implicit relationship between the two to inadvertently break things. Yes, it's slightly more verbose, but it reflects the reality that the physical ordering and the logical ordering aren't always equal. The fact that the Spark ordering != the Iceberg ordering was a big learning for me in the codebase, as this was one of my first modifications to Iceberg, and I think the version I've proposed here makes things clearer for readers while being slightly less terse.

All to say: I think both can work, and it's a matter of personal preference. Is this blocking for you?
Have you checked out RussellSpitzer/iceberg@c48c6a6? This is much closer to what you are doing, just without passing through the actual ordering, and it also passes all the tests. I basically don't want to extend the Spark requirements, or have special requirements for rewriting files, if we can help it. There are a few more tests there as well, which we probably should also add to cover order assignment for MoR writes and streaming writes.
I think those edge cases apply to the inline code I posted in the comment (the orderingSatisfies approach), not the actual commit at RussellSpitzer/iceberg@c48c6a6. I abandoned that approach and instead tried to boil it down to one method. outputSortOrderId(writeRequirements) just checks: did we ask Spark to sort? If hasOrdering() is true, we know it came from the table sort order (that's what SparkWriteUtil builds), so we use table.sortOrder().orderId(). No suffix matching, no iterating table sort orders. For rewrites, the OUTPUT_SORT_ORDER_ID path is the same as yours. This avoids the class extension for SparkRequirements and removes the new requirements for the sort-compaction case.
Sorry for the delay here @RussellSpitzer. I've added some additional test coverage at your suggestion, just resolved some merge conflicts with master, and have now gotten a chance to read your approach.

The way I see it, your changeset has followed an extremely similar arc to mine: it started by trying to derive the Iceberg order from the Spark order, then moved to using the Iceberg table ordering at write time (in combination w/ the various other misc options out there, like explicitly setting a sort order for compactions/rewrites). They diverge by about ~60 LoC, with the primary difference being that my changeset is a bit more explicit (e.g. it uses stronger types and less implicit coupling between the Iceberg sort order and the Spark sort). At this point, that's the total sum of the differences.

You're a maintainer so you absolutely get the final call here, but as far as I see it they both have their advantages. I think my approach is probably slightly friendlier to non-long-term readers of the codebase, but it also comes w/ the downside of being slightly less performant, in that it serializes a slightly larger object to the executors. In practice, this has been running in production for us on larger tables (hundreds of TB) that get partially rewritten many times per day, and I've never noticed the increased overhead on a flamegraph, so I don't think that's a serious concern. Otherwise, I'm happy to change to yours if you think the 60 LoC difference and the minor difference in approach make a big difference in readability. Having read both: we converged on the same place for this changeset. They are functionally identical, just two different styles of expressing the same concept.
I think we've tried to talk about this a bit before, but the main concern we have is changing existing classes and adding additional ones in order to essentially carry an integer through the pipeline. We are always attempting to make the least invasive changes possible, and in this case we are modifying a class with a clear purpose (carrying Spark-specific requirements that change how Spark performs the write) with an Iceberg concern (how we annotate the files produced). This is not a performance issue but an architectural one in the codebase; that's the kind of thing @aokolnychyi and I are mentioning. This is a pretty opinionated project on that sort of thing. I know this feels a little arbitrary, but it is something we take pretty seriously. Ideally we keep these concerns separate unless there is a really strong argument for why they belong together. I'm glad to keep working with you on this PR, but we need to pass the Iceberg information in its own class/pathway/container, or just as an int. Anytime we have a "This and That" class, it's going to be an issue.
Yeah, we have chatted a bit about it before ;). I'll point out that the main difference was really just whether it was passed through the pipeline as an integer or as the (Iceberg) SortOrder it corresponded to. But I agree, no large difference, and I moved over to your approach as you suggested in 21c4cdd. The CI was just straight-up broken after that, hence the extra commits; I'll try pushing some empty/amended commits to see if I can unstick it.

I apologize for the delay in getting back to you here. I hope this is more of what you were looking for, and I appreciate the stronger direction on where to go with the PR in terms of approach, since it's harder for me to guess the more subtle architectural nuances of this project, as I don't have as many changes under my belt as you. Let me know what you think about this change @RussellSpitzer. As I said before, CI is only failing because it was broken last time I was taking a look, so I'll keep pushing empty/amended commits to try to get a good trigger and hopefully avoid a few of the flaky tests I've noticed in other PRs...
 * @return the matching {@link SortOrder} from the table (with the orderId set) or {@link
 *     SortOrder#unsorted()} if no match is found.
 */
public static SortOrder maybeFindTableSortOrder(Table table, SortOrder userSuppliedSortOrder) {
nit: findTableSortOrder? We tend not to use "maybe" in lookup method names.
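A sketch of what such a lookup could look like under the suggested name. This is an illustration with assumed shapes, not the PR's code: sort orders are modeled here as plain field-name lists rather than Iceberg SortOrder objects, and a non-match falls back to the unsorted order id instead of an empty result.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

class SortOrderLookupSketch {
  // Per the Iceberg spec, sort order id 0 is reserved for "unsorted".
  static final int UNSORTED_ORDER_ID = 0;

  // Match a user-supplied ordering against the table's known sort orders;
  // return the matching id, or the unsorted id if nothing matches.
  static int findTableSortOrder(
      Map<Integer, List<String>> tableSortOrders, List<String> userSupplied) {
    Optional<Integer> match =
        tableSortOrders.entrySet().stream()
            .filter(entry -> entry.getValue().equals(userSupplied))
            .map(Map.Entry::getKey)
            .findFirst();
    return match.orElse(UNSORTED_ORDER_ID);
  }
}
```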
int id = Integer.parseInt(explicitId);
Preconditions.checkArgument(
    table.sortOrders().containsKey(id),
    "Output sort order id %s is not a valid sort order id for table",
nit: we do "Cannot x because y (optionally fix z)"
So "Cannot use output sort order id %d because the table does not contain a sort order with that id"
or something like that
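A self-contained illustration of that "Cannot x because y" message convention. The tiny checkArgument helper below stands in for Guava's Preconditions so the snippet runs on its own; the validation method name is assumed for illustration.

```java
import java.util.Set;

class MessageConventionSketch {
  // Minimal stand-in for Guava's Preconditions.checkArgument; like Guava,
  // it formats arguments with %s placeholders.
  static void checkArgument(boolean condition, String messageTemplate, Object arg) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(messageTemplate, arg));
    }
  }

  // Applies the suggested message style: "Cannot <action> because <reason>".
  static void validateSortOrderId(Set<Integer> knownOrderIds, int id) {
    checkArgument(
        knownOrderIds.contains(id),
        "Cannot use output sort order id %s because the table does not contain a sort order with that id",
        id);
  }
}
```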
private static final Expression[] PARTITION_FILE_CLUSTERING =
    clusterBy(SPEC_ID, PARTITION, FILE_PATH);

private static final SortOrder[] EMPTY_ORDERING = new SortOrder[0];
I think we can revert these name changes now
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java (resolved)
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java (resolved)
.sort(SortOrder.builderFor(table.schema()).asc("c3").build())
.option(SizeBasedFileRewritePlanner.REWRITE_ALL, "true")
.option(
    RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
as a minor speedup to this test, you don't need to mess with the target file size. We do this in the other tests because we want to make sure there are multiple files, but in this test we only care about the sort order being set. So theoretically you could just make it a single-file compaction and check that it's properly labeled.
}

@Test
public void testReturnsEmptyForFindingNonMatchingSortOrder() {
nit: testReturnsUnsortedForMissingSortOrder
Since it isn't empty, just unsorted.
Structurally I think this is all there. We have a few remaining test nits, some reversions of renames from previous PRs, and one extraction to a different PR, but it's pretty close. @aokolnychyi do you want to take a pass as well?
}

if (writeRequirements.hasOrdering()) {
  return table.sortOrder().orderId();
Why are we using the table's sort order and not the ordering from the writeRequirements?
> Why are we using the table's sort order and not the ordering from the writeRequirements?
There's a lot of good candor outlining this above, but essentially the Iceberg sort order != the Spark sort order, which is why this PR has a bit of complexity.
The logic performed above is essentially saying:
- if a sort order was explicitly pinned during a write w/ OUTPUT_SORT_ORDER_ID (set by rewrite data files), use that
- else, if Spark has a sort order set, it must correspond to the Iceberg table sort order

I do agree that it's a bit clever; I had originally opted for a more explicit approach, as agreed upon with the maintainers above.
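The precedence just described can be distilled into a few lines. This is a hypothetical sketch with assumed parameter names, not the actual SparkWriteConf code: an explicitly pinned id wins, otherwise a Spark-requested ordering is attributed to the current table sort order, otherwise the file is recorded as unsorted.

```java
class SortOrderIdSketch {
  // Per the Iceberg spec, sort order id 0 is reserved for "unsorted".
  static final int UNSORTED_ORDER_ID = 0;

  static int outputSortOrderId(Integer explicitId, boolean sparkHasOrdering, int tableSortOrderId) {
    if (explicitId != null) {
      return explicitId; // pinned by the writer, e.g. during a rewrite/compaction
    }
    if (sparkHasOrdering) {
      return tableSortOrderId; // the write requirements were built from the table sort order
    }
    return UNSORTED_ORDER_ID;
  }
}
```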
Thanks for elaborating on the approach. It would be good to be as explicit as possible.

> else, if Spark has a sort order set, it must correspond to the Iceberg table sort order

I'm missing that part. We are assuming Iceberg's SortOrder matches Spark's SortOrder and just blindly returning the current table order. Would it make sense to build a Spark SortOrder from the Iceberg SortOrder and verify whether it matches the one from WriteRequirements?
Does the thread in #15150 (comment) answer your question, @mxm?
But essentially they often don't match in practice, which is fine; there are various write configs and table configs that can change this. Additionally, using partitioning w/ ordering produces Spark sort orders that have the partition keys as the prefix, followed by the actual Iceberg sort order as the suffix.
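That prefix/suffix relationship can be stated as a small check. This is an illustration of the point being made, not Iceberg code; orderings are modeled here as field-name lists, so the shapes are assumptions.

```java
import java.util.List;

class OrderingSuffixSketch {
  // True when the Iceberg ordering appears as the trailing segment of the
  // Spark ordering (e.g. partition keys prefixed ahead of the table sort order).
  static boolean endsWith(List<String> sparkOrdering, List<String> icebergOrdering) {
    int offset = sparkOrdering.size() - icebergOrdering.size();
    if (offset < 0) {
      return false;
    }
    return sparkOrdering.subList(offset, sparkOrdering.size()).equals(icebergOrdering);
  }
}
```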
And if you're interested in following things back in time, you can revisit the original PR, which has a fair bit of commentary about this as well: #14683 (comment)
Thanks! I'll do some more digging. It looks like Spark performs its own sort optimizations, which may differ from the original table sort order.
if (sortOrderInJobSpec.isSorted() && maybeMatchingTableSortOrder.isUnsorted()) {
  LOG.warn(
      "Sort order specified for job {} doesn't match any table sort orders, so going to not mark rewritten files as sorted in the manifest files",
-      "Sort order specified for job {} doesn't match any table sort orders, so going to not mark rewritten files as sorted in the manifest files",
+      "Sort order specified for job {} doesn't match any table sort orders, rewritten files will not be marked as sorted in the manifest files",
@mxm + @aokolnychyi any blockers on this? I think we are good to go. We can clean up the TestSpark classes in a follow-up; I did a quick check and I think we can remove ~100 lines from that test class, but I'd rather we do it all at once in a fresh PR.
Sounds good. No blockers from my side.
anuragmantri left a comment
I did another pass and the changes look good to me (apart from some nits and Russell's comments).
}

public int outputSortOrderId(SparkWriteRequirements writeRequirements) {
  String explicitId =
Nit: Can we do this in one statement, just like outputSpecId() above?

Integer explicitId =
    confParser.intConf().option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID).parseOptional();
if (explicitId != null) {
  // Precondition
Map<Integer, PartitionSpec> specs = table.specs();
specs.forEach((specId, spec) -> specAsJsonMap.put(specId, PartitionSpecParser.toJson(spec)));
this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
this.sortOrderAsJsonMap = Maps.newHashMap();
Nit: Maybe use an immutable map to prevent modification?

ImmutableMap.Builder<Integer, String> builder = ImmutableMap.builder();
table.sortOrders().forEach((id, order) -> builder.put(id, SortOrderParser.toJson(order)));
this.sortOrderAsJsonMap = builder.build();
Not able to do that here (a Kryo error gets thrown as part of the serialization of this class), but I believe I addressed your concern just the same with cc03355.
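One possible middle ground, sketched under the assumption that the serializer issue comes from the Guava ImmutableMap type (this is an illustration, not the actual commit cc03355): keep a plain HashMap internally for serializer friendliness, but hand out only an unmodifiable view so callers still can't mutate the map.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class SerializableMapSketch implements Serializable {
  // Plain JDK HashMap: widely supported by serializers.
  private final Map<Integer, String> sortOrderAsJsonMap;

  SerializableMapSketch(Map<Integer, String> orders) {
    this.sortOrderAsJsonMap = new HashMap<>(orders); // defensive copy
  }

  // Callers get an unmodifiable view, preserving the immutability intent.
  Map<Integer, String> sortOrderAsJsonMap() {
    return Collections.unmodifiableMap(sortOrderAsJsonMap);
  }
}
```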
Talked to Anton, who is on vacation (sorry for bothering you, Anton), and he said he was good to go.
Merged. Thanks @jbewing for your patience and hard work on this. Thanks @anuragmantri, @aokolnychyi, and @mxm for your review.
Thanks for all the help @RussellSpitzer, @anuragmantri, @aokolnychyi, @mxm! See #15832 for the backport of these changes to Spark 4.0 & Spark 3.5.
This PR updates all writes from Spark 4.0 & 3.5 to set the sort_order_id of data files when applicable, per the Iceberg Table Spec.

This is a successor to my initial PRs #13636 & #14683. The first has since been closed for being stale, but I later revived it for Spark 3.5 & 4.0 in #14683 at the request of @anuragmantri, to complement a follow-up change that we both converged on around using this optimization in conjunction w/ some Spark DSv2 APIs to report sort order when possible and optimize downstream query plans. After some discussion w/ @anuragmantri & @RussellSpitzer, I've forward ported the changes to Spark 4.1, as the underlying "base" Spark version has since changed. I've re-opened this PR because lately there has been some increased interest in this:

- Set sort_order_id in manifest when writing/compacting data files #13636 (comment)

So it appears that there is value in these changes being upstreamed instead of confined to a fork.
Testing
I've added tests for newly added utility functions and updated existing tests that write data files and compact data files in a sorted manner, to verify that we're setting the sort_order_id entry in the manifests to the correct value. Additionally, I've used this patch on an internal fork and verified that it correctly sets this field during compaction and normal writes.

Issue: #13634
cc @anuragmantri & @RussellSpitzer