diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index b691039f19..ee20b5ecbc 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 import org.apache.comet.CometConf
+import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
 import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOptions}
 
 class CometParquetWriterSuite extends CometTestBase {
@@ -377,6 +378,238 @@ class CometParquetWriterSuite extends CometTestBase {
     }
   }
 
+  // NATIVE COMET WRITER TESTS WHICH FAIL IN SPARK
+  // https://github.com/apache/datafusion-comet/issues/3417
+  ignore("Spark compat: empty file should be skipped while write to file") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      withTempPath { path =>
+        spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
+        val partFiles = path
+          .listFiles()
+          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+        assert(partFiles.length === 2)
+      }
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/3418
+  ignore("Spark compat: SPARK-33901 ctas should not change table's schema") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      withTable("t1", "t2") {
+        sql("CREATE TABLE t1(i CHAR(5), c VARCHAR(4)) USING parquet")
+        sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
+        checkAnswer(
+          sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
+          Seq(Row("char(5)"), Row("varchar(4)")))
+      }
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/3419
+  ignore("Spark compat: SPARK-37160 CREATE TABLE AS SELECT with CHAR_AS_VARCHAR") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      withTable("t1", "t2") {
+        sql("CREATE TABLE t1(col CHAR(5)) USING parquet")
+        withSQLConf(SQLConf.CHAR_AS_VARCHAR.key -> "true") {
+          sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
+          checkAnswer(
+            sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
+            Seq(Row("varchar(5)")))
+        }
+      }
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/3420
+  ignore("Spark compat: SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      withTempPath { dir =>
+        val path = dir.toURI.getPath
+        withTable("tab1", "tab2") {
+          sql(s"""create table tab1 (a int) using parquet location '$path'""")
+          sql("insert into tab1 values(1)")
+          checkAnswer(sql("select * from tab1"), Seq(Row(1)))
+          sql("create table tab2 (a int) using parquet")
+          sql("insert into tab2 values(2)")
+          sql(s"""insert overwrite local directory '$path' using
+            parquet select * from tab2""")
+          sql("refresh table tab1")
+          checkAnswer(sql("select * from tab1"), Seq(Row(2)))
+        }
+      }
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/3421
+  ignore("Spark compat: SPARK-38336 INSERT INTO with default columns positive tests") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        sql("insert into t(i) values(true)")
+        checkAnswer(spark.table("t"), Row(true, null))
+      }
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/3422
+  ignore("Spark compat: SPARK-38811 INSERT INTO on ALTER TABLE ADD COLUMNS positive tests") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      withTable("t") {
+        sql("create table t(i boolean) using parquet")
+        sql("alter table t add column s string default concat('abc', 'def')")
+        sql("insert into t values(true, default)")
+        checkAnswer(spark.table("t"), Row(true, "abcdef"))
+      }
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/3423
+  ignore("Spark compat: SPARK-43071 INSERT INTO from non-projection queries") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      withTable("t1", "t2") {
+        sql("create table t1(i boolean, s bigint default 42) using parquet")
+        sql("insert into t1 values (true, 41), (false, default)")
+        sql("create table t2(i boolean, s bigint) using parquet")
+        sql("insert into t2 select * from t1 order by s")
+        checkAnswer(spark.table("t2"), Seq(Row(true, 41), Row(false, 42)))
+      }
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/3424
+  ignore("Spark compat: Insert overwrite table command should output correct schema basic") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      withTable("tbl", "tbl2") {
+        withView("view1") {
+          val df = spark.range(10).toDF("id")
+          df.write.format("parquet").saveAsTable("tbl")
+          spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+          spark.sql("CREATE TABLE tbl2(ID long) USING parquet")
+          spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
+          checkAnswer(spark.table("tbl2"), (0 until 10).map(Row(_)))
+        }
+      }
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/3425
+  ignore("Spark compat: parquet timestamp conversion") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+      withTempPath { dir =>
+        spark
+          .range(1)
+          .selectExpr("current_timestamp() as ts")
+          .write
+          .parquet(dir.toString + "/spark")
+        val result = spark.read.parquet(dir.toString + "/spark").collect()
+        assert(result.length == 1)
+      }
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/3426
+  ignore("Spark compat: INSERT INTO TABLE - complex type but different names") {
+    withSQLConf(
+      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
"true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { + withTable("tab1", "tab2") { + sql("""CREATE TABLE tab1 (s struct) USING parquet""") + sql("""CREATE TABLE tab2 (s struct) USING parquet""") + sql("INSERT INTO tab1 VALUES (named_struct('a', 'x', 'b', 'y'))") + sql("INSERT INTO tab2 SELECT * FROM tab1") + checkAnswer(spark.table("tab2"), Row(Row("x", "y"))) + } + } + } + + // https://github.com/apache/datafusion-comet/issues/3427 + ignore("Spark compat: Write Spark version into Parquet metadata") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { + withTempPath { dir => + spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath) + val files = dir.listFiles().filter(_.getName.endsWith(".parquet")) + assert(files.nonEmpty, "Expected parquet files to be written") + } + } + } + + // https://github.com/apache/datafusion-comet/issues/3428 + ignore("Spark compat: write path implements onTaskCommit API correctly") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { + withTempDir { dir => + val path = dir.getCanonicalPath + spark.range(10).repartition(10).write.mode("overwrite").parquet(path) + val files = new File(path).listFiles().filter(_.getName.startsWith("part-")) + assert(files.length > 0, "Expected part files to be written") + } + } + } + + // COMET NATIVE WRITER Spark 4.x test failures + // https://github.com/apache/datafusion-comet/issues/3429 + ignore("Spark compat: ctas with union") { + assume(isSpark40Plus) + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { + withTable("t") { + sql("CREATE TABLE t USING parquet AS SELECT 1 AS c UNION ALL SELECT 2") + checkAnswer(spark.table("t"), Seq(Row(1), Row(2))) + } + } + } + + // https://github.com/apache/datafusion-comet/issues/3430 + ignore("Spark compat: SPARK-48817 test multi insert") { + assume(isSpark40Plus) + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { + withTable("t1", "t2") { + sql("CREATE TABLE t1(a INT) USING parquet") + sql("CREATE TABLE t2(a INT) USING parquet") + sql("FROM (SELECT 1 AS a) src INSERT INTO t1 SELECT a INSERT INTO t2 SELECT a") + checkAnswer(spark.table("t1"), Row(1)) + checkAnswer(spark.table("t2"), Row(1)) + } + } + } + private def createTestData(inputDir: File): String = { val inputPath = new File(inputDir, "input.parquet").getAbsolutePath val schema = FuzzDataGenerator.generateSchema(