diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index b73af5e61a43..7ab7bcd9a9c6 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 1 + "modification": 2 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java index dc8106321db7..7e0eeecd9e5a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java @@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -97,7 +98,6 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -532,17 +532,23 @@ private Table getOrCreateTable(String filePath, FileFormat format) throws IOExce org.apache.iceberg.Schema schema = getSchema(filePath, format); PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, schema); SortOrder sortOrder = SortOrderUtils.toSortOrder(sortFields, schema); - - Catalog.TableBuilder builder = - catalogConfig - .catalog() - .buildTable(tableId, schema) - .withPartitionSpec(spec) - .withSortOrder(sortOrder); - if (tableProps != null) { - builder.withProperties(tableProps); + Map properties = + tableProps != null ? new HashMap<>(tableProps) : new HashMap<>(); + if (properties.get(TableProperties.DEFAULT_NAME_MAPPING) == null) { + // Forces Name based resolution instead of position based resolution + NameMapping mapping = MappingUtil.create(schema); + String mappingJson = NameMappingParser.toJson(mapping); + properties.put(TableProperties.DEFAULT_NAME_MAPPING, mappingJson); } - return builder.create(); + + return catalogConfig + .catalog() + .buildTable(tableId, schema) + .withPartitionSpec(spec) + .withSortOrder(sortOrder) + .withProperties(properties) + .create(); + } catch (AlreadyExistsException e2) { // if table already exists, just load it return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier)); }