From 2ae10dbb6aea316fca21d93317650e73368234fc Mon Sep 17 00:00:00 2001 From: kid Date: Thu, 25 Jun 2026 19:58:07 +0800 Subject: [PATCH] fix(datafusion): return single row with count 0 for empty inserts When an INSERT produces no data files, IcebergCommitExec previously returned an empty RecordBatch. DataFusion expects a single-row count result, so this change returns a batch with count=0 and skips creating a new snapshot. Includes unit and integration tests for unpartitioned and partitioned tables. --- .../datafusion/src/physical_plan/commit.rs | 82 ++++++++++++++++++- .../testdata/slts/df_test/insert_into.slt | 30 +++++++ 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 835c804908..ec2376493d 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -182,7 +182,6 @@ impl ExecutionPlan for IcebergCommitExec { let table = self.table.clone(); let input_plan = self.input.clone(); - let count_schema = Arc::clone(&self.count_schema); // todo revisit this let spec_id = self.table.metadata().default_partition_spec_id(); @@ -240,9 +239,9 @@ impl ExecutionPlan for IcebergCommitExec { data_files.extend(batch_files); } - // If no data files were collected, return an empty result + // If no data files were collected, return a single row with count = 0 if data_files.is_empty() { - return Ok(RecordBatch::new_empty(count_schema)); + return Self::make_count_batch(0); } // Create a transaction and commit the data files @@ -530,6 +529,83 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_iceberg_commit_exec_empty_insert() -> Result<(), Box> { + let catalog = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + "memory://root".to_string(), + )]), + ) + .await + .unwrap(), + ); + + let namespace = NamespaceIdent::new("test_empty_insert".to_string()); + catalog.create_namespace(&namespace, HashMap::new()).await?; + + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build()?; + + let table_creation = TableCreation::builder() + .name("empty_insert_table".to_string()) + .schema(schema) + .location("memory://root/empty_insert_table".to_string()) + .properties(HashMap::new()) + .build(); + + let table = catalog.create_table(&namespace, table_creation).await?; + let snapshot_count_before = table.metadata().snapshots().len(); + + // Mock write plan produces no data files + let input_exec = Arc::new(MockWriteExec::new(vec![])); + let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new( + DATA_FILES_COL_NAME, + DataType::Utf8, + false, + )])); + let commit_exec = + IcebergCommitExec::new(table.clone(), catalog.clone(), input_exec, arrow_schema); + + let task_ctx = Arc::new(TaskContext::default()); + let stream = commit_exec.execute(0, task_ctx)?; + let batches = collect(stream).await?; + + // Must return exactly one batch with one row and one column + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 1); + + // The count column must be UInt64 with value 0 + let count_array = batch.column(0); + assert_eq!(count_array.data_type(), &DataType::UInt64); + let count = count_array.as_any().downcast_ref::().unwrap(); + assert_eq!(count.value(0), 0); + + // No new snapshot should be created for an empty insert + let updated_table = catalog + .load_table( + &TableIdent::from_strs(["test_empty_insert", "empty_insert_table"]).unwrap(), + ) + .await?; + let snapshot_count_after = updated_table.metadata().snapshots().len(); + assert_eq!( + snapshot_count_after, snapshot_count_before, + "Empty insert must not create a new snapshot" + ); + assert!(updated_table.metadata().current_snapshot().is_none()); + + Ok(()) + } + #[tokio::test] async fn test_datafusion_execution_partitioned_source() -> Result<(), Box> { diff --git a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt index 1e07844326..56fc1fefb1 100644 --- a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt +++ b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt @@ -24,6 +24,21 @@ query IT rowsort SELECT * FROM default.default.test_unpartitioned_table ---- +# Insert zero rows and verify it returns count 0 without creating a snapshot +query I +INSERT INTO default.default.test_unpartitioned_table SELECT * FROM (VALUES (1, 'Nobody')) AS t(id, name) WHERE false +---- +0 + +query I +SELECT COUNT(*) FROM default.default.test_unpartitioned_table +---- +0 + +query ?????? +SELECT * FROM default.default.test_unpartitioned_table$snapshots +---- + # Insert a single row and verify the count query I INSERT INTO default.default.test_unpartitioned_table VALUES (1, 'Alice') @@ -70,6 +85,21 @@ query ITT rowsort SELECT * FROM default.default.test_partitioned_table ---- +# Insert zero rows into a partitioned table and verify it returns count 0 without creating a snapshot +query I +INSERT INTO default.default.test_partitioned_table SELECT * FROM (VALUES (1, 'electronics', 'laptop')) AS t(id, category, value) WHERE false +---- +0 + +query I +SELECT COUNT(*) FROM default.default.test_partitioned_table +---- +0 + +query ?????? +SELECT * FROM default.default.test_partitioned_table$snapshots +---- + # Insert single row into partitioned table query I INSERT INTO default.default.test_partitioned_table VALUES (1, 'electronics', 'laptop')