Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions crates/integrations/datafusion/src/physical_plan/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ impl ExecutionPlan for IcebergCommitExec {

let table = self.table.clone();
let input_plan = self.input.clone();
let count_schema = Arc::clone(&self.count_schema);

// todo revisit this
let spec_id = self.table.metadata().default_partition_spec_id();
Expand Down Expand Up @@ -240,9 +239,9 @@ impl ExecutionPlan for IcebergCommitExec {
data_files.extend(batch_files);
}

// If no data files were collected, return an empty result
// If no data files were collected, return a single row with count = 0
if data_files.is_empty() {
return Ok(RecordBatch::new_empty(count_schema));
return Self::make_count_batch(0);
}

// Create a transaction and commit the data files
Expand Down Expand Up @@ -530,6 +529,83 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_iceberg_commit_exec_empty_insert() -> Result<(), Box<dyn std::error::Error>> {
let catalog = Arc::new(
MemoryCatalogBuilder::default()
.load(
"memory",
HashMap::from([(
MEMORY_CATALOG_WAREHOUSE.to_string(),
"memory://root".to_string(),
)]),
)
.await
.unwrap(),
);

let namespace = NamespaceIdent::new("test_empty_insert".to_string());
catalog.create_namespace(&namespace, HashMap::new()).await?;

let schema = Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()?;

let table_creation = TableCreation::builder()
.name("empty_insert_table".to_string())
.schema(schema)
.location("memory://root/empty_insert_table".to_string())
.properties(HashMap::new())
.build();

let table = catalog.create_table(&namespace, table_creation).await?;
let snapshot_count_before = table.metadata().snapshots().len();

// Mock write plan produces no data files
let input_exec = Arc::new(MockWriteExec::new(vec![]));
let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
DATA_FILES_COL_NAME,
DataType::Utf8,
false,
)]));
let commit_exec =
IcebergCommitExec::new(table.clone(), catalog.clone(), input_exec, arrow_schema);

let task_ctx = Arc::new(TaskContext::default());
let stream = commit_exec.execute(0, task_ctx)?;
let batches = collect(stream).await?;

// Must return exactly one batch with one row and one column
assert_eq!(batches.len(), 1);
let batch = &batches[0];
assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.num_columns(), 1);

// The count column must be UInt64 with value 0
let count_array = batch.column(0);
assert_eq!(count_array.data_type(), &DataType::UInt64);
let count = count_array.as_any().downcast_ref::<UInt64Array>().unwrap();
assert_eq!(count.value(0), 0);

// No new snapshot should be created for an empty insert
let updated_table = catalog
.load_table(
&TableIdent::from_strs(["test_empty_insert", "empty_insert_table"]).unwrap(),
)
.await?;
let snapshot_count_after = updated_table.metadata().snapshots().len();
assert_eq!(
snapshot_count_after, snapshot_count_before,
"Empty insert must not create a new snapshot"
);
assert!(updated_table.metadata().current_snapshot().is_none());

Ok(())
}

#[tokio::test]
async fn test_datafusion_execution_partitioned_source() -> Result<(), Box<dyn std::error::Error>>
{
Expand Down
30 changes: 30 additions & 0 deletions crates/sqllogictest/testdata/slts/df_test/insert_into.slt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ query IT rowsort
SELECT * FROM default.default.test_unpartitioned_table
----

# Insert zero rows and verify it returns count 0 without creating a snapshot
query I
INSERT INTO default.default.test_unpartitioned_table SELECT * FROM (VALUES (1, 'Nobody')) AS t(id, name) WHERE false
----
0

query I
SELECT COUNT(*) FROM default.default.test_unpartitioned_table
----
0

query ??????
SELECT * FROM default.default.test_unpartitioned_table$snapshots
----

# Insert a single row and verify the count
query I
INSERT INTO default.default.test_unpartitioned_table VALUES (1, 'Alice')
Expand Down Expand Up @@ -70,6 +85,21 @@ query ITT rowsort
SELECT * FROM default.default.test_partitioned_table
----

# Insert zero rows into a partitioned table and verify it returns count 0 without creating a snapshot
query I
INSERT INTO default.default.test_partitioned_table SELECT * FROM (VALUES (1, 'electronics', 'laptop')) AS t(id, category, value) WHERE false
----
0

query I
SELECT COUNT(*) FROM default.default.test_partitioned_table
----
0

query ??????
SELECT * FROM default.default.test_partitioned_table$snapshots
----

# Insert single row into partitioned table
query I
INSERT INTO default.default.test_partitioned_table VALUES (1, 'electronics', 'laptop')
Expand Down
Loading