Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,25 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue();
}

@TestTemplate
public void testCopyOnWriteDeleteSetsSortOrderIdOnRewrittenDataFiles() {
  // Seed a partitioned table with two rows in the same partition so the DELETE
  // forces a copy-on-write rewrite of the shared data file.
  createAndInitTable(
      "id INT, dep STRING",
      "PARTITIONED BY (dep)",
      "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");

  // Declare a table-level sort order before the rewrite happens.
  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);

  sql("DELETE FROM %s WHERE id = 1", commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadedTable, branch);
  int expectedOrderId = loadedTable.sortOrder().orderId();

  // Every data file added by the rewrite must be stamped with the table sort order id.
  assertThat(latestSnapshot.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("Rewritten data files should carry the table sort order id")
      .containsOnly(expectedOrderId);
}

@TestTemplate
public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableException {
createAndInitPartitionedTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,34 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception
assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue();
}

@TestTemplate
public void testCopyOnWriteMergeSetsSortOrderIdOnRewrittenDataFiles() {
  // Build the table incrementally: schema first, then partitioning and sort order.
  createAndInitTable("id INT, dep STRING");
  sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);

  // Two rows in one partition so the MERGE rewrites an existing data file.
  append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
  createBranchIfNeeded();

  createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());

  // Matched row triggers the copy-on-write rewrite path.
  String mergeSql =
      "MERGE INTO %s t USING source s "
          + "ON t.id == s.value "
          + "WHEN MATCHED THEN "
          + " UPDATE SET dep = 'changed' "
          + "WHEN NOT MATCHED THEN "
          + " INSERT (id, dep) VALUES (s.value, 'new')";
  sql(mergeSql, commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadedTable, branch);
  int expectedOrderId = loadedTable.sortOrder().orderId();

  assertThat(latestSnapshot.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("Rewritten data files should carry the table sort order id")
      .containsOnly(expectedOrderId);
}

@TestTemplate
public void testRuntimeFilteringWithReportedPartitioning() {
createAndInitTable("id INT, dep STRING");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,25 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception
assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue();
}

@TestTemplate
public void testCopyOnWriteUpdateSetsSortOrderIdOnRewrittenDataFiles() {
  // Two rows share a partition, so updating one row rewrites the whole data file.
  createAndInitTable(
      "id INT, dep STRING",
      "PARTITIONED BY (dep)",
      "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");

  // The table sort order must be in place before the copy-on-write UPDATE runs.
  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);

  sql("UPDATE %s SET dep = 'changed' WHERE id = 1", commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadedTable, branch);
  int expectedOrderId = loadedTable.sortOrder().orderId();

  assertThat(latestSnapshot.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("Rewritten data files should carry the table sort order id")
      .containsOnly(expectedOrderId);
}

@TestTemplate
public void testRuntimeFilteringWithReportedPartitioning() {
createAndInitTable("id INT, dep STRING");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
Expand Down Expand Up @@ -136,6 +137,34 @@ public void testMergeWithDVAndHistoricalPositionDeletes() {
assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN);
}

@TestTemplate
public void testMergeOnReadMergeSetsSortOrderIdOnNewDataFiles() {
  // Seed a partitioned table with two rows; merge-on-read writes new data files
  // for both the updated and the inserted rows.
  createAndInitTable(
      "id INT, dep STRING",
      "PARTITIONED BY (dep)",
      "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");

  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);

  // One matching id (1) and one new id (3) exercise both MERGE branches.
  createOrReplaceView("source", ImmutableList.of(1, 3), Encoders.INT());

  String mergeSql =
      "MERGE INTO %s AS t USING source AS s "
          + "ON t.id == s.value "
          + "WHEN MATCHED THEN "
          + " UPDATE SET id = id + 10 "
          + "WHEN NOT MATCHED THEN "
          + " INSERT (id, dep) VALUES (s.value, 'hr')";
  sql(mergeSql, commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadedTable, branch);
  int expectedOrderId = loadedTable.sortOrder().orderId();

  assertThat(latestSnapshot.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("All new data files should carry the table sort order id")
      .containsOnly(expectedOrderId);
}

