diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..24ec6f6630 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -161,6 +161,12 @@ impl TableProvider for IcebergTableProvider { input: Arc, _insert_op: InsertOp, ) -> DFResult> { + if _insert_op != InsertOp::Append { + return Err(DataFusionError::NotImplemented(format!( + "IcebergTableProvider supports only append inserts, got {_insert_op}" + ))); + } + // Load fresh table metadata from catalog let table = self .catalog @@ -710,6 +716,42 @@ mod tests { false } + #[tokio::test] + async fn test_catalog_backed_provider_rejects_non_append_op() { + use datafusion::physical_plan::empty::EmptyExec; + + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let ctx = SessionContext::new(); + + for (insert_op, expected_message) in [ + ( + InsertOp::Overwrite, + "IcebergTableProvider supports only append inserts, got Insert Overwrite", + ), + ( + InsertOp::Replace, + "IcebergTableProvider supports only append inserts, got Replace Into", + ), + ] { + let input = Arc::new(EmptyExec::new(provider.schema())) as Arc; + let error = provider + .insert_into(&ctx.state(), input, insert_op) + .await + .expect_err("non-append inserts should be rejected"); + + assert!( + matches!( + error, + DataFusionError::NotImplemented(ref message) if message == expected_message + ), + "unexpected error: {error}" + ); + } + } + #[tokio::test] async fn test_insert_plan_fanout_enabled_no_sort() { use datafusion::datasource::TableProvider;