Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions docs/source/user-guide/latest/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ The native Iceberg reader supports the following features:

**Table specifications:**

- Iceberg table spec v1 and v2 (v3 will fall back to Spark)
- Iceberg table spec v1, v2, and v3 (basic read support)
- V3 tables using V2-compatible features are fully accelerated
- Scans of tables that use V3-specific features (Deletion Vectors, new types) will gracefully fall back to Spark

**Schema and data types:**

Expand Down Expand Up @@ -266,10 +268,19 @@ scala> spark.sql("SELECT * FROM rest_cat.db.test_table").show()

The following scenarios will fall back to Spark's native Iceberg reader:

- Iceberg table spec v3 scans
- Iceberg writes (reads are accelerated, writes use Spark)
- Tables backed by Avro or ORC data files (only Parquet is accelerated)
- Tables partitioned on `BINARY` or `DECIMAL` (with precision >28) columns
- Scans with residual filters using `truncate`, `bucket`, `year`, `month`, `day`, or `hour`
transform functions (partition pruning still works, but row-level filtering of these
transforms falls back)

**V3-specific limitations (graceful fallback to Spark):**

- Deletion Vectors (DVs) - V3's efficient bitmap-based deletes stored in Puffin files
- V3-only data types: `timestamp_ns`, `timestamptz_ns`, `variant`, `geometry`, `geography`
- Table encryption - tables with `encryption.key-id` or other encryption properties
- Default column values - schema fields with `initial-default` or `write-default`

Note: V3 tables that use only V2-compatible features (position deletes, equality deletes,
standard types, no encryption, no column defaults) are fully accelerated by Comet.
250 changes: 250 additions & 0 deletions spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,256 @@ object IcebergReflection extends Logging {
}
None
}

// ============================================================================
// Iceberg V3 Feature Detection
// ============================================================================

/**
 * Names of the Iceberg V3-only data types that Comet cannot yet read
 * natively. Schemas containing any of these fall back to Spark.
 */
object V3Types {
  val TIMESTAMP_NS = "timestamp_ns"
  val TIMESTAMPTZ_NS = "timestamptz_ns"
  val VARIANT = "variant"
  val GEOMETRY = "geometry"
  val GEOGRAPHY = "geography"

  // All of the above, collected once for membership checks during schema scans.
  val UNSUPPORTED_V3_TYPES: Set[String] =
    Seq(TIMESTAMP_NS, TIMESTAMPTZ_NS, VARIANT, GEOMETRY, GEOGRAPHY).toSet
}

/**
 * Content-type marker for Iceberg V3 Deletion Vector delete files.
 * Compared against the string form of `ContentFile.content()`.
 */
object V3ContentTypes {
  val DELETION_VECTOR: String = "DELETION_VECTOR"
}

/**
 * Returns true when any delete file attached to the given scan tasks is a
 * V3 Deletion Vector, i.e. `ContentFile.content().toString` equals
 * "DELETION_VECTOR".
 *
 * Any reflection failure is treated as "no DVs detected" so that this
 * detection can never break query planning.
 *
 * @param tasks the Iceberg scan tasks to inspect
 */
def hasDeletionVectors(tasks: java.util.List[_]): Boolean = {
  import scala.jdk.CollectionConverters._

  try {
    val deleteFiles = getDeleteFiles(tasks)

    // Resolve ContentFile.content() once; the original lookup per delete
    // file repeated Class.forName/getMethod for every element.
    // scalastyle:off classforname
    val contentFileClass = Class.forName(ClassNames.CONTENT_FILE)
    // scalastyle:on classforname
    val contentMethod = contentFileClass.getMethod("content")

    deleteFiles.asScala.exists { deleteFile =>
      try {
        // content() yields a FileContent enum; compare its string form.
        contentMethod.invoke(deleteFile).toString == V3ContentTypes.DELETION_VECTOR
      } catch {
        // A single uninspectable delete file does not count as a DV.
        case _: Exception => false
      }
    }
  } catch {
    case _: Exception => false
  }
}

/**
 * Scans an Iceberg schema (via reflection) and returns the names of any
 * V3-only types it contains, including types nested inside structs, lists,
 * and maps. Returns an empty set when the schema cannot be inspected.
 *
 * @param schema an Iceberg `Schema` instance, accessed reflectively
 */
