Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.spark.sql.connector.catalog.Column;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.SupportsDeleteV2;
Expand Down Expand Up @@ -85,11 +86,20 @@ public String name() {
return table.name();
}

/**
 * Returns the Spark schema reported by the delegate {@code table}.
 *
 * @return the value of {@code table.schema()}
 * @deprecated since 1.12.0, use {@link #columns()} instead
 */
@Deprecated
@Override
public StructType schema() {
  StructType delegateSchema = table.schema();
  return delegateSchema;
}

/**
 * Returns the columns reported by the delegate {@code table}.
 *
 * @return the value of {@code table.columns()}
 */
@Override
public Column[] columns() {
  Column[] delegateColumns = table.columns();
  return delegateColumns;
}

@Override
public Transform[] partitioning() {
return table.partitioning();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@
import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits;
import org.apache.spark.sql.connector.catalog.CatalogV2Util;
import org.apache.spark.sql.connector.catalog.Column;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.TableInfo;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Literal;
Expand Down Expand Up @@ -109,6 +112,20 @@ public class Spark3Util {

// Private no-op constructor: the class cannot be instantiated from outside.
private Spark3Util() {}

/**
 * Builds a {@link TableInfo} from a Spark {@link StructType} schema.
 *
 * <p>The schema is converted to V2 columns with {@code CatalogV2Util.structTypeToV2Columns} and
 * the result is handed to the {@code Column[]} overload of this method.
 *
 * @param schema Spark schema to convert into columns
 * @param transforms partition transforms for the table
 * @param properties table properties
 * @return a {@link TableInfo} carrying the converted columns, transforms and properties
 */
static TableInfo tableInfo(
    StructType schema, Transform[] transforms, Map<String, String> properties) {
  Column[] v2Columns = CatalogV2Util.structTypeToV2Columns(schema);
  return tableInfo(v2Columns, transforms, properties);
}

/**
 * Builds a {@link TableInfo} from columns, partition transforms and properties.
 *
 * @param columns table columns
 * @param transforms partition transforms for the table
 * @param properties table properties
 * @return the {@link TableInfo} produced by {@link TableInfo.Builder}
 */
public static TableInfo tableInfo(
    Column[] columns, Transform[] transforms, Map<String, String> properties) {
  TableInfo.Builder infoBuilder = new TableInfo.Builder();
  // reassign on every step so the result is the same whether or not the builder returns itself
  infoBuilder = infoBuilder.withColumns(columns);
  infoBuilder = infoBuilder.withPartitions(transforms);
  infoBuilder = infoBuilder.withProperties(properties);
  return infoBuilder.build();
}

public static Map<String, String> rebuildCreateProperties(Map<String, String> createProperties) {
ImmutableMap.Builder<String, String> tableProperties = ImmutableMap.builder();
createProperties.entrySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange;
import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
import org.apache.spark.sql.connector.catalog.TableInfo;
import org.apache.spark.sql.connector.catalog.View;
import org.apache.spark.sql.connector.catalog.ViewChange;
import org.apache.spark.sql.connector.catalog.ViewInfo;
Expand Down Expand Up @@ -181,73 +182,112 @@ public boolean tableExists(Identifier ident) {
}
}

/**
 * Creates a table from a Spark schema, partition transforms and properties.
 *
 * <p>The arguments are packed into a {@link TableInfo} with {@code Spark3Util.tableInfo} and the
 * call is forwarded to {@link #createTable(Identifier, TableInfo)}; the schema conversion happens
 * there, so none is done here.
 *
 * @param ident identifier of the table to create
 * @param schema Spark schema of the table
 * @param transforms partition transforms
 * @param properties table properties
 * @return the created table
 * @throws TableAlreadyExistsException if thrown by the {@link TableInfo} overload
 * @deprecated since 1.12.0, use {@link #createTable(Identifier, TableInfo)} instead.
 */
@Deprecated
@Override
public Table createTable(
    Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
    throws TableAlreadyExistsException {
  return createTable(ident, Spark3Util.tableInfo(schema, transforms, properties));
}

/**
 * Creates an Iceberg table described by {@code tableInfo} and wraps it in a {@link SparkTable}.
 *
 * <p>The Spark schema from {@code tableInfo.schema()} is converted to an Iceberg schema, which is
 * used both for the table builder and for deriving the partition spec from
 * {@code tableInfo.partitions()}. The table location is read from the {@code "location"}
 * property (may be absent), and the remaining properties go through
 * {@code Spark3Util.rebuildCreateProperties}.
 *
 * @param ident identifier of the table to create
 * @param tableInfo schema, partitioning and properties of the new table
 * @return a {@link SparkTable} wrapping the created Iceberg table
 * @throws TableAlreadyExistsException if the builder reports {@link AlreadyExistsException}
 */
@Override
public Table createTable(Identifier ident, TableInfo tableInfo)
    throws TableAlreadyExistsException {
  Schema icebergSchema = SparkSchemaUtil.convert(tableInfo.schema());
  try {
    Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
    org.apache.iceberg.Table icebergTable =
        builder
            .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, tableInfo.partitions()))
            .withLocation(tableInfo.properties().get("location"))
            .withProperties(Spark3Util.rebuildCreateProperties(tableInfo.properties()))
            .create();
    return new SparkTable(icebergTable);
  } catch (AlreadyExistsException e) {
    // translate the Iceberg exception into the Spark catalog exception declared by this method
    throw new TableAlreadyExistsException(ident);
  }
}

/**
 * Stages the creation of a table from a Spark schema, partition transforms and properties.
 *
 * <p>The arguments are packed into a {@link TableInfo} with {@code Spark3Util.tableInfo} and the
 * call is forwarded to {@link #stageCreate(Identifier, TableInfo)}; the schema conversion happens
 * there, so none is done here.
 *
 * @param ident identifier of the table to create
 * @param schema Spark schema of the table
 * @param transforms partition transforms
 * @param properties table properties
 * @return the staged table
 * @throws TableAlreadyExistsException if thrown by the {@link TableInfo} overload
 * @deprecated since 1.12.0, use {@link #stageCreate(Identifier, TableInfo)} instead.
 */
@Deprecated
@Override
public StagedTable stageCreate(
    Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
    throws TableAlreadyExistsException {
  return stageCreate(ident, Spark3Util.tableInfo(schema, transforms, properties));
}

/**
 * Stages the creation of an Iceberg table described by {@code tableInfo}.
 *
 * <p>Opens a create transaction on the table builder (schema converted from
 * {@code tableInfo.schema()}, partition spec from {@code tableInfo.partitions()}, location from
 * the {@code "location"} property, other properties via
 * {@code Spark3Util.rebuildCreateProperties}) and wraps it in a {@link StagedSparkTable}.
 *
 * @param ident identifier of the table to create
 * @param tableInfo schema, partitioning and properties of the new table
 * @return a {@link StagedSparkTable} backed by the create transaction
 * @throws TableAlreadyExistsException if the builder reports {@link AlreadyExistsException}
 */
@Override
public StagedTable stageCreate(Identifier ident, TableInfo tableInfo)
    throws TableAlreadyExistsException {
  Schema icebergSchema = SparkSchemaUtil.convert(tableInfo.schema());
  try {
    Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
    Transaction transaction =
        builder
            .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, tableInfo.partitions()))
            .withLocation(tableInfo.properties().get("location"))
            .withProperties(Spark3Util.rebuildCreateProperties(tableInfo.properties()))
            .createTransaction();
    return new StagedSparkTable(transaction);
  } catch (AlreadyExistsException e) {
    // translate the Iceberg exception into the Spark catalog exception declared by this method
    throw new TableAlreadyExistsException(ident);
  }
}

