@@ -31,6 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOptions}

class CometParquetWriterSuite extends CometTestBase {
@@ -377,6 +378,238 @@ class CometParquetWriterSuite extends CometTestBase {
}
}

// Tests ported from Spark that currently fail when the native Comet Parquet writer is enabled
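// Every ignored test below enables the native writer with the same three
// settings. Purely as an illustrative sketch (this helper is assumed, not
// part of the suite), the shared pattern could be factored out as:
//
//   private def withNativeWriter(f: => Unit): Unit = {
//     withSQLConf(
//       CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
//       CometConf.COMET_EXEC_ENABLED.key -> "true",
//       CometConf.getOperatorAllowIncompatConfigKey(
//         classOf[DataWritingCommandExec]) -> "true")(f)
//   }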
// https://github.com/apache/datafusion-comet/issues/3417
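// Nine of the ten shuffle partitions are empty after the filter; the writer
// should skip empty partitions instead of emitting a part file per task.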
ignore("Spark compat: empty file should be skipped while write to file") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTempPath { path =>
spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
val partFiles = path
.listFiles()
.filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
assert(partFiles.length === 2)
}
}
}

// https://github.com/apache/datafusion-comet/issues/3418
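// CTAS must carry the CHAR(5)/VARCHAR(4) metadata from t1 into t2's schema
// instead of erasing it to plain strings.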
ignore("Spark compat: SPARK-33901 ctas should not change table's schema") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTable("t1", "t2") {
sql("CREATE TABLE t1(i CHAR(5), c VARCHAR(4)) USING parquet")
sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
checkAnswer(
sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
Seq(Row("char(5)"), Row("varchar(4)")))
}
}
}

// https://github.com/apache/datafusion-comet/issues/3419
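// With CHAR_AS_VARCHAR enabled, CTAS should create the copied column as
// varchar(5) rather than char(5).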
ignore("Spark compat: SPARK-37160 CREATE TABLE AS SELECT with CHAR_AS_VARCHAR") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTable("t1", "t2") {
sql("CREATE TABLE t1(col CHAR(5)) USING parquet")
withSQLConf(SQLConf.CHAR_AS_VARCHAR.key -> "true") {
sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
checkAnswer(
sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
Seq(Row("varchar(5)")))
}
}
}
}

// https://github.com/apache/datafusion-comet/issues/3420
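// Overwrites tab1's location via INSERT OVERWRITE LOCAL DIRECTORY, so after a
// refresh tab1 should contain tab2's row.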
ignore("Spark compat: SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTempPath { dir =>
val path = dir.toURI.getPath
withTable("tab1", "tab2") {
sql(s"""create table tab1 (a int) using parquet location '$path'""")
sql("insert into tab1 values(1)")
checkAnswer(sql("select * from tab1"), Seq(Row(1)))
sql("create table tab2 (a int) using parquet")
sql("insert into tab2 values(2)")
sql(s"""insert overwrite local directory '$path' using parquet select * from tab2""")
sql("refresh table tab1")
checkAnswer(sql("select * from tab1"), Seq(Row(2)))
}
}
}
}

// https://github.com/apache/datafusion-comet/issues/3421
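// Column s is omitted from the insert column list and has no explicit
// DEFAULT, so it should be filled with NULL.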
ignore("Spark compat: SPARK-38336 INSERT INTO with default columns positive tests") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTable("t") {
sql("create table t(i boolean, s bigint) using parquet")
sql("insert into t(i) values(true)")
checkAnswer(spark.table("t"), Row(true, null))
}
}
}

// https://github.com/apache/datafusion-comet/issues/3422
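// The DEFAULT expression attached by ALTER TABLE ADD COLUMNS should evaluate
// to "abcdef" when the insert uses the default keyword.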
ignore("Spark compat: SPARK-38811 INSERT INTO on ALTER TABLE ADD COLUMNS positive tests") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTable("t") {
sql("create table t(i boolean) using parquet")
sql("alter table t add column s string default concat('abc', 'def')")
sql("insert into t values(true, default)")
checkAnswer(spark.table("t"), Row(true, "abcdef"))
}
}
}

// https://github.com/apache/datafusion-comet/issues/3423
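// The source query ends in ORDER BY rather than a plain projection; the
// insert should still resolve and write its output.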
ignore("Spark compat: SPARK-43071 INSERT INTO from non-projection queries") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTable("t1", "t2") {
sql("create table t1(i boolean, s bigint default 42) using parquet")
sql("insert into t1 values (true, 41), (false, default)")
sql("create table t2(i boolean, s bigint) using parquet")
sql("insert into t2 select * from t1 order by s")
checkAnswer(spark.table("t2"), Seq(Row(true, 41), Row(false, 42)))
}
}
}

// https://github.com/apache/datafusion-comet/issues/3424
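// The insert selects "ID" through a view exposing lower-case "id"; the
// overwrite should still honor tbl2's declared schema.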
ignore("Spark compat: Insert overwrite table command should output correct schema basic") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTable("tbl", "tbl2") {
withView("view1") {
val df = spark.range(10).toDF("id")
df.write.format("parquet").saveAsTable("tbl")
spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
spark.sql("CREATE TABLE tbl2(ID long) USING parquet")
spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
checkAnswer(spark.table("tbl2"), (0 until 10).map(Row(_)))
}
}
}
}

// https://github.com/apache/datafusion-comet/issues/3425
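// Round-trips a current_timestamp value, so the native writer's timestamp
// encoding must be readable back by Spark's Parquet reader.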
ignore("Spark compat: parquet timestamp conversion") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTempPath { dir =>
spark
.range(1)
.selectExpr("current_timestamp() as ts")
.write
.parquet(dir.toString + "/spark")
val result = spark.read.parquet(dir.toString + "/spark").collect()
assert(result.length == 1)
}
}
}

// https://github.com/apache/datafusion-comet/issues/3426
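// The nested struct field names differ between tab1 (a, b) and tab2 (c, d);
// the insert should match the struct fields by position.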
ignore("Spark compat: INSERT INTO TABLE - complex type but different names") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTable("tab1", "tab2") {
sql("""CREATE TABLE tab1 (s struct<a: string, b: string>) USING parquet""")
sql("""CREATE TABLE tab2 (s struct<c: string, d: string>) USING parquet""")
sql("INSERT INTO tab1 VALUES (named_struct('a', 'x', 'b', 'y'))")
sql("INSERT INTO tab2 SELECT * FROM tab1")
checkAnswer(spark.table("tab2"), Row(Row("x", "y")))
}
}
}

// https://github.com/apache/datafusion-comet/issues/3427
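// Reduced check: this only asserts that part files are produced; the version
// key in the footer metadata is not inspected here.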
ignore("Spark compat: Write Spark version into Parquet metadata") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTempPath { dir =>
spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
val files = dir.listFiles().filter(_.getName.endsWith(".parquet"))
assert(files.nonEmpty, "Expected parquet files to be written")
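// A fuller version of this check, as in upstream Spark, would also read the
// footer key/value metadata via parquet-mr (sketch only; assumes
// ParquetFileReader/Path imports and the "org.apache.spark.version" key that
// Spark writes into footers):
//
//   val footer = ParquetFileReader.readFooter(
//     spark.sessionState.newHadoopConf(),
//     new Path(files.head.getAbsolutePath))
//   assert(footer.getFileMetaData.getKeyValueMetaData
//     .containsKey("org.apache.spark.version"))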
}
}
}

// https://github.com/apache/datafusion-comet/issues/3428
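// Reduced check: writes ten partitions and asserts that the committed part
// files appear in the output directory.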
ignore("Spark compat: write path implements onTaskCommit API correctly") {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTempDir { dir =>
val path = dir.getCanonicalPath
spark.range(10).repartition(10).write.mode("overwrite").parquet(path)
val files = new File(path).listFiles().filter(_.getName.startsWith("part-"))
assert(files.length > 0, "Expected part files to be written")
}
}
}

// Spark 4.x tests that currently fail with the native Comet Parquet writer
// https://github.com/apache/datafusion-comet/issues/3429
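// CTAS over a UNION ALL source; the created table should contain the rows
// from both branches.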
ignore("Spark compat: ctas with union") {
assume(isSpark40Plus)
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTable("t") {
sql("CREATE TABLE t USING parquet AS SELECT 1 AS c UNION ALL SELECT 2")
checkAnswer(spark.table("t"), Seq(Row(1), Row(2)))
}
}
}

// https://github.com/apache/datafusion-comet/issues/3430
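// Hive-style multi-insert: a single source scan feeds two INSERT INTO targets
// in one statement.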
ignore("Spark compat: SPARK-48817 test multi insert") {
assume(isSpark40Plus)
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
withTable("t1", "t2") {
sql("CREATE TABLE t1(a INT) USING parquet")
sql("CREATE TABLE t2(a INT) USING parquet")
sql("FROM (SELECT 1 AS a) src INSERT INTO t1 SELECT a INSERT INTO t2 SELECT a")
checkAnswer(spark.table("t1"), Row(1))
checkAnswer(spark.table("t2"), Row(1))
}
}
}

private def createTestData(inputDir: File): String = {
val inputPath = new File(inputDir, "input.parquet").getAbsolutePath
val schema = FuzzDataGenerator.generateSchema(