def findUnsupportedV3Types(schema: Any): Set[String] = {
  import scala.jdk.CollectionConverters._

  val found = scala.collection.mutable.Set[String]()

  // Records every V3-only type whose name matches this type's string form,
  // either exactly ("variant") or as a parameterized prefix ("geometry(...)").
  def recordMatches(icebergType: Any): Unit = {
    val typeStr = icebergType.toString.toLowerCase(java.util.Locale.ROOT)
    V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
      if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
        found += v3Type
      }
    }
  }

  try {
    val columns = schema.getClass
      .getMethod("columns")
      .invoke(schema)
      .asInstanceOf[java.util.List[_]]

    columns.asScala.foreach { column =>
      try {
        val icebergType = column.getClass.getMethod("type").invoke(column)
        recordMatches(icebergType)
        checkNestedTypesForV3(icebergType, found)
      } catch {
        case _: Exception => // Skip columns whose type cannot be determined
      }
    }
  } catch {
    case e: Exception =>
      logWarning(s"Failed to scan schema for V3 types: ${e.getMessage}")
  }

  found.toSet
}

/**
 * Recursively walks a (reflected) Iceberg type and records any V3-only type
 * names found among its children: struct fields, list elements, and map
 * keys/values.
 *
 * Note: the type passed in is NOT matched itself — only its children are.
 * Callers (see findUnsupportedV3Types) check the top-level type before
 * recursing.
 *
 * All reflection failures are swallowed: an uninspectable subtree simply
 * contributes no matches, so this can never break planning.
 */
private def checkNestedTypesForV3(
    icebergType: Any,
    unsupportedTypes: scala.collection.mutable.Set[String]): Unit = {
  import scala.jdk.CollectionConverters._

  // Shared by the struct/list/map branches below (the original duplicated
  // this matching block four times): match the child's string form against
  // the V3-only names, then recurse into the child.
  def visitChild(childType: Any): Unit = {
    val typeStr = childType.toString.toLowerCase(java.util.Locale.ROOT)
    V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
      if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
        unsupportedTypes += v3Type
      }
    }
    checkNestedTypesForV3(childType, unsupportedTypes)
  }

  try {
    val typeClass = icebergType.getClass

    if (typeClass.getSimpleName.contains("StructType")) {
      try {
        val fields = typeClass
          .getMethod("fields")
          .invoke(icebergType)
          .asInstanceOf[java.util.List[_]]

        fields.asScala.foreach { field =>
          try {
            visitChild(field.getClass.getMethod("type").invoke(field))
          } catch {
            case _: Exception => // Skip fields whose type cannot be read
          }
        }
      } catch {
        case _: Exception => // No usable fields() accessor; ignore
      }
    }

    if (typeClass.getSimpleName.contains("ListType")) {
      try {
        visitChild(typeClass.getMethod("elementType").invoke(icebergType))
      } catch {
        case _: Exception => // No usable elementType() accessor; ignore
      }
    }

    if (typeClass.getSimpleName.contains("MapType")) {
      try {
        val keyType = typeClass.getMethod("keyType").invoke(icebergType)
        val valueType = typeClass.getMethod("valueType").invoke(icebergType)
        Seq(keyType, valueType).foreach(visitChild)
      } catch {
        case _: Exception => // No usable key/value accessors; ignore
      }
    }
  } catch {
    case _: Exception => // Defensive: never let type scanning break planning
  }
}

/**
 * Returns true when the table carries encryption-related properties — any
 * property key starting with "encryption." (e.g. `encryption.key-id`) or
 * "kms." — which indicates V3 table encryption. Reflection failures yield
 * false.
 *
 * @param table an Iceberg `Table` instance, accessed reflectively
 */
def hasEncryption(table: Any): Boolean = {
  import scala.jdk.CollectionConverters._

  try {
    getTableProperties(table) match {
      case Some(props) =>
        props.asScala.keysIterator.exists { key =>
          key.startsWith("encryption.") || key.startsWith("kms.")
        }
      case None => false
    }
  } catch {
    case _: Exception => false
  }
}

/**
 * Returns true when any top-level column of the schema — or any field nested
 * within a struct column — declares a default value (a V3 feature).
 * Reflection failures yield false.
 *
 * @param schema an Iceberg `Schema` instance, accessed reflectively
 */
def hasDefaultColumnValues(schema: Any): Boolean = {
  import scala.jdk.CollectionConverters._

  try {
    schema.getClass
      .getMethod("columns")
      .invoke(schema)
      .asInstanceOf[java.util.List[_]]
      .asScala
      .exists { column =>
        // An uninspectable column is assumed to have no default.
        try hasFieldDefault(column) || hasNestedDefaults(column)
        catch { case _: Exception => false }
      }
  } catch {
    case _: Exception => false
  }
}