/**
 * Stages the replacement of a table from a Spark schema, partition transforms and properties.
 *
 * <p>The arguments are packed into a {@link TableInfo} with {@code Spark3Util.tableInfo} and the
 * call is forwarded to {@link #stageReplace(Identifier, TableInfo)}; the schema conversion
 * happens there, so none is done here.
 *
 * @param ident identifier of the table to replace
 * @param schema Spark schema of the replacement table
 * @param transforms partition transforms
 * @param properties table properties
 * @return the staged table
 * @throws NoSuchTableException if thrown by the {@link TableInfo} overload
 * @deprecated since 1.12.0, use {@link #stageReplace(Identifier, TableInfo)} instead.
 */
@Deprecated
@Override
public StagedTable stageReplace(
    Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
    throws NoSuchTableException {
  return stageReplace(ident, Spark3Util.tableInfo(schema, transforms, properties));
}

/**
 * Stages the replacement of an Iceberg table with the definition in {@code tableInfo}.
 *
 * <p>Opens a replace transaction on the table builder (schema converted from
 * {@code tableInfo.schema()}, partition spec from {@code tableInfo.partitions()}, location from
 * the {@code "location"} property, other properties via
 * {@code Spark3Util.rebuildCreateProperties}) and wraps it in a {@link StagedSparkTable}.
 *
 * @param ident identifier of the table to replace
 * @param tableInfo schema, partitioning and properties of the replacement table
 * @return a {@link StagedSparkTable} backed by the replace transaction
 * @throws NoSuchTableException if the builder reports the Iceberg {@code NoSuchTableException}
 */
@Override
public StagedTable stageReplace(Identifier ident, TableInfo tableInfo)
    throws NoSuchTableException {
  Schema icebergSchema = SparkSchemaUtil.convert(tableInfo.schema());
  try {
    Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
    Transaction transaction =
        builder
            .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, tableInfo.partitions()))
            .withLocation(tableInfo.properties().get("location"))
            .withProperties(Spark3Util.rebuildCreateProperties(tableInfo.properties()))
            .replaceTransaction();
    return new StagedSparkTable(transaction);
  } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
    // translate the Iceberg exception into the Spark catalog exception declared by this method
    throw new NoSuchTableException(ident);
  }
}

