diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md index ad6e5b2433..e4cd6c00a9 100644 --- a/docs/source/user-guide/latest/iceberg.md +++ b/docs/source/user-guide/latest/iceberg.md @@ -189,7 +189,9 @@ The native Iceberg reader supports the following features: **Table specifications:** -- Iceberg table spec v1 and v2 (v3 will fall back to Spark) +- Iceberg table spec v1, v2, and v3 (basic read support) +- V3 tables using V2-compatible features are fully accelerated +- V3-specific features (Deletion Vectors, new types) will gracefully fall back to Spark **Schema and data types:** @@ -266,10 +268,19 @@ scala> spark.sql("SELECT * FROM rest_cat.db.test_table").show() The following scenarios will fall back to Spark's native Iceberg reader: -- Iceberg table spec v3 scans - Iceberg writes (reads are accelerated, writes use Spark) - Tables backed by Avro or ORC data files (only Parquet is accelerated) - Tables partitioned on `BINARY` or `DECIMAL` (with precision >28) columns - Scans with residual filters using `truncate`, `bucket`, `year`, `month`, `day`, or `hour` transform functions (partition pruning still works, but row-level filtering of these transforms falls back) + +**V3-specific limitations (graceful fallback to Spark):** + +- Deletion Vectors (DVs) - V3's efficient bitmap-based deletes stored in Puffin files +- V3-only data types: `timestamp_ns`, `timestamptz_ns`, `variant`, `geometry`, `geography` +- Table encryption - tables with `encryption.key-id` or other encryption properties +- Default column values - schema fields with `initial-default` or `write-default` + +Note: V3 tables that use only V2-compatible features (position deletes, equality deletes, +standard types, no encryption, no column defaults) are fully accelerated by Comet. diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala index 2d772063e4..3434837b41 100644 --- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala @@ -705,6 +705,256 @@ object IcebergReflection extends Logging { } None } + + // ============================================================================ + // Iceberg V3 Feature Detection + // ============================================================================ + + /** V3-only types not yet supported by Comet. */ + object V3Types { + val TIMESTAMP_NS = "timestamp_ns" + val TIMESTAMPTZ_NS = "timestamptz_ns" + val VARIANT = "variant" + val GEOMETRY = "geometry" + val GEOGRAPHY = "geography" + + val UNSUPPORTED_V3_TYPES: Set[String] = + Set(TIMESTAMP_NS, TIMESTAMPTZ_NS, VARIANT, GEOMETRY, GEOGRAPHY) + } + + /** V3 Deletion Vector content type. */ + object V3ContentTypes { + val DELETION_VECTOR = "DELETION_VECTOR" + } + + /** Checks if any delete files use Deletion Vectors (V3 feature). */ + def hasDeletionVectors(tasks: java.util.List[_]): Boolean = { + import scala.jdk.CollectionConverters._ + + try { + val deleteFiles = getDeleteFiles(tasks) + + deleteFiles.asScala.exists { deleteFile => + try { + // scalastyle:off classforname + val contentFileClass = Class.forName(ClassNames.CONTENT_FILE) + // scalastyle:on classforname + val contentMethod = contentFileClass.getMethod("content") + val content = contentMethod.invoke(deleteFile) + content.toString == V3ContentTypes.DELETION_VECTOR + } catch { + case _: Exception => false + } + } + } catch { + case _: Exception => false + } + } + + /** Finds V3-only types in a schema not yet supported by Comet. */ + def findUnsupportedV3Types(schema: Any): Set[String] = { + import scala.jdk.CollectionConverters._ + + val unsupportedTypes = scala.collection.mutable.Set[String]() + + try { + val columnsMethod = schema.getClass.getMethod("columns") + val columns = columnsMethod.invoke(schema).asInstanceOf[java.util.List[_]] + + columns.asScala.foreach { column => + try { + val typeMethod = column.getClass.getMethod("type") + val icebergType = typeMethod.invoke(column) + val typeStr = icebergType.toString.toLowerCase(java.util.Locale.ROOT) + + V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type => + if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) { + unsupportedTypes += v3Type + } + } + + checkNestedTypesForV3(icebergType, unsupportedTypes) + } catch { + case _: Exception => // Skip columns where we can't determine type + } + } + } catch { + case e: Exception => + logWarning(s"Failed to scan schema for V3 types: ${e.getMessage}") + } + + unsupportedTypes.toSet + } + + /** Recursively checks nested types for V3-only types. */ + private def checkNestedTypesForV3( + icebergType: Any, + unsupportedTypes: scala.collection.mutable.Set[String]): Unit = { + import scala.jdk.CollectionConverters._ + + try { + val typeClass = icebergType.getClass + + if (typeClass.getSimpleName.contains("StructType")) { + try { + val fieldsMethod = typeClass.getMethod("fields") + val fields = fieldsMethod.invoke(icebergType).asInstanceOf[java.util.List[_]] + + fields.asScala.foreach { field => + try { + val fieldTypeMethod = field.getClass.getMethod("type") + val fieldType = fieldTypeMethod.invoke(field) + val typeStr = fieldType.toString.toLowerCase(java.util.Locale.ROOT) + + V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type => + if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) { + unsupportedTypes += v3Type + } + } + + checkNestedTypesForV3(fieldType, unsupportedTypes) + } catch { + case _: Exception => + } + } + } catch { + case _: Exception => + } + } + + if (typeClass.getSimpleName.contains("ListType")) { + try { + val elementTypeMethod = typeClass.getMethod("elementType") + val elementType = elementTypeMethod.invoke(icebergType) + val typeStr = elementType.toString.toLowerCase(java.util.Locale.ROOT) + + V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type => + if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) { + unsupportedTypes += v3Type + } + } + + checkNestedTypesForV3(elementType, unsupportedTypes) + } catch { + case _: Exception => + } + } + + if (typeClass.getSimpleName.contains("MapType")) { + try { + val keyTypeMethod = typeClass.getMethod("keyType") + val valueTypeMethod = typeClass.getMethod("valueType") + val keyType = keyTypeMethod.invoke(icebergType) + val valueType = valueTypeMethod.invoke(icebergType) + + Seq(keyType, valueType).foreach { mapType => + val typeStr = mapType.toString.toLowerCase(java.util.Locale.ROOT) + V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type => + if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) { + unsupportedTypes += v3Type + } + } + checkNestedTypesForV3(mapType, unsupportedTypes) + } + } catch { + case _: Exception => + } + } + } catch { + case _: Exception => + } + } + + /** Checks if table has encryption configured (V3 feature). */ + def hasEncryption(table: Any): Boolean = { + import scala.jdk.CollectionConverters._ + + try { + // Check table properties for encryption.key-id + getTableProperties(table).exists { props => + props.asScala.keys.exists { key => + key.startsWith("encryption.") || key.startsWith("kms.") + } + } + } catch { + case _: Exception => false + } + } + + /** Checks if schema has default column values (V3 feature). */ + def hasDefaultColumnValues(schema: Any): Boolean = { + import scala.jdk.CollectionConverters._ + + try { + val columnsMethod = schema.getClass.getMethod("columns") + val columns = columnsMethod.invoke(schema).asInstanceOf[java.util.List[_]] + + columns.asScala.exists { column => + try { + hasFieldDefault(column) || hasNestedDefaults(column) + } catch { + case _: Exception => false + } + } + } catch { + case _: Exception => false + } + } + + /** Checks if a field has initial-default or write-default set. */ + private def hasFieldDefault(field: Any): Boolean = { + try { + val fieldClass = field.getClass + + // Check initialDefault() + val hasInitialDefault = + try { + val initialDefaultMethod = fieldClass.getMethod("initialDefault") + val initialDefault = initialDefaultMethod.invoke(field) + initialDefault != null + } catch { + case _: NoSuchMethodException => false + } + + // Check writeDefault() + val hasWriteDefault = + try { + val writeDefaultMethod = fieldClass.getMethod("writeDefault") + val writeDefault = writeDefaultMethod.invoke(field) + writeDefault != null + } catch { + case _: NoSuchMethodException => false + } + + hasInitialDefault || hasWriteDefault + } catch { + case _: Exception => false + } + } + + /** Recursively checks nested struct fields for defaults. */ + private def hasNestedDefaults(field: Any): Boolean = { + import scala.jdk.CollectionConverters._ + + try { + val typeMethod = field.getClass.getMethod("type") + val fieldType = typeMethod.invoke(field) + val typeClass = fieldType.getClass + + if (typeClass.getSimpleName.contains("StructType")) { + val fieldsMethod = typeClass.getMethod("fields") + val nestedFields = fieldsMethod.invoke(fieldType).asInstanceOf[java.util.List[_]] + + nestedFields.asScala.exists { nestedField => + hasFieldDefault(nestedField) || hasNestedDefaults(nestedField) + } + } else { + false + } + } catch { + case _: Exception => false + } + } } /** diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 45faa4d940..564f68bf1a 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -441,10 +441,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com val formatVersionSupported = IcebergReflection.getFormatVersion(metadata.table) match { case Some(formatVersion) => - if (formatVersion > 2) { + if (formatVersion > 3) { fallbackReasons += "Iceberg table format version " + s"$formatVersion is not supported. " + - "Comet only supports Iceberg table format V1 and V2" + "Comet supports Iceberg table format V1, V2, and V3" false } else { true @@ -621,10 +621,70 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com !hasUnsupportedDeletes } + val v3FeaturesSupported = IcebergReflection.getFormatVersion(metadata.table) match { + case Some(formatVersion) if formatVersion >= 3 => + var allV3FeaturesSupported = true + + try { + if (IcebergReflection.hasDeletionVectors(metadata.tasks)) { + fallbackReasons += "Iceberg V3 Deletion Vectors are not yet supported. " + + "Tables using Deletion Vectors will fall back to Spark" + allV3FeaturesSupported = false + } + } catch { + case e: Exception => + fallbackReasons += "Iceberg reflection failure: Could not check for " + + s"Deletion Vectors: ${e.getMessage}" + allV3FeaturesSupported = false + } + + try { + val unsupportedTypes = IcebergReflection.findUnsupportedV3Types(metadata.scanSchema) + if (unsupportedTypes.nonEmpty) { + fallbackReasons += "Iceberg V3 types not yet supported: " + + s"${unsupportedTypes.mkString(", ")}. " + + "Tables with these types will fall back to Spark" + allV3FeaturesSupported = false + } + } catch { + case e: Exception => + fallbackReasons += "Iceberg reflection failure: Could not check for " + + s"V3 types: ${e.getMessage}" + allV3FeaturesSupported = false + } + + try { + if (IcebergReflection.hasEncryption(metadata.table)) { + fallbackReasons += "Iceberg table encryption is not yet supported" + allV3FeaturesSupported = false + } + } catch { + case e: Exception => + fallbackReasons += "Iceberg reflection failure: Could not check for " + + s"encryption: ${e.getMessage}" + allV3FeaturesSupported = false + } + + try { + if (IcebergReflection.hasDefaultColumnValues(metadata.scanSchema)) { + fallbackReasons += "Iceberg default column values are not yet supported" + allV3FeaturesSupported = false + } + } catch { + case e: Exception => + fallbackReasons += "Iceberg reflection failure: Could not check for " + + s"default column values: ${e.getMessage}" + allV3FeaturesSupported = false + } + + allV3FeaturesSupported + case _ => true + } + if (schemaSupported && fileIOCompatible && formatVersionSupported && allParquetFiles && allSupportedFilesystems && partitionTypesSupported && complexTypePredicatesSupported && transformFunctionsSupported && - deleteFileTypesSupported) { + deleteFileTypesSupported && v3FeaturesSupported) { CometBatchScanExec( scanExec.clone().asInstanceOf[BatchScanExec], runtimeFilters = scanExec.runtimeFilters, diff --git a/spark/src/test/resources/sql-tests/iceberg/v3_aggregations.sql b/spark/src/test/resources/sql-tests/iceberg/v3_aggregations.sql new file mode 100644 index 0000000000..47542bef64 --- /dev/null +++ b/spark/src/test/resources/sql-tests/iceberg/v3_aggregations.sql @@ -0,0 +1,48 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Iceberg V3 aggregations and grouping + +statement +CREATE NAMESPACE IF NOT EXISTS iceberg_cat.db + +statement +CREATE TABLE iceberg_cat.db.v3_orders (order_id INT, customer_id INT, amount DOUBLE, category STRING) USING iceberg TBLPROPERTIES ('format-version' = '3') + +statement +INSERT INTO iceberg_cat.db.v3_orders VALUES (1, 1, 100.0, 'A'), (2, 1, 200.0, 'B'), (3, 2, 150.0, 'A'), (4, 3, 300.0, 'B'), (5, 2, 50.0, 'A') + +query spark_answer_only +SELECT COUNT(*) FROM iceberg_cat.db.v3_orders + +query spark_answer_only +SELECT SUM(amount) FROM iceberg_cat.db.v3_orders + +query spark_answer_only +SELECT AVG(amount) FROM iceberg_cat.db.v3_orders + +query spark_answer_only +SELECT MIN(amount), MAX(amount) FROM iceberg_cat.db.v3_orders + +query spark_answer_only +SELECT customer_id, SUM(amount) as total FROM iceberg_cat.db.v3_orders GROUP BY customer_id ORDER BY customer_id + +query spark_answer_only +SELECT category, COUNT(*) as cnt, AVG(amount) as avg_amt FROM iceberg_cat.db.v3_orders GROUP BY category ORDER BY category + +query spark_answer_only +SELECT customer_id, SUM(amount) as total FROM iceberg_cat.db.v3_orders GROUP BY customer_id HAVING SUM(amount) > 200 ORDER BY customer_id diff --git a/spark/src/test/resources/sql-tests/iceberg/v3_basic.sql b/spark/src/test/resources/sql-tests/iceberg/v3_basic.sql new file mode 100644 index 0000000000..5bdc6ce6fa --- /dev/null +++ b/spark/src/test/resources/sql-tests/iceberg/v3_basic.sql @@ -0,0 +1,42 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Iceberg V3 basic table operations + +statement +CREATE NAMESPACE IF NOT EXISTS iceberg_cat.db + +statement +CREATE TABLE iceberg_cat.db.v3_basic (id INT, name STRING, value DOUBLE) USING iceberg TBLPROPERTIES ('format-version' = '3') + +statement +INSERT INTO iceberg_cat.db.v3_basic VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7) + +query spark_answer_only +SELECT * FROM iceberg_cat.db.v3_basic ORDER BY id + +query spark_answer_only +SELECT * FROM iceberg_cat.db.v3_basic WHERE id > 1 ORDER BY id + +query spark_answer_only +SELECT * FROM iceberg_cat.db.v3_basic WHERE name = 'Alice' + +query spark_answer_only +SELECT COUNT(*) FROM iceberg_cat.db.v3_basic + +query spark_answer_only +SELECT id, name FROM iceberg_cat.db.v3_basic WHERE value > 15.0 ORDER BY id diff --git a/spark/src/test/resources/sql-tests/iceberg/v3_joins.sql b/spark/src/test/resources/sql-tests/iceberg/v3_joins.sql new file mode 100644 index 0000000000..d31ff3ef3c --- /dev/null +++ b/spark/src/test/resources/sql-tests/iceberg/v3_joins.sql @@ -0,0 +1,42 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Iceberg V3 join operations + +statement +CREATE NAMESPACE IF NOT EXISTS iceberg_cat.db + +statement +CREATE TABLE iceberg_cat.db.v3_customers (id INT, name STRING) USING iceberg TBLPROPERTIES ('format-version' = '3') + +statement +CREATE TABLE iceberg_cat.db.v3_orders_join (order_id INT, customer_id INT, amount DOUBLE) USING iceberg TBLPROPERTIES ('format-version' = '3') + +statement +INSERT INTO iceberg_cat.db.v3_customers VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie') + +statement +INSERT INTO iceberg_cat.db.v3_orders_join VALUES (1, 1, 100.0), (2, 1, 200.0), (3, 2, 150.0), (4, 3, 300.0) + +query spark_answer_only +SELECT o.order_id, c.name, o.amount FROM iceberg_cat.db.v3_orders_join o JOIN iceberg_cat.db.v3_customers c ON o.customer_id = c.id ORDER BY o.order_id + +query spark_answer_only +SELECT c.name, SUM(o.amount) as total FROM iceberg_cat.db.v3_orders_join o JOIN iceberg_cat.db.v3_customers c ON o.customer_id = c.id GROUP BY c.name ORDER BY c.name + +query spark_answer_only +SELECT c.name, COUNT(o.order_id) as order_count FROM iceberg_cat.db.v3_customers c LEFT JOIN iceberg_cat.db.v3_orders_join o ON c.id = o.customer_id GROUP BY c.name ORDER BY c.name diff --git a/spark/src/test/resources/sql-tests/iceberg/v3_partitioned.sql b/spark/src/test/resources/sql-tests/iceberg/v3_partitioned.sql new file mode 100644 index 0000000000..a131556aba --- /dev/null +++ b/spark/src/test/resources/sql-tests/iceberg/v3_partitioned.sql @@ -0,0 +1,39 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Iceberg V3 partitioned tables + +statement +CREATE NAMESPACE IF NOT EXISTS iceberg_cat.db + +statement +CREATE TABLE iceberg_cat.db.v3_partitioned (id INT, event_date DATE, value STRING) USING iceberg PARTITIONED BY (days(event_date)) TBLPROPERTIES ('format-version' = '3') + +statement +INSERT INTO iceberg_cat.db.v3_partitioned VALUES (1, DATE '2024-01-01', 'a'), (2, DATE '2024-01-02', 'b'), (3, DATE '2024-01-01', 'c'), (4, DATE '2024-02-01', 'd') + +query spark_answer_only +SELECT * FROM iceberg_cat.db.v3_partitioned ORDER BY id + +query spark_answer_only +SELECT * FROM iceberg_cat.db.v3_partitioned WHERE event_date = DATE '2024-01-01' ORDER BY id + +query spark_answer_only +SELECT * FROM iceberg_cat.db.v3_partitioned WHERE event_date >= DATE '2024-01-02' ORDER BY id + +query spark_answer_only +SELECT event_date, COUNT(*) FROM iceberg_cat.db.v3_partitioned GROUP BY event_date ORDER BY event_date diff --git a/spark/src/test/resources/sql-tests/iceberg/v3_types.sql b/spark/src/test/resources/sql-tests/iceberg/v3_types.sql new file mode 100644 index 0000000000..496109c0d2 --- /dev/null +++ b/spark/src/test/resources/sql-tests/iceberg/v3_types.sql @@ -0,0 +1,52 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Iceberg V3 supported data types + +statement +CREATE NAMESPACE IF NOT EXISTS iceberg_cat.db + +statement +CREATE TABLE iceberg_cat.db.v3_types ( + id INT, + big_num BIGINT, + price DECIMAL(10, 2), + event_date DATE, + event_time TIMESTAMP, + is_active BOOLEAN, + description STRING, + score FLOAT, + rating DOUBLE +) USING iceberg TBLPROPERTIES ('format-version' = '3') + +statement +INSERT INTO iceberg_cat.db.v3_types VALUES (1, 9223372036854775807, 123.45, DATE '2024-01-15', TIMESTAMP '2024-01-15 10:30:00', true, 'Test', 3.14, 2.718) + +query spark_answer_only +SELECT * FROM iceberg_cat.db.v3_types + +query spark_answer_only +SELECT id, big_num FROM iceberg_cat.db.v3_types WHERE big_num > 1000 + +query spark_answer_only +SELECT id, price FROM iceberg_cat.db.v3_types WHERE price > 100.00 + +query spark_answer_only +SELECT id, event_date FROM iceberg_cat.db.v3_types WHERE event_date = DATE '2024-01-15' + +query spark_answer_only +SELECT id, is_active FROM iceberg_cat.db.v3_types WHERE is_active = true diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index f3c8a8b2a6..a57043b2ea 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -96,6 +96,267 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + // ========================================================================== + // Iceberg V3 Table Format Tests + // ========================================================================== + + test("V3 table - basic read support (no V3-specific features)") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.v3_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.v3_cat.type" -> "hadoop", + "spark.sql.catalog.v3_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE v3_cat.db.v3_basic ( + id INT, + name STRING, + value DOUBLE + ) USING iceberg + TBLPROPERTIES ('format-version' = '3') + """) + + spark.sql(""" + INSERT INTO v3_cat.db.v3_basic + VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7) + """) + + checkIcebergNativeScan("SELECT * FROM v3_cat.db.v3_basic ORDER BY id") + checkIcebergNativeScan("SELECT * FROM v3_cat.db.v3_basic WHERE id = 2") + checkIcebergNativeScan("SELECT * FROM v3_cat.db.v3_basic WHERE name = 'Alice'") + checkIcebergNativeScan("SELECT * FROM v3_cat.db.v3_basic WHERE value > 15.0 ORDER BY id") + + spark.sql("DROP TABLE v3_cat.db.v3_basic") + } + } + } + + test("V3 table - with position deletes (V2 compatibility mode)") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.v3_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.v3_cat.type" -> "hadoop", + "spark.sql.catalog.v3_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE v3_cat.db.v3_pos_deletes ( + id INT, + name STRING, + value DOUBLE + ) USING iceberg + TBLPROPERTIES ( + 'format-version' = '3', + 'write.delete.mode' = 'merge-on-read', + 'write.update.mode' = 'merge-on-read' + ) + """) + + spark.sql(""" + INSERT INTO v3_cat.db.v3_pos_deletes + VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7) + """) + + spark.sql("DELETE FROM v3_cat.db.v3_pos_deletes WHERE id = 2") + + checkIcebergNativeScan("SELECT * FROM v3_cat.db.v3_pos_deletes ORDER BY id") + + spark.sql("DROP TABLE v3_cat.db.v3_pos_deletes") + } + } + } + + test("V3 table - with equality deletes") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.v3_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.v3_cat.type" -> "hadoop", + "spark.sql.catalog.v3_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE v3_cat.db.v3_eq_deletes ( + id INT, + category STRING, + value DOUBLE + ) USING iceberg + PARTITIONED BY (category) + TBLPROPERTIES ( + 'format-version' = '3', + 'write.delete.mode' = 'merge-on-read' + ) + """) + + spark.sql(""" + INSERT INTO v3_cat.db.v3_eq_deletes + VALUES (1, 'A', 10.5), (2, 'B', 20.3), (3, 'A', 30.7), (4, 'B', 40.1) + """) + + spark.sql("DELETE FROM v3_cat.db.v3_eq_deletes WHERE category = 'B'") + + checkIcebergNativeScan("SELECT * FROM v3_cat.db.v3_eq_deletes ORDER BY id") + + spark.sql("DROP TABLE v3_cat.db.v3_eq_deletes") + } + } + } + + test("V3 table - all data types supported in V3") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.v3_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.v3_cat.type" -> "hadoop", + "spark.sql.catalog.v3_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE v3_cat.db.v3_types ( + id INT, + big_num BIGINT, + price DECIMAL(10, 2), + event_date DATE, + event_time TIMESTAMP, + is_active BOOLEAN, + description STRING, + score FLOAT, + rating DOUBLE, + tags ARRAY, + metadata MAP, + address STRUCT + ) USING iceberg + TBLPROPERTIES ('format-version' = '3') + """) + + spark.sql(""" + INSERT INTO v3_cat.db.v3_types + VALUES ( + 1, 9223372036854775807, 123.45, DATE '2024-01-15', + TIMESTAMP '2024-01-15 10:30:00', true, 'Test description', + 3.14, 2.718281828, array('tag1', 'tag2'), + map('key1', 'value1'), struct('NYC', 10001) + ) + """) + + checkIcebergNativeScan("SELECT * FROM v3_cat.db.v3_types") + + spark.sql("DROP TABLE v3_cat.db.v3_types") + } + } + } + + test("V3 table - partitioned table with date transform") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.v3_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.v3_cat.type" -> "hadoop", + "spark.sql.catalog.v3_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE v3_cat.db.v3_partitioned ( + id INT, + event_date DATE, + value STRING + ) USING iceberg + PARTITIONED BY (days(event_date)) + TBLPROPERTIES ('format-version' = '3') + """) + + spark.sql(""" + INSERT INTO v3_cat.db.v3_partitioned VALUES + (1, DATE '2024-01-01', 'a'), (2, DATE '2024-01-02', 'b'), + (3, DATE '2024-01-03', 'c'), (4, DATE '2024-02-01', 'd') + """) + + checkIcebergNativeScan("SELECT * FROM v3_cat.db.v3_partitioned ORDER BY id") + checkIcebergNativeScan( + "SELECT * FROM v3_cat.db.v3_partitioned WHERE event_date = DATE '2024-01-01'") + + spark.sql("DROP TABLE v3_cat.db.v3_partitioned") + } + } + } + + test("V3 table - aggregations and joins") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.v3_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.v3_cat.type" -> "hadoop", + "spark.sql.catalog.v3_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + spark.sql(""" + CREATE TABLE v3_cat.db.v3_orders ( + order_id INT, + customer_id INT, + amount DOUBLE + ) USING iceberg + TBLPROPERTIES ('format-version' = '3') + """) + + spark.sql(""" + CREATE TABLE v3_cat.db.v3_customers ( + id INT, + name STRING + ) USING iceberg + TBLPROPERTIES ('format-version' = '3') + """) + + spark.sql(""" + INSERT INTO v3_cat.db.v3_orders VALUES + (1, 1, 100.0), (2, 1, 200.0), (3, 2, 150.0), (4, 3, 300.0) + """) + + spark.sql(""" + INSERT INTO v3_cat.db.v3_customers VALUES + (1, 'Alice'), (2, 'Bob'), (3, 'Charlie') + """) + + checkIcebergNativeScan( + "SELECT customer_id, SUM(amount) as total FROM v3_cat.db.v3_orders GROUP BY customer_id") + + val (_, joinPlan) = checkSparkAnswer(""" + SELECT o.order_id, c.name, o.amount + FROM v3_cat.db.v3_orders o + JOIN v3_cat.db.v3_customers c ON o.customer_id = c.id + ORDER BY o.order_id + """) + val icebergScans = collectIcebergNativeScans(joinPlan) + assert( + icebergScans.length == 2, + s"Expected 2 CometIcebergNativeScanExec for join but found ${icebergScans.length}") + + spark.sql("DROP TABLE v3_cat.db.v3_orders") + spark.sql("DROP TABLE v3_cat.db.v3_customers") + } + } + } + test("filter pushdown - equality predicates") { assume(icebergAvailable, "Iceberg not available in classpath") diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergSqlFileTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergSqlFileTestSuite.scala new file mode 100644 index 0000000000..73fb55b719 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometIcebergSqlFileTestSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import java.io.File +import java.nio.file.Files + +import org.scalactic.source.Position +import org.scalatest.Tag + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +/** + * SQL file test suite for Iceberg tables. + * + * Tests are placed in sql-tests/iceberg/ and use a pre-configured Iceberg catalog. Tables should + * use the `iceberg_cat` catalog prefix (e.g., `iceberg_cat.db.table_name`). + */ +class CometIcebergSqlFileTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { + + private def icebergAvailable: Boolean = { + try { + Class.forName("org.apache.iceberg.catalog.Catalog") + true + } catch { + case _: ClassNotFoundException => false + } + } + + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { + super.test(testName, testTags: _*) { + assume(icebergAvailable, "Iceberg not available in classpath") + testFun + } + } + + private val testResourceDir = { + val url = getClass.getClassLoader.getResource("sql-tests/iceberg") + if (url == null) null else new File(url.toURI) + } + + private def discoverTestFiles(dir: File): Seq[File] = { + if (dir == null || !dir.exists()) return Seq.empty + val files = dir.listFiles().toSeq + val sqlFiles = files.filter(f => f.isFile && f.getName.endsWith(".sql")) + val subDirFiles = files.filter(_.isDirectory).flatMap(discoverTestFiles) + sqlFiles ++ subDirFiles + } + + private val constantFoldingExcluded = Seq( + "spark.sql.optimizer.excludedRules" -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + + private def runTestFile(file: SqlTestFile, warehouseDir: File): Unit = { + val icebergConfigs = Seq( + "spark.sql.catalog.iceberg_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.iceberg_cat.type" -> "hadoop", + "spark.sql.catalog.iceberg_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") + + val allConfigs = icebergConfigs ++ file.configs ++ constantFoldingExcluded + withSQLConf(allConfigs: _*) { + file.records.foreach { + case SqlStatement(sql) => + spark.sql(sql) + case SqlQuery(sql, mode) => + mode match { + case CheckCoverageAndAnswer => + checkSparkAnswerAndOperator(sql) + case SparkAnswerOnly => + checkSparkAnswer(sql) + case WithTolerance(tol) => + checkSparkAnswerWithTolerance(sql, tol) + case ExpectFallback(reason) => + checkSparkAnswerAndFallbackReason(sql, reason) + case Ignore(reason) => + logInfo(s"IGNORED query (${reason}): $sql") + } + } + } + } + + if (testResourceDir != null) { + discoverTestFiles(testResourceDir).foreach { file => + val relativePath = + new File(testResourceDir.getParentFile, "iceberg").toURI.relativize(file.toURI).getPath + val parsed = SqlFileTestParser.parse(file) + + test(s"iceberg-sql: $relativePath") { + val warehouseDir = Files.createTempDirectory("iceberg-sql-test").toFile + try { + runTestFile(parsed, warehouseDir) + } finally { + deleteRecursively(warehouseDir) + } + } + } + } + + private def deleteRecursively(file: File): Unit = { + if (file.isDirectory) { + file.listFiles().foreach(deleteRecursively) + } + file.delete() + } +}