/**
 * Returns true when the given (reflected) Iceberg field declares a default
 * value, i.e. `initialDefault()` or `writeDefault()` exists and returns
 * non-null.
 *
 * Fields from pre-V3 Iceberg versions lack these accessors entirely, so a
 * missing method counts as "no default". The check short-circuits: once
 * `initialDefault()` is found non-null, a failure probing `writeDefault()`
 * cannot mask the result (returning true here triggers the safe fallback
 * to Spark; the original eager evaluation could return false instead).
 * Any other reflection failure yields false.
 */
private def hasFieldDefault(field: Any): Boolean = {
  // True when the named zero-arg accessor exists on the field and returns a
  // non-null value; false when the accessor is absent (pre-V3 field class).
  def accessorReturnsNonNull(name: String): Boolean =
    try {
      field.getClass.getMethod(name).invoke(field) != null
    } catch {
      case _: NoSuchMethodException => false
    }

  try {
    accessorReturnsNonNull("initialDefault") || accessorReturnsNonNull("writeDefault")
  } catch {
    // invoke() failures (other than a missing method) propagate here.
    case _: Exception => false
  }
}

/**
 * Recursively checks whether any field nested inside this field's struct
 * type declares a default value. Non-struct field types contribute nothing.
 * Reflection failures yield false.
 */
private def hasNestedDefaults(field: Any): Boolean = {
  import scala.jdk.CollectionConverters._

  try {
    val fieldType = field.getClass.getMethod("type").invoke(field)
    val typeClass = fieldType.getClass

    // Only struct types can carry nested fields; `&&` short-circuits for
    // all other types, matching the original's if/else returning false.
    typeClass.getSimpleName.contains("StructType") && {
      val nestedFields = typeClass
        .getMethod("fields")
        .invoke(fieldType)
        .asInstanceOf[java.util.List[_]]

      nestedFields.asScala.exists { nested =>
        hasFieldDefault(nested) || hasNestedDefaults(nested)
      }
    }
  } catch {
    case _: Exception => false
  }
}
}

/**
Expand Down
66 changes: 63 additions & 3 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com

val formatVersionSupported = IcebergReflection.getFormatVersion(metadata.table) match {
case Some(formatVersion) =>
if (formatVersion > 2) {
if (formatVersion > 3) {
fallbackReasons += "Iceberg table format version " +
s"$formatVersion is not supported. " +
"Comet only supports Iceberg table format V1 and V2"
"Comet supports Iceberg table format V1, V2, and V3"
false
} else {
true
Expand Down Expand Up @@ -621,10 +621,70 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
!hasUnsupportedDeletes
}

val v3FeaturesSupported = IcebergReflection.getFormatVersion(metadata.table) match {
case Some(formatVersion) if formatVersion >= 3 =>
var allV3FeaturesSupported = true

try {
if (IcebergReflection.hasDeletionVectors(metadata.tasks)) {
fallbackReasons += "Iceberg V3 Deletion Vectors are not yet supported. " +
"Tables using Deletion Vectors will fall back to Spark"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"Deletion Vectors: ${e.getMessage}"
allV3FeaturesSupported = false
}

try {
val unsupportedTypes = IcebergReflection.findUnsupportedV3Types(metadata.scanSchema)
if (unsupportedTypes.nonEmpty) {
fallbackReasons += "Iceberg V3 types not yet supported: " +
s"${unsupportedTypes.mkString(", ")}. " +
"Tables with these types will fall back to Spark"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"V3 types: ${e.getMessage}"
allV3FeaturesSupported = false
}

try {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These checks may slow planning down a bit, but the eventual plan is to support these V3 features and provide fully functional Iceberg V3 spec support.

if (IcebergReflection.hasEncryption(metadata.table)) {
fallbackReasons += "Iceberg table encryption is not yet supported"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"encryption: ${e.getMessage}"
allV3FeaturesSupported = false
}

try {
if (IcebergReflection.hasDefaultColumnValues(metadata.scanSchema)) {
fallbackReasons += "Iceberg default column values are not yet supported"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"default column values: ${e.getMessage}"
allV3FeaturesSupported = false
}

allV3FeaturesSupported
case _ => true
}

if (schemaSupported && fileIOCompatible && formatVersionSupported && allParquetFiles &&
allSupportedFilesystems && partitionTypesSupported &&
complexTypePredicatesSupported && transformFunctionsSupported &&
deleteFileTypesSupported) {
deleteFileTypesSupported && v3FeaturesSupported) {
CometBatchScanExec(
scanExec.clone().asInstanceOf[BatchScanExec],
runtimeFilters = scanExec.runtimeFilters,
Expand Down
Loading
Loading