/**
 * Stages a create-or-replace of a table from a Spark schema, partition transforms and
 * properties.
 *
 * <p>The arguments are packed into a {@link TableInfo} with {@code Spark3Util.tableInfo} and the
 * call is forwarded to {@link #stageCreateOrReplace(Identifier, TableInfo)}; the schema
 * conversion happens there, so none is done here.
 *
 * @param ident identifier of the table to create or replace
 * @param schema Spark schema of the table
 * @param transforms partition transforms
 * @param properties table properties
 * @return the staged table
 * @deprecated since 1.12.0, use {@link #stageCreateOrReplace(Identifier, TableInfo)} instead.
 */
@Deprecated
@Override
public StagedTable stageCreateOrReplace(
    Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties) {
  return stageCreateOrReplace(ident, Spark3Util.tableInfo(schema, transforms, properties));
}

/**
 * Stages a create-or-replace of an Iceberg table described by {@code tableInfo}.
 *
 * <p>Opens a create-or-replace transaction on the table builder (schema converted from
 * {@code tableInfo.schema()}, partition spec from {@code tableInfo.partitions()}, location from
 * the {@code "location"} property, other properties via
 * {@code Spark3Util.rebuildCreateProperties}) and wraps it in a {@link StagedSparkTable}.
 *
 * @param ident identifier of the table to create or replace
 * @param tableInfo schema, partitioning and properties of the table
 * @return a {@link StagedSparkTable} backed by the create-or-replace transaction
 */
