diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java index f1709277525a..bf7b0525c30c 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; +import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.StagedTable; import org.apache.spark.sql.connector.catalog.SupportsDeleteV2; @@ -85,11 +86,20 @@ public String name() { return table.name(); } + /** + * @deprecated since 1.12.0, use {@link #columns()} instead + */ + @Deprecated @Override public StructType schema() { return table.schema(); } + @Override + public Column[] columns() { + return table.columns(); + } + @Override public Transform[] partitioning() { return table.partitioning(); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java index 064e4f7d6dc7..417f165ba8a7 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java @@ -74,10 +74,13 @@ import org.apache.spark.sql.connector.catalog.CatalogManager; import org.apache.spark.sql.connector.catalog.CatalogPlugin; import org.apache.spark.sql.connector.catalog.CatalogV2Implicits; +import org.apache.spark.sql.connector.catalog.CatalogV2Util; +import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.spark.sql.connector.catalog.TableInfo; import org.apache.spark.sql.connector.expressions.Expression; import org.apache.spark.sql.connector.expressions.Expressions; import org.apache.spark.sql.connector.expressions.Literal; @@ -109,6 +112,20 @@ public class Spark3Util { private Spark3Util() {} + static TableInfo tableInfo( + StructType schema, Transform[] transforms, Map properties) { + return tableInfo(CatalogV2Util.structTypeToV2Columns(schema), transforms, properties); + } + + public static TableInfo tableInfo( + Column[] columns, Transform[] transforms, Map properties) { + return new TableInfo.Builder() + .withColumns(columns) + .withPartitions(transforms) + .withProperties(properties) + .build(); + } + public static Map rebuildCreateProperties(Map createProperties) { ImmutableMap.Builder tableProperties = ImmutableMap.builder(); createProperties.entrySet().stream() diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 12eefe697348..0146df04b2c2 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -79,6 +79,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange; import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty; import org.apache.spark.sql.connector.catalog.TableChange.SetProperty; +import org.apache.spark.sql.connector.catalog.TableInfo; import org.apache.spark.sql.connector.catalog.View; import org.apache.spark.sql.connector.catalog.ViewChange; import org.apache.spark.sql.connector.catalog.ViewInfo; @@ -181,18 +182,28 @@ public boolean tableExists(Identifier ident) { } } + /** + * @deprecated since 1.12.0, use {@link #createTable(Identifier, TableInfo)} instead. + */ + @Deprecated @Override public Table createTable( Identifier ident, StructType schema, Transform[] transforms, Map properties) throws TableAlreadyExistsException { - Schema icebergSchema = SparkSchemaUtil.convert(schema); + return createTable(ident, Spark3Util.tableInfo(schema, transforms, properties)); + } + + @Override + public Table createTable(Identifier ident, TableInfo tableInfo) + throws TableAlreadyExistsException { + Schema icebergSchema = SparkSchemaUtil.convert(tableInfo.schema()); try { Catalog.TableBuilder builder = newBuilder(ident, icebergSchema); org.apache.iceberg.Table icebergTable = builder - .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms)) - .withLocation(properties.get("location")) - .withProperties(Spark3Util.rebuildCreateProperties(properties)) + .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, tableInfo.partitions())) + .withLocation(tableInfo.properties().get("location")) + .withProperties(Spark3Util.rebuildCreateProperties(tableInfo.properties())) .create(); return new SparkTable(icebergTable); } catch (AlreadyExistsException e) { @@ -200,18 +211,28 @@ public Table createTable( } } + /** + * @deprecated since 1.12.0, use {@link #stageCreate(Identifier, TableInfo)} instead. + */ + @Deprecated @Override public StagedTable stageCreate( Identifier ident, StructType schema, Transform[] transforms, Map properties) throws TableAlreadyExistsException { - Schema icebergSchema = SparkSchemaUtil.convert(schema); + return stageCreate(ident, Spark3Util.tableInfo(schema, transforms, properties)); + } + + @Override + public StagedTable stageCreate(Identifier ident, TableInfo tableInfo) + throws TableAlreadyExistsException { + Schema icebergSchema = SparkSchemaUtil.convert(tableInfo.schema()); try { Catalog.TableBuilder builder = newBuilder(ident, icebergSchema); Transaction transaction = builder - .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms)) - .withLocation(properties.get("location")) - .withProperties(Spark3Util.rebuildCreateProperties(properties)) + .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, tableInfo.partitions())) + .withLocation(tableInfo.properties().get("location")) + .withProperties(Spark3Util.rebuildCreateProperties(tableInfo.properties())) .createTransaction(); return new StagedSparkTable(transaction); } catch (AlreadyExistsException e) { @@ -219,18 +240,28 @@ public StagedTable stageCreate( } } + /** + * @deprecated since 1.12.0, use {@link #stageReplace(Identifier, TableInfo)} instead. + */ + @Deprecated @Override public StagedTable stageReplace( Identifier ident, StructType schema, Transform[] transforms, Map properties) throws NoSuchTableException { - Schema icebergSchema = SparkSchemaUtil.convert(schema); + return stageReplace(ident, Spark3Util.tableInfo(schema, transforms, properties)); + } + + @Override + public StagedTable stageReplace(Identifier ident, TableInfo tableInfo) + throws NoSuchTableException { + Schema icebergSchema = SparkSchemaUtil.convert(tableInfo.schema()); try { Catalog.TableBuilder builder = newBuilder(ident, icebergSchema); Transaction transaction = builder - .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms)) - .withLocation(properties.get("location")) - .withProperties(Spark3Util.rebuildCreateProperties(properties)) + .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, tableInfo.partitions())) + .withLocation(tableInfo.properties().get("location")) + .withProperties(Spark3Util.rebuildCreateProperties(tableInfo.properties())) .replaceTransaction(); return new StagedSparkTable(transaction); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { @@ -238,16 +269,25 @@ public StagedTable stageReplace( } } + /** + * @deprecated since 1.12.0, use {@link #stageCreateOrReplace(Identifier, TableInfo)} instead. + */ + @Deprecated @Override public StagedTable stageCreateOrReplace( Identifier ident, StructType schema, Transform[] transforms, Map properties) { - Schema icebergSchema = SparkSchemaUtil.convert(schema); + return stageCreateOrReplace(ident, Spark3Util.tableInfo(schema, transforms, properties)); + } + + @Override + public StagedTable stageCreateOrReplace(Identifier ident, TableInfo tableInfo) { + Schema icebergSchema = SparkSchemaUtil.convert(tableInfo.schema()); Catalog.TableBuilder builder = newBuilder(ident, icebergSchema); Transaction transaction = builder - .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms)) - .withLocation(properties.get("location")) - .withProperties(Spark3Util.rebuildCreateProperties(properties)) + .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, tableInfo.partitions())) + .withLocation(tableInfo.properties().get("location")) + .withProperties(Spark3Util.rebuildCreateProperties(tableInfo.properties())) .createOrReplaceTransaction(); return new StagedSparkTable(transaction); } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java index a1016beb1886..8e6039b758a3 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.spark; -import java.util.Map; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.source.SparkRewriteTable; @@ -28,8 +27,7 @@ import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.TableChange; -import org.apache.spark.sql.connector.expressions.Transform; -import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.connector.catalog.TableInfo; import org.apache.spark.sql.util.CaseInsensitiveStringMap; public class SparkRewriteTableCatalog implements TableCatalog, SupportsFunctions { @@ -74,8 +72,7 @@ public void invalidateTable(Identifier ident) { } @Override - public SparkTable createTable( - Identifier ident, StructType schema, Transform[] partitions, Map properties) + public SparkTable createTable(Identifier ident, TableInfo tableInfo) throws TableAlreadyExistsException { throw new UnsupportedOperationException(CLASS_NAME + " does not support creating tables"); } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java index 1169318e9406..ec37435ba119 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java @@ -46,6 +46,7 @@ import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.spark.sql.connector.catalog.TableInfo; import org.apache.spark.sql.connector.catalog.View; import org.apache.spark.sql.connector.catalog.ViewCatalog; import org.apache.spark.sql.connector.catalog.ViewChange; @@ -181,28 +182,48 @@ public boolean tableExists(Identifier ident) { return icebergCatalog.tableExists(ident) || getSessionCatalog().tableExists(ident); } + /** + * @deprecated since 1.12.0, use {@link #createTable(Identifier, TableInfo)} instead. + */ + @Deprecated @Override public Table createTable( Identifier ident, StructType schema, Transform[] partitions, Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { - String provider = properties.get("provider"); + return createTable(ident, Spark3Util.tableInfo(schema, partitions, properties)); + } + + @Override + public Table createTable(Identifier ident, TableInfo tableInfo) + throws TableAlreadyExistsException, NoSuchNamespaceException { + String provider = tableInfo.properties().get("provider"); if (useIceberg(provider)) { - return icebergCatalog.createTable(ident, schema, partitions, properties); + return icebergCatalog.createTable(ident, tableInfo); } else { // delegate to the session catalog - return getSessionCatalog().createTable(ident, schema, partitions, properties); + return getSessionCatalog().createTable(ident, tableInfo); } } + /** + * @deprecated since 1.12.0, use {@link #stageCreate(Identifier, TableInfo)} instead. + */ + @Deprecated @Override public StagedTable stageCreate( Identifier ident, StructType schema, Transform[] partitions, Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { - String provider = properties.get("provider"); + return stageCreate(ident, Spark3Util.tableInfo(schema, partitions, properties)); + } + + @Override + public StagedTable stageCreate(Identifier ident, TableInfo tableInfo) + throws TableAlreadyExistsException, NoSuchNamespaceException { + String provider = tableInfo.properties().get("provider"); TableCatalog catalog; if (useIceberg(provider)) { if (asStagingCatalog != null) { - return asStagingCatalog.stageCreate(ident, schema, partitions, properties); + return asStagingCatalog.stageCreate(ident, tableInfo); } catalog = icebergCatalog; } else { @@ -211,19 +232,29 @@ public StagedTable stageCreate( // create the table with the session catalog, then wrap it in a staged table that will delete to // roll back - Table table = catalog.createTable(ident, schema, partitions, properties); + Table table = catalog.createTable(ident, tableInfo); return new RollbackStagedTable(catalog, ident, table); } + /** + * @deprecated since 1.12.0, use {@link #stageReplace(Identifier, TableInfo)} instead. + */ + @Deprecated @Override public StagedTable stageReplace( Identifier ident, StructType schema, Transform[] partitions, Map properties) throws NoSuchNamespaceException, NoSuchTableException { - String provider = properties.get("provider"); + return stageReplace(ident, Spark3Util.tableInfo(schema, partitions, properties)); + } + + @Override + public StagedTable stageReplace(Identifier ident, TableInfo tableInfo) + throws NoSuchNamespaceException, NoSuchTableException { + String provider = tableInfo.properties().get("provider"); TableCatalog catalog; if (useIceberg(provider)) { if (asStagingCatalog != null) { - return asStagingCatalog.stageReplace(ident, schema, partitions, properties); + return asStagingCatalog.stageReplace(ident, tableInfo); } catalog = icebergCatalog; } else { @@ -238,24 +269,34 @@ public StagedTable stageReplace( try { // create the table with the session catalog, then wrap it in a staged table that will delete // to roll back - Table table = catalog.createTable(ident, schema, partitions, properties); + Table table = catalog.createTable(ident, tableInfo); return new RollbackStagedTable(catalog, ident, table); } catch (TableAlreadyExistsException e) { // the table was deleted, but now already exists again. retry the replace. - return stageReplace(ident, schema, partitions, properties); + return stageReplace(ident, tableInfo); } } + /** + * @deprecated since 1.12.0, use {@link #stageCreateOrReplace(Identifier, TableInfo)} instead. + */ + @Deprecated @Override public StagedTable stageCreateOrReplace( Identifier ident, StructType schema, Transform[] partitions, Map properties) throws NoSuchNamespaceException { - String provider = properties.get("provider"); + return stageCreateOrReplace(ident, Spark3Util.tableInfo(schema, partitions, properties)); + } + + @Override + public StagedTable stageCreateOrReplace(Identifier ident, TableInfo tableInfo) + throws NoSuchNamespaceException { + String provider = tableInfo.properties().get("provider"); TableCatalog catalog; if (useIceberg(provider)) { if (asStagingCatalog != null) { - return asStagingCatalog.stageCreateOrReplace(ident, schema, partitions, properties); + return asStagingCatalog.stageCreateOrReplace(ident, tableInfo); } catalog = icebergCatalog; } else { @@ -268,12 +309,12 @@ public StagedTable stageCreateOrReplace( try { // create the table with the session catalog, then wrap it in a staged table that will delete // to roll back - Table sessionCatalogTable = catalog.createTable(ident, schema, partitions, properties); + Table sessionCatalogTable = catalog.createTable(ident, tableInfo); return new RollbackStagedTable(catalog, ident, sessionCatalogTable); } catch (TableAlreadyExistsException e) { // the table was deleted, but now already exists again. retry the replace. - return stageCreateOrReplace(ident, schema, partitions, properties); + return stageCreateOrReplace(ident, tableInfo); } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java index 37e1ec4ce788..d37d213d12bf 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java @@ -34,6 +34,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.spark.SparkSessionCatalog; import org.apache.iceberg.spark.source.StagedSparkTable; @@ -46,8 +47,6 @@ import org.apache.spark.sql.connector.catalog.StagingTableCatalog; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.V1Table; -import org.apache.spark.sql.connector.expressions.Transform; -import org.apache.spark.sql.types.StructType; abstract class BaseTableCreationSparkAction extends BaseSparkAction { private static final Set ALLOWED_SOURCES = @@ -156,10 +155,11 @@ protected StagingTableCatalog checkDestinationCatalog(CatalogPlugin catalog) { protected StagedSparkTable stageDestTable() { try { Map props = destTableProps(); - StructType schema = sourceTable.schema(); - Transform[] partitioning = sourceTable.partitioning(); return (StagedSparkTable) - destCatalog().stageCreate(destTableIdent(), schema, partitioning, props); + destCatalog() + .stageCreate( + destTableIdent(), + Spark3Util.tableInfo(sourceTable.columns(), sourceTable.partitioning(), props)); } catch (org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException e) { throw new NoSuchNamespaceException( "Cannot create table %s as the namespace does not exist", destTableIdent()); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java index a5d9293a9fc1..5c08c00a792e 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java @@ -38,6 +38,8 @@ import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.CatalogV2Util; +import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.catalog.MetadataColumn; import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns; import org.apache.spark.sql.connector.expressions.Transform; @@ -66,6 +68,7 @@ abstract class BaseSparkTable private SparkSession lazySpark = null; private StructType lazySparkSchema = null; + private Column[] lazySparkColumns = null; protected BaseSparkTable(Table table, Schema schema) { this.table = table; @@ -88,11 +91,29 @@ public String name() { return table.toString(); } + /** + * @deprecated since 1.12.0, use {@link #columns()} instead + */ + @Deprecated @Override public StructType schema() { + return sparkSchema(); + } + + @Override + public Column[] columns() { + if (lazySparkColumns == null) { + this.lazySparkColumns = CatalogV2Util.structTypeToV2Columns(sparkSchema()); + } + + return lazySparkColumns; + } + + private StructType sparkSchema() { if (lazySparkSchema == null) { this.lazySparkSchema = SparkSchemaUtil.convert(schema); } + return lazySparkSchema; } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java index bdafca27fbb8..821deea2619a 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java @@ -25,6 +25,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.CatalogV2Util; +import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.catalog.MetadataColumn; import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns; import org.apache.spark.sql.connector.catalog.SupportsRead; @@ -46,6 +48,7 @@ public class SparkChangelogTable private SparkSession lazySpark = null; private StructType lazySparkSchema = null; + private Column[] lazySparkColumns = null; public SparkChangelogTable(Table table) { this.table = table; @@ -57,8 +60,25 @@ public String name() { return table.name() + "." + TABLE_NAME; } + /** + * @deprecated since 1.12.0, use {@link #columns()} instead + */ + @Deprecated @Override public StructType schema() { + return sparkSchema(); + } + + @Override + public Column[] columns() { + if (lazySparkColumns == null) { + this.lazySparkColumns = CatalogV2Util.structTypeToV2Columns(sparkSchema()); + } + + return lazySparkColumns; + } + + private StructType sparkSchema() { if (lazySparkSchema == null) { this.lazySparkSchema = SparkSchemaUtil.convert(schema); }