private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) {
createTableWithDeleteGranularity(
"id INT, dep STRING", "PARTITIONED BY (dep)", deleteGranularity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
Expand Down Expand Up @@ -224,6 +225,25 @@ public void testUpdateWithDVAndHistoricalPositionDeletes() {
assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN);
}

@TestTemplate
public void testMergeOnReadUpdateSetsSortOrderIdOnNewDataFiles() {
  // Partitioned table with two rows; a merge-on-read UPDATE adds a new data file
  // for the updated row instead of rewriting the existing file.
  createAndInitTable(
      "id INT, dep STRING",
      "PARTITIONED BY (dep)",
      "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");

  sql("ALTER TABLE %s WRITE ORDERED BY id", tableName);

  sql("UPDATE %s SET id = id + 10 WHERE id = 1", commitTarget());

  Table loadedTable = validationCatalog.loadTable(tableIdent);
  Snapshot latestSnapshot = SnapshotUtil.latestSnapshot(loadedTable, branch);
  int expectedOrderId = loadedTable.sortOrder().orderId();

  assertThat(latestSnapshot.addedDataFiles(loadedTable.io()))
      .extracting(DataFile::sortOrderId)
      .as("All new data files should carry the table sort order id")
      .containsOnly(expectedOrderId);
}

private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) {
createTableWithDeleteGranularity("id INT, dep STRING", partitionedBy, deleteGranularity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
Expand Down Expand Up @@ -162,6 +163,25 @@ public int outputSpecId() {
return outputSpecId;
}

/**
 * Resolves the sort order id to record on data files produced by this write.
 *
 * <p>An id passed explicitly via {@code output-sort-order-id} always wins and is validated
 * against the table's known sort orders. Otherwise, if the write requirements include an
 * ordering, the table's current sort order id is used; unordered writes are recorded as
 * unsorted.
 */
public int outputSortOrderId(SparkWriteRequirements writeRequirements) {
  Integer requestedId =
      confParser.intConf().option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID).parseOptional();

  if (requestedId == null) {
    // No explicit request: derive the id from whether this write is actually ordered.
    return writeRequirements.hasOrdering()
        ? table.sortOrder().orderId()
        : SortOrder.unsorted().orderId();
  }

  Preconditions.checkArgument(
      table.sortOrders().containsKey(requestedId),
      "Cannot use output sort order id %s because the table does not contain a sort order with that id",
      requestedId);
  return requestedId;
}

public FileFormat dataFileFormat() {
String valueAsString =
confParser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private SparkWriteOptions() {}
public static final String REWRITTEN_FILE_SCAN_TASK_SET_ID = "rewritten-file-scan-task-set-id";

public static final String OUTPUT_SPEC_ID = "output-spec-id";
public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";

public static final String OVERWRITE_MODE = "overwrite-mode";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

abstract class SparkShufflingFileRewriteRunner extends SparkDataFileRewriteRunner {

private static final Logger LOG = LoggerFactory.getLogger(SparkShufflingFileRewriteRunner.class);

/**
* The number of shuffle partitions to use for each output file. By default, this file rewriter
* assumes each shuffle partition would become a separate output file. Attempting to generate
Expand Down Expand Up @@ -119,13 +123,25 @@ public void doRewrite(String groupId, RewriteFileGroup fileGroup) {
spec(fileGroup.outputSpecId()),
fileGroup.expectedOutputFiles()));

org.apache.iceberg.SortOrder sortOrderInJobSpec = sortOrder();

org.apache.iceberg.SortOrder maybeMatchingTableSortOrder =
SortOrderUtil.findTableSortOrder(table(), sortOrder());

if (sortOrderInJobSpec.isSorted() && maybeMatchingTableSortOrder.isUnsorted()) {
LOG.warn(
"Sort order specified for job {} doesn't match any table sort orders, rewritten files will not be marked as sorted in the manifest files",
Spark3Util.describe(sortOrderInJobSpec));
}

sortedDF
.write()
.format("iceberg")
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, fileGroup.maxOutputFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
.option(SparkWriteOptions.OUTPUT_SPEC_ID, fileGroup.outputSpecId())
.option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, maybeMatchingTableSortOrder.orderId())
.mode("append")
.save(groupId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final String branch;
private final Map<String, String> extraSnapshotMetadata;
private final SparkWriteRequirements writeRequirements;
private final int sortOrderId;
private final Context context;
private final Map<String, String> writeProperties;

Expand All @@ -135,6 +136,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
this.branch = writeConf.branch();
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.writeRequirements = writeConf.positionDeltaRequirements(command);
this.sortOrderId = writeConf.outputSortOrderId(writeRequirements);
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
this.writeProperties = writeConf.writeProperties();
}
Expand Down Expand Up @@ -180,7 +182,8 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
broadcastRewritableDeletes(),
command,
context,
writeProperties);
writeProperties,
sortOrderId);
}

private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
Expand Down Expand Up @@ -390,18 +393,21 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
private final Command command;
private final Context context;
private final Map<String, String> writeProperties;
private final int sortOrderId;

PositionDeltaWriteFactory(
Broadcast<Table> tableBroadcast,
Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast,
Command command,
Context context,
Map<String, String> writeProperties) {
Map<String, String> writeProperties,
int sortOrderId) {
this.tableBroadcast = tableBroadcast;
this.rewritableDeletesBroadcast = rewritableDeletesBroadcast;
this.command = command;
this.context = context;
this.writeProperties = writeProperties;
this.sortOrderId = sortOrderId;
}

@Override
Expand All @@ -428,6 +434,7 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
.deleteFileFormat(context.deleteFileFormat())
.positionDeleteSparkType(context.deleteSparkType())
.writeProperties(writeProperties)
.dataSortOrder(table.sortOrders().get(sortOrderId))
.build();

if (command == DELETE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ private WriterFactory createWriterFactory() {
// broadcast the table metadata as the writer factory will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
int sortOrderId = writeConf.outputSortOrderId(writeRequirements);
return new WriterFactory(
tableBroadcast,
queryId,
Expand All @@ -202,7 +203,8 @@ private WriterFactory createWriterFactory() {
writeSchema,
dsSchema,
useFanoutWriter,
writeProperties);
writeProperties,
sortOrderId);
}

private void commitOperation(SnapshotUpdate<?> operation, String description) {
Expand Down Expand Up @@ -672,6 +674,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
private final boolean useFanoutWriter;
private final String queryId;
private final Map<String, String> writeProperties;
private final int sortOrderId;

protected WriterFactory(
Broadcast<Table> tableBroadcast,
Expand All @@ -682,7 +685,8 @@ protected WriterFactory(
Schema writeSchema,
StructType dsSchema,
boolean useFanoutWriter,
Map<String, String> writeProperties) {
Map<String, String> writeProperties,
int sortOrderId) {
this.tableBroadcast = tableBroadcast;
this.format = format;
this.outputSpecId = outputSpecId;
Expand All @@ -692,6 +696,7 @@ protected WriterFactory(
this.useFanoutWriter = useFanoutWriter;
this.queryId = queryId;
this.writeProperties = writeProperties;
this.sortOrderId = sortOrderId;
}

@Override
Expand All @@ -716,6 +721,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
.dataSchema(writeSchema)
.dataSparkType(dsSchema)
.writeProperties(writeProperties)
.dataSortOrder(table.sortOrders().get(sortOrderId))
.build();

if (spec.isUnpartitioned()) {
Expand Down
Loading
Loading