@Override
public StagedTable stageCreateOrReplace(Identifier ident, TableInfo tableInfo) {
  Schema icebergSchema = SparkSchemaUtil.convert(tableInfo.schema());
  Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
  Transaction transaction =
      builder
          .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, tableInfo.partitions()))
          .withLocation(tableInfo.properties().get("location"))
          .withProperties(Spark3Util.rebuildCreateProperties(tableInfo.properties()))
          .createOrReplaceTransaction();
  return new StagedSparkTable(transaction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.spark;

import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.source.SparkRewriteTable;
Expand All @@ -28,8 +27,7 @@
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.connector.catalog.TableInfo;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class SparkRewriteTableCatalog implements TableCatalog, SupportsFunctions {
Expand Down Expand Up @@ -74,8 +72,7 @@ public void invalidateTable(Identifier ident) {
}

@Override
public SparkTable createTable(
Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
public SparkTable createTable(Identifier ident, TableInfo tableInfo)
throws TableAlreadyExistsException {
throw new UnsupportedOperationException(CLASS_NAME + " does not support creating tables");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.TableInfo;
import org.apache.spark.sql.connector.catalog.View;
import org.apache.spark.sql.connector.catalog.ViewCatalog;
import org.apache.spark.sql.connector.catalog.ViewChange;
Expand Down Expand Up @@ -181,28 +182,48 @@ public boolean tableExists(Identifier ident) {
return icebergCatalog.tableExists(ident) || getSessionCatalog().tableExists(ident);
}

/**
 * Creates a table from a Spark schema, partition transforms and properties.
 *
 * <p>The arguments are packed into a {@link TableInfo} with {@code Spark3Util.tableInfo} and the
 * call is forwarded to {@link #createTable(Identifier, TableInfo)}, which reads the
 * {@code "provider"} property itself; no lookup is needed here.
 *
 * @param ident identifier of the table to create
 * @param schema Spark schema of the table
 * @param partitions partition transforms
 * @param properties table properties
 * @return the created table
 * @throws TableAlreadyExistsException if thrown by the {@link TableInfo} overload
 * @throws NoSuchNamespaceException if thrown by the {@link TableInfo} overload
 * @deprecated since 1.12.0, use {@link #createTable(Identifier, TableInfo)} instead.
 */
@Deprecated
@Override
public Table createTable(
    Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
    throws TableAlreadyExistsException, NoSuchNamespaceException {
  return createTable(ident, Spark3Util.tableInfo(schema, partitions, properties));
}

/**
 * Creates a table in either the Iceberg catalog or the session catalog.
 *
 * <p>The {@code "provider"} property of {@code tableInfo} is passed to {@code useIceberg}; when
 * that returns true the table is created through {@code icebergCatalog}, otherwise through the
 * catalog returned by {@code getSessionCatalog()}.
 *
 * @param ident identifier of the table to create
 * @param tableInfo schema, partitioning and properties of the new table
 * @return the table returned by the selected catalog
 * @throws TableAlreadyExistsException if thrown by the selected catalog
 * @throws NoSuchNamespaceException if thrown by the selected catalog
 */
@Override
public Table createTable(Identifier ident, TableInfo tableInfo)
    throws TableAlreadyExistsException, NoSuchNamespaceException {
  String provider = tableInfo.properties().get("provider");
  if (useIceberg(provider)) {
    return icebergCatalog.createTable(ident, tableInfo);
  } else {
    // delegate to the session catalog
    return getSessionCatalog().createTable(ident, tableInfo);
  }
}

/**
 * Stages the creation of a table from a Spark schema, partition transforms and properties.
 *
 * <p>The arguments are packed into a {@link TableInfo} with {@code Spark3Util.tableInfo} and the
 * call is forwarded to {@link #stageCreate(Identifier, TableInfo)}, which reads the
 * {@code "provider"} property itself; no lookup is needed here.
 *
 * @param ident identifier of the table to create
 * @param schema Spark schema of the table
 * @param partitions partition transforms
 * @param properties table properties
 * @return the staged table
 * @throws TableAlreadyExistsException if thrown by the {@link TableInfo} overload
 * @throws NoSuchNamespaceException if thrown by the {@link TableInfo} overload
 * @deprecated since 1.12.0, use {@link #stageCreate(Identifier, TableInfo)} instead.
 */
@Deprecated
@Override
public StagedTable stageCreate(
    Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
    throws TableAlreadyExistsException, NoSuchNamespaceException {
  return stageCreate(ident, Spark3Util.tableInfo(schema, partitions, properties));
}

@Override
public StagedTable stageCreate(Identifier ident, TableInfo tableInfo)
throws TableAlreadyExistsException, NoSuchNamespaceException {
String provider = tableInfo.properties().get("provider");
TableCatalog catalog;
if (useIceberg(provider)) {
if (asStagingCatalog != null) {
return asStagingCatalog.stageCreate(ident, schema, partitions, properties);
return asStagingCatalog.stageCreate(ident, tableInfo);
}
catalog = icebergCatalog;
} else {
Expand All @@ -211,19 +232,29 @@ public StagedTable stageCreate(

// create the table with the session catalog, then wrap it in a staged table that will delete to
// roll back
Table table = catalog.createTable(ident, schema, partitions, properties);
Table table = catalog.createTable(ident, tableInfo);
return new RollbackStagedTable(catalog, ident, table);
}

/**
 * Stages the replacement of a table from a Spark schema, partition transforms and properties.
 *
 * <p>The arguments are packed into a {@link TableInfo} with {@code Spark3Util.tableInfo} and the
 * call is forwarded to {@link #stageReplace(Identifier, TableInfo)}, which reads the
 * {@code "provider"} property itself; no lookup is needed here.
 *
 * @param ident identifier of the table to replace
 * @param schema Spark schema of the replacement table
 * @param partitions partition transforms
 * @param properties table properties
 * @return the staged table
 * @throws NoSuchNamespaceException if thrown by the {@link TableInfo} overload
 * @throws NoSuchTableException if thrown by the {@link TableInfo} overload
 * @deprecated since 1.12.0, use {@link #stageReplace(Identifier, TableInfo)} instead.
 */
@Deprecated
@Override
public StagedTable stageReplace(
    Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
    throws NoSuchNamespaceException, NoSuchTableException {
  return stageReplace(ident, Spark3Util.tableInfo(schema, partitions, properties));
}

@Override
public StagedTable stageReplace(Identifier ident, TableInfo tableInfo)
throws NoSuchNamespaceException, NoSuchTableException {
String provider = tableInfo.properties().get("provider");
TableCatalog catalog;
if (useIceberg(provider)) {
if (asStagingCatalog != null) {
return asStagingCatalog.stageReplace(ident, schema, partitions, properties);
return asStagingCatalog.stageReplace(ident, tableInfo);
}
catalog = icebergCatalog;
} else {
Expand All @@ -238,24 +269,34 @@ public StagedTable stageReplace(
try {
// create the table with the session catalog, then wrap it in a staged table that will delete
// to roll back
Table table = catalog.createTable(ident, schema, partitions, properties);
Table table = catalog.createTable(ident, tableInfo);
return new RollbackStagedTable(catalog, ident, table);

} catch (TableAlreadyExistsException e) {
// the table was deleted, but now already exists again. retry the replace.
return stageReplace(ident, schema, partitions, properties);
return stageReplace(ident, tableInfo);
}
}

/**
 * Stages a create-or-replace of a table from a Spark schema, partition transforms and
 * properties.
 *
 * <p>The arguments are packed into a {@link TableInfo} with {@code Spark3Util.tableInfo} and the
 * call is forwarded to {@link #stageCreateOrReplace(Identifier, TableInfo)}, which reads the
 * {@code "provider"} property itself; no lookup is needed here.
 *
 * @param ident identifier of the table to create or replace
 * @param schema Spark schema of the table
 * @param partitions partition transforms
 * @param properties table properties
 * @return the staged table
 * @throws NoSuchNamespaceException if thrown by the {@link TableInfo} overload
 * @deprecated since 1.12.0, use {@link #stageCreateOrReplace(Identifier, TableInfo)} instead.
 */
@Deprecated
@Override
public StagedTable stageCreateOrReplace(
    Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
    throws NoSuchNamespaceException {
  return stageCreateOrReplace(ident, Spark3Util.tableInfo(schema, partitions, properties));
}

@Override
public StagedTable stageCreateOrReplace(Identifier ident, TableInfo tableInfo)
throws NoSuchNamespaceException {
String provider = tableInfo.properties().get("provider");
TableCatalog catalog;
if (useIceberg(provider)) {
if (asStagingCatalog != null) {
return asStagingCatalog.stageCreateOrReplace(ident, schema, partitions, properties);
return asStagingCatalog.stageCreateOrReplace(ident, tableInfo);
}
catalog = icebergCatalog;
} else {
Expand All @@ -268,12 +309,12 @@ public StagedTable stageCreateOrReplace(
try {
// create the table with the session catalog, then wrap it in a staged table that will delete
// to roll back
Table sessionCatalogTable = catalog.createTable(ident, schema, partitions, properties);
Table sessionCatalogTable = catalog.createTable(ident, tableInfo);
return new RollbackStagedTable(catalog, ident, sessionCatalogTable);

} catch (TableAlreadyExistsException e) {
// the table was deleted, but now already exists again. retry the replace.
return stageCreateOrReplace(ident, schema, partitions, properties);
return stageCreateOrReplace(ident, tableInfo);
}
}

Expand Down
Loading