Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion core/src/main/java/org/apache/iceberg/SerializableTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;

Expand Down Expand Up @@ -57,6 +58,7 @@ public class SerializableTable implements Table, HasTableOperations, Serializabl
private final int defaultSpecId;
private final Map<Integer, String> specAsJsonMap;
private final String sortOrderAsJson;
private final Map<Integer, String> sortOrderAsJsonMap;
private final FileIO io;
private final EncryptionManager encryption;
private final Map<String, SnapshotRef> refs;
Expand All @@ -68,6 +70,7 @@ public class SerializableTable implements Table, HasTableOperations, Serializabl
private transient volatile Schema lazySchema = null;
private transient volatile Map<Integer, PartitionSpec> lazySpecs = null;
private transient volatile SortOrder lazySortOrder = null;
private transient volatile Map<Integer, SortOrder> lazySortOrders = null;

protected SerializableTable(Table table) {
this.name = table.name();
Expand All @@ -80,6 +83,10 @@ protected SerializableTable(Table table) {
Map<Integer, PartitionSpec> specs = table.specs();
specs.forEach((specId, spec) -> specAsJsonMap.put(specId, PartitionSpecParser.toJson(spec)));
this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
this.sortOrderAsJsonMap = Maps.newHashMap();
table
.sortOrders()
.forEach((id, order) -> sortOrderAsJsonMap.put(id, SortOrderParser.toJson(order)));
this.io = table.io();
this.encryption = table.encryption();
this.locationProviderTry = Try.of(table::locationProvider);
Expand Down Expand Up @@ -240,7 +247,21 @@ public SortOrder sortOrder() {

@Override
public Map<Integer, SortOrder> sortOrders() {
// NOTE(review): scraped-diff artifact — the next line is the pre-change implementation removed by this PR.
return lazyTable().sortOrders();
// Lazy, double-checked initialization (lazySortOrders is volatile) so a deserialized copy
// can answer sortOrders() without forcing a full table load.
if (lazySortOrders == null) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me as a good fix for now. We already do this for a bunch of other things like (specs) and it should be relatively lightweight.

I'd ideally also still like to just get the ability to pass the ID directly into the factory but that could be a follow up issue someone can work on.

synchronized (this) {
// If the underlying table has not been loaded, rebuild the orders from the serialized
// JSON captured in the constructor; otherwise defer to the already-loaded table.
if (lazySortOrders == null && lazyTable == null) {
ImmutableMap.Builder<Integer, SortOrder> sortOrders =
ImmutableMap.builderWithExpectedSize(sortOrderAsJsonMap.size());
sortOrderAsJsonMap.forEach(
(id, json) -> sortOrders.put(id, SortOrderParser.fromJson(schema(), json)));
this.lazySortOrders = sortOrders.build();
} else if (lazySortOrders == null) {
this.lazySortOrders = lazyTable.sortOrders();
}
}
}

return lazySortOrders;
}

@Override
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ public static SortOrder buildSortOrder(Table table, SortOrder sortOrder) {
return buildSortOrder(table.schema(), table.spec(), sortOrder);
}

/**
 * Finds the sort order defined on a {@link Table} that is equivalent to a user-supplied {@link
 * SortOrder}.
 *
 * @param table the table whose known sort orders are searched
 * @param userSuppliedSortOrder the sort order to look for (its order id, if any, is ignored)
 * @return the equivalent table {@link SortOrder} (carrying the table's orderId), or {@link
 *     SortOrder#unsorted()} when the table defines no matching order
 */
public static SortOrder findTableSortOrder(Table table, SortOrder userSuppliedSortOrder) {
  for (SortOrder candidate : table.sortOrders().values()) {
    if (candidate.sameOrder(userSuppliedSortOrder)) {
      return candidate;
    }
  }

  return SortOrder.unsorted();
}

/**
* Build a final sort order that satisfies the clustering required by the partition spec.
*
Expand Down
64 changes: 64 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,68 @@ public void testSortOrderClusteringWithRedundantPartitionFieldsMissing() {
.as("Should add spec fields as prefix")
.isEqualTo(expected);
}

@Test
public void testFindSortOrderForTable() {
  // Looking up the table's own current sort order must return that same order.
  SortOrder definedOrder =
      SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
  TestTables.TestTable testTable =
      TestTables.create(tableDir, "test", SCHEMA, PartitionSpec.unpartitioned(), definedOrder, 2);

  SortOrder matched = SortOrderUtil.findTableSortOrder(testTable, testTable.sortOrder());

  assertThat(matched).as("Should find current table sort order").isEqualTo(testTable.sortOrder());
}

@Test
public void testFindSortOrderForTableWithoutFieldId() {
  // A user-supplied order built without an explicit order id should still match the
  // equivalent table-defined order.
  SortOrder definedOrder =
      SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
  TestTables.TestTable testTable =
      TestTables.create(tableDir, "test", SCHEMA, PartitionSpec.unpartitioned(), definedOrder, 2);

  SortOrder candidate = SortOrder.builderFor(testTable.schema()).asc("id", NULLS_LAST).build();

  SortOrder matched = SortOrderUtil.findTableSortOrder(testTable, candidate);

  assertThat(matched).as("Should find current table sort order").isEqualTo(testTable.sortOrder());
}

@Test
public void testFindSortOrderForTableThatIsNotCurrentOrder() {
  // Replace the table's sort order so order id 1 becomes historical, then match against it.
  SortOrder definedOrder =
      SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
  TestTables.TestTable testTable =
      TestTables.create(tableDir, "test", SCHEMA, PartitionSpec.unpartitioned(), definedOrder, 2);

  testTable.replaceSortOrder().asc("data").desc("ts").commit();

  SortOrder candidate = SortOrder.builderFor(testTable.schema()).asc("id", NULLS_LAST).build();

  SortOrder matched = SortOrderUtil.findTableSortOrder(testTable, candidate);

  assertThat(matched)
      .as("Should find first sorted table sort order")
      .isEqualTo(testTable.sortOrders().get(1));
}

@Test
public void testReturnsUnsortedForMissingSortOrder() {
  SortOrder definedOrder =
      SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
  TestTables.TestTable testTable =
      TestTables.create(tableDir, "test", SCHEMA, PartitionSpec.unpartitioned(), definedOrder, 2);

  testTable.replaceSortOrder().asc("data").desc("ts").commit();

  // desc("id") matches neither the current order nor the historical asc("id") order.
  SortOrder candidate = SortOrder.builderFor(testTable.schema()).desc("id", NULLS_LAST).build();

  SortOrder matched = SortOrderUtil.findTableSortOrder(testTable, candidate);

  assertThat(matched)
      .as("Should return unsorted order if user supplied order does not match any table sort order")
      .isEqualTo(SortOrder.unsorted());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.ParameterizedTestExtension;
Expand Down Expand Up @@ -54,6 +55,25 @@ protected Map<String, String> extraTableProperties() {
TableProperties.DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
}

@TestTemplate
public void testCopyOnWriteDeleteSetsSortOrderIdOnRewrittenDataFiles() {
  // A copy-on-write DELETE rewrites the touched data files; the replacement files should
  // record the table's sort order id.
  createAndInitTable(
      "id INT, dep STRING",
      "PARTITIONED BY (dep)",
      "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");

  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
  sql("DELETE FROM %s WHERE id = 1", commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latest = SnapshotUtil.latestSnapshot(loadedTable, branch);
  assertThat(latest.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("Rewritten data files should carry the table sort order id")
      .containsOnly(loadedTable.sortOrder().orderId());
}

@TestTemplate
public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableException {
createAndInitPartitionedTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Collections;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
Expand All @@ -45,6 +46,34 @@ protected Map<String, String> extraTableProperties() {
TableProperties.MERGE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
}

@TestTemplate
public void testCopyOnWriteMergeSetsSortOrderIdOnRewrittenDataFiles() {
  // A copy-on-write MERGE rewrites the touched data files; the replacement files should
  // record the table's sort order id.
  createAndInitTable("id INT, dep STRING");
  sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);

  append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");
  createBranchIfNeeded();

  createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());

  sql(
      "MERGE INTO %s t USING source s "
          + "ON t.id == s.value "
          + "WHEN MATCHED THEN "
          + " UPDATE SET dep = 'changed' "
          + "WHEN NOT MATCHED THEN "
          + " INSERT (id, dep) VALUES (s.value, 'new')",
      commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latest = SnapshotUtil.latestSnapshot(loadedTable, branch);
  assertThat(latest.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("Rewritten data files should carry the table sort order id")
      .containsOnly(loadedTable.sortOrder().orderId());
}

@TestTemplate
public void testRuntimeFilteringWithReportedPartitioning() {
createAndInitTable("id INT, dep STRING");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
Expand All @@ -43,6 +44,25 @@ protected Map<String, String> extraTableProperties() {
TableProperties.UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
}

@TestTemplate
public void testCopyOnWriteUpdateSetsSortOrderIdOnRewrittenDataFiles() {
  // A copy-on-write UPDATE rewrites the touched data files; the replacement files should
  // record the table's sort order id.
  createAndInitTable(
      "id INT, dep STRING",
      "PARTITIONED BY (dep)",
      "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");

  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
  sql("UPDATE %s SET dep = 'changed' WHERE id = 1", commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latest = SnapshotUtil.latestSnapshot(loadedTable, branch);
  assertThat(latest.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("Rewritten data files should carry the table sort order id")
      .containsOnly(loadedTable.sortOrder().orderId());
}

@TestTemplate
public void testRuntimeFilteringWithReportedPartitioning() {
createAndInitTable("id INT, dep STRING");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
Expand Down Expand Up @@ -136,6 +137,34 @@ public void testMergeWithDVAndHistoricalPositionDeletes() {
assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN);
}

@TestTemplate
public void testMergeOnReadMergeSetsSortOrderIdOnNewDataFiles() {
  // A merge-on-read MERGE writes new data files for updated/inserted rows; those files
  // should record the table's sort order id.
  createAndInitTable(
      "id INT, dep STRING",
      "PARTITIONED BY (dep)",
      "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");

  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);

  createOrReplaceView("source", ImmutableList.of(1, 3), Encoders.INT());

  sql(
      "MERGE INTO %s AS t USING source AS s "
          + "ON t.id == s.value "
          + "WHEN MATCHED THEN "
          + " UPDATE SET id = id + 10 "
          + "WHEN NOT MATCHED THEN "
          + " INSERT (id, dep) VALUES (s.value, 'hr')",
      commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latest = SnapshotUtil.latestSnapshot(loadedTable, branch);
  assertThat(latest.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("All new data files should carry the table sort order id")
      .containsOnly(loadedTable.sortOrder().orderId());
}

private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) {
createTableWithDeleteGranularity(
"id INT, dep STRING", "PARTITIONED BY (dep)", deleteGranularity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
Expand Down Expand Up @@ -213,6 +214,25 @@ public void testUpdateWithDVAndHistoricalPositionDeletes() {
assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN);
}

@TestTemplate
public void testMergeOnReadUpdateSetsSortOrderIdOnNewDataFiles() {
  // A merge-on-read UPDATE writes new data files for the updated rows; those files should
  // record the table's sort order id.
  createAndInitTable(
      "id INT, dep STRING",
      "PARTITIONED BY (dep)",
      "{ \"id\": 1, \"dep\": \"hr\" }\n{ \"id\": 2, \"dep\": \"hr\" }");

  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);
  sql("UPDATE %s SET id = id + 10 WHERE id = 1", commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latest = SnapshotUtil.latestSnapshot(loadedTable, branch);
  assertThat(latest.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("All new data files should carry the table sort order id")
      .containsOnly(loadedTable.sortOrder().orderId());
}

private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) {
createTableWithDeleteGranularity("id INT, dep STRING", partitionedBy, deleteGranularity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
Expand Down Expand Up @@ -171,6 +172,25 @@ public int outputSpecId() {
return outputSpecId;
}

public int outputSortOrderId(SparkWriteRequirements writeRequirements) {
Integer explicitId =
confParser.intConf().option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID).parseOptional();

if (explicitId != null) {
Preconditions.checkArgument(
table.sortOrders().containsKey(explicitId),
"Cannot use output sort order id %s because the table does not contain a sort order with that id",
explicitId);
return explicitId;
}

if (writeRequirements.hasOrdering()) {
return table.sortOrder().orderId();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using the table's sort order and not the ordering from the writeRequirements?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using the table's sort order and not the ordering from the writeRequirements?

There's lots of great candor outlining this above, but essentially iceberg sort order != spark sort order -> why this PR has a bit of complexity.

The logic performed above essentially is saying:

  • if a sort order was explicitly pinned during a write via OUTPUT_SORT_ORDER (set by rewrite data files), use that id
  • else, if Spark has a sort order set, it must correspond to the Iceberg sort order, so use the table's current sort order id

I do agree that it's a bit clever and I had originally opted for a bit more of an explicit approach as agreed upon with the maintainers above.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for elaborating on the approach. It would be good to be as explicit as possible.

else, if spark has a sort order set, it must correspond to the iceberg sort order

I'm missing that part. We are making the assumption Iceberg's SortOrder matches Spark's SortOrder and just blindly returning the current table order. Would it make sense to build a Spark SortOrder from the Iceberg SortOrder and verify whether it matches the one from WriteRequirements?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the thread in #15150 (comment) answer your question @mxm ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But essentially they often don't match in practice which is fine. There are various write configs + table configs that can change this. Additionally, using something like partitioning w/ ordering produces spark sort orders that have the partition keys & the prefix of the spark sort order followed by the actual iceberg sort order as the suffix

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if you're interested in following things back in time, you can revisit the original PR that has a fair bit of commentary about this as well: #14683 (comment)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll do some more digging. It looks like Spark performs its own sort optimizations which may be different from the original table sort order.

}

return SortOrder.unsorted().orderId();
}

public FileFormat dataFileFormat() {
String valueAsString =
confParser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private SparkWriteOptions() {}
public static final String CHECK_ORDERING = "check-ordering";

public static final String OUTPUT_SPEC_ID = "output-spec-id";
public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";
Comment thread
RussellSpitzer marked this conversation as resolved.

public static final String OVERWRITE_MODE = "overwrite-mode";

Expand Down
Loading
Loading