Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/get-started/VeloxIceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ extracted from https://iceberg.apache.org/docs/latest/configuration/
| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |✅|
| write.parquet.page-row-limit | 20000 | Parquet page row limit | |
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size | |
| write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed | |
| write.parquet.compression-codec | zstd | Parquet compression codec: zstd, lz4, gzip, snappy, uncompressed. **Note:** brotli, lzo, lz4raw, and lz4_raw are not supported |⚠️|
| write.parquet.compression-level | null | Parquet compression level | |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: 'col1' | |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
}

val codec = getCodec
if (Seq("brotli, lzo").contains(codec)) {
return ValidationResult.failed("Not support this codec " + codec)
val unsupported = Set("brotli", "lzo", "lz4raw", "lz4_raw")
if (unsupported.contains(codec.toLowerCase())) {
return ValidationResult.failed("Codec unsupported: " + codec)
}
if (query.output.exists(a => !AvroSchemaUtil.makeCompatibleName(a.name).equals(a.name))) {
return ValidationResult.failed("Not support the compatible column name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,4 +717,40 @@ abstract class IcebergSuite extends WholeStageTransformerSuite {
e.getCause != null && e.getCause.getMessage.contains("null"))
}
}

// Checks that an Iceberg INSERT fails with a "Codec unsupported" error for each codec the
// native writer rejects, and that the remaining codecs (in either letter case) still write rows.
test("iceberg write with unsupported codec should fail validation") {
  withTable("iceberg_codec_test") {
    spark.sql("""
                |CREATE TABLE iceberg_codec_test (id INT, data STRING)
                |USING iceberg
                |""".stripMargin)

    // Messages of `t` and of every exception in its cause chain. Null messages are dropped so
    // the assertion below cannot itself fail with a NullPointerException, and the whole chain
    // is inspected because the original error may be wrapped more than once.
    def messagesOf(t: Throwable): List[String] =
      Iterator
        .iterate(t)(_.getCause)
        .takeWhile(_ != null)
        .map(_.getMessage)
        .filter(_ != null)
        .toList

    val unsupportedCodecs = Seq("brotli", "lzo", "lz4raw", "lz4_raw")
    unsupportedCodecs.foreach {
      codec =>
        withSQLConf("spark.sql.parquet.compression.codec" -> codec) {
          val e = intercept[Exception] {
            spark
              .sql("INSERT INTO iceberg_codec_test VALUES (1, 'test')")
              .collect()
          }
          assert(
            messagesOf(e).exists(_.contains("Codec unsupported")),
            s"Expected validation error for codec: $codec, but got: ${e.getMessage}"
          )
        }
    }

    // Mixed-case names are included to confirm that the case-insensitive rejection of
    // unsupported codecs does not affect supported ones.
    val supportedCodecs = Seq("snappy", "SNAPPY", "gzip", "GZIP", "zstd", "ZSTD", "none")
    supportedCodecs.foreach {
      codec =>
        withSQLConf("spark.sql.parquet.compression.codec" -> codec) {
          spark.sql(s"INSERT INTO iceberg_codec_test VALUES (2, 'test_$codec')")
        }
    }

    // Each supported codec inserts exactly one row; the rejected inserts must add none.
    val result = spark.sql("SELECT COUNT(*) FROM iceberg_codec_test").collect()
    assert(result.head.getLong(0) == supportedCodecs.length)
  }
}
}
Loading