diff --git a/crates/iceberg/src/transaction/add_fields.rs b/crates/iceberg/src/transaction/add_fields.rs
new file mode 100644
index 0000000000..2c4346837e
--- /dev/null
+++ b/crates/iceberg/src/transaction/add_fields.rs
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::spec::NestedFieldRef;
+use crate::table::Table;
+use crate::transaction::action::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
+
+/// A transaction action for adding new fields to the table's current schema.
+///
+/// This action clones the table's current schema, appends the provided fields,
+/// and emits the appropriate `AddSchema` and `SetCurrentSchema` updates along
+/// with a `CurrentSchemaIdMatch` requirement to guard against concurrent schema changes.
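+///
+/// A minimal usage sketch mirroring the unit tests below (`new_field` and the
+/// catalog are placeholders; error handling elided):
+///
+/// ```ignore
+/// let tx = Transaction::new(&table);
+/// let tx = tx.add_fields(vec![new_field]).apply(tx)?;
+/// let table = tx.commit(&catalog).await?;
+/// ```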
+pub struct AddFieldsAction {
+    fields: Vec<NestedFieldRef>,
+}
+
+impl AddFieldsAction {
+    pub(crate) fn new(fields: Vec<NestedFieldRef>) -> Self {
+        Self { fields }
+    }
+
+    /// Adds a single field to the action.
+    pub fn add_field(mut self, field: NestedFieldRef) -> Self {
+        self.fields.push(field);
+        self
+    }
+}
+
+#[async_trait]
+impl TransactionAction for AddFieldsAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        // Validate that new required fields have an initial_default value.
+        // Without initial_default, old Parquet files (written before the schema change)
+        // cannot provide a value for the required column, leading to silent data corruption
+        // (a non-nullable Arrow column filled with nulls).
+        if let Some(field) = self
+            .fields
+            .iter()
+            .find(|f| f.required && f.initial_default.is_none())
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add required field '{}' (id={}) without an initial_default value. \
+                     Existing data files do not contain this field, so a default is needed \
+                     to populate it when reading older data. Either make the field optional \
+                     or set an initial_default.",
+                    field.name, field.id
+                ),
+            ));
+        }
+
+        let base_schema = table.metadata().current_schema();
+        let schema = base_schema
+            .as_ref()
+            .clone()
+            .into_builder()
+            .with_fields(self.fields.clone())
+            .build()?;
+
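+        // In the SetCurrentSchema update below, schema_id -1 is a sentinel for
+        // "the schema just added by the preceding AddSchema update"; it is
+        // resolved to the actual schema id when the updates are applied.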
+        let updates = vec![
+            TableUpdate::AddSchema { schema },
+            TableUpdate::SetCurrentSchema { schema_id: -1 },
+        ];
+
+        let requirements = vec![TableRequirement::CurrentSchemaIdMatch {
+            current_schema_id: base_schema.schema_id(),
+        }];
+
+        Ok(ActionCommit::new(updates, requirements))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use as_any::Downcast;
+
+    use crate::spec::{Literal, NestedField, NestedFieldRef, PrimitiveType, Type};
+    use crate::transaction::Transaction;
+    use crate::transaction::action::{ApplyTransactionAction, TransactionAction};
+    use crate::transaction::add_fields::AddFieldsAction;
+    use crate::transaction::tests::make_v2_table;
+    use crate::{ErrorKind, TableRequirement, TableUpdate};
+
+    #[tokio::test]
+    async fn test_add_field() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+
+        let new_field = NestedFieldRef::new(NestedField::optional(
+            4,
+            "new_field",
+            Type::Primitive(PrimitiveType::Int),
+        ));
+
+        let action = tx.add_fields(vec![new_field.clone()]);
+        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+        let updates = action_commit.take_updates();
+        let requirements = action_commit.take_requirements();
+
+        // Verify AddSchema update
+        let expected_schema = table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .clone()
+            .into_builder()
+            .with_fields(vec![new_field])
+            .build()
+            .unwrap();
+
+        assert_eq!(updates.len(), 2);
+        assert_eq!(updates[0], TableUpdate::AddSchema {
+            schema: expected_schema
+        });
+        assert_eq!(updates[1], TableUpdate::SetCurrentSchema { schema_id: -1 });
+
+        // Verify requirement
+        assert_eq!(requirements.len(), 1);
+        assert_eq!(requirements[0], TableRequirement::CurrentSchemaIdMatch {
+            current_schema_id: table.metadata().current_schema().schema_id()
+        });
+    }
+
+    #[test]
+    fn test_add_field_apply() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+
+        let new_field = NestedFieldRef::new(NestedField::optional(
+            4,
+            "new_field",
+            Type::Primitive(PrimitiveType::Int),
+        ));
+
+        let tx = tx.add_fields(vec![new_field]).apply(tx).unwrap();
+
+        assert_eq!(tx.actions.len(), 1);
+        (*tx.actions[0])
+            .downcast_ref::<AddFieldsAction>()
+            .expect("AddFieldsAction was not applied to Transaction!");
+    }
+
+    #[tokio::test]
+    async fn test_add_field_with_existing_field_id() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+
+        // Field ID 1 already exists in the V2 test table schema
+        let conflicting_field = NestedFieldRef::new(NestedField::new(
+            1,
+            "new_field",
+            Type::Primitive(PrimitiveType::Int),
+            true,
+        ));
+
+        let action = tx.add_fields(vec![conflicting_field]);
+        let result = Arc::new(action).commit(&table).await;
+        assert!(
+            result.is_err(),
+            "should fail because field_id 1 is already taken"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_add_required_field_without_initial_default_fails() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+
+        let required_field = NestedFieldRef::new(NestedField::required(
+            4,
+            "required_no_default",
+            Type::Primitive(PrimitiveType::Int),
+        ));
+
+        let action = tx.add_fields(vec![required_field]);
+        let result = Arc::new(action).commit(&table).await;
+        let err = match result {
+            Err(e) => e,
+            Ok(_) => panic!("should reject required field without initial_default"),
+        };
+        assert_eq!(err.kind(), ErrorKind::DataInvalid);
+        assert!(
+            err.message().contains("required_no_default"),
+            "error should mention the field name, got: {}",
+            err.message()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_add_required_field_with_initial_default_succeeds() {
+        let table = make_v2_table();
+        let tx = Transaction::new(&table);
+
+        let required_field_with_default = NestedFieldRef::new(
+            NestedField::required(
+                4,
+                "required_with_default",
+                Type::Primitive(PrimitiveType::Int),
+            )
+            .with_initial_default(Literal::int(0)),
+        );
+
+        let action = tx.add_fields(vec![required_field_with_default]);
+        let result = Arc::new(action).commit(&table).await;
+        assert!(
+            result.is_ok(),
+            "required field with initial_default should be accepted"
+        );
+    }
+}
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 074c7fefe4..6845576b6f 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -53,6 +53,7 @@ mod action;
 
 pub use action::*;
 
+mod add_fields;
 mod append;
 mod snapshot;
 mod sort_order;
@@ -67,9 +68,10 @@ use std::time::Duration;
 use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
 
 use crate::error::Result;
-use crate::spec::TableProperties;
+use crate::spec::{NestedFieldRef, TableProperties};
 use crate::table::Table;
 use crate::transaction::action::BoxedTransactionAction;
+use crate::transaction::add_fields::AddFieldsAction;
 use crate::transaction::append::FastAppendAction;
 use crate::transaction::sort_order::ReplaceSortOrderAction;
 use crate::transaction::update_location::UpdateLocationAction;
@@ -136,6 +138,11 @@ impl Transaction {
         UpdatePropertiesAction::new()
     }
 
+    /// Creates an add fields action.
+    pub fn add_fields(&self, fields: Vec<NestedFieldRef>) -> AddFieldsAction {
+        AddFieldsAction::new(fields)
+    }
+
     /// Creates a fast append action.
     pub fn fast_append(&self) -> FastAppendAction {
         FastAppendAction::new()
diff --git a/crates/integration_tests/tests/add_fields_test.rs b/crates/integration_tests/tests/add_fields_test.rs
new file mode 100644
index 0000000000..d2acc59f1f
--- /dev/null
+++ b/crates/integration_tests/tests/add_fields_test.rs
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for the `AddFieldsAction`.
+
+mod common;
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use common::{random_ns, test_schema};
+use futures::TryStreamExt;
+use iceberg::transaction::{ApplyTransactionAction, Transaction};
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, CatalogBuilder, TableCreation};
+use iceberg_catalog_rest::RestCatalogBuilder;
+use iceberg_integration_tests::get_test_fixture;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::file::properties::WriterProperties;
+
+/// Creates a table, appends data, adds a new field to the schema,
+/// verifies existing data is still readable, then appends data with the new schema.
+#[tokio::test]
+async fn test_add_field() {
+    let fixture = get_test_fixture();
+    let rest_catalog = RestCatalogBuilder::default()
+        .load("rest", fixture.catalog_config.clone())
+        .await
+        .unwrap();
+    let ns = random_ns().await;
+    let schema = test_schema();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let table = rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Create the writer and write initial data
+    let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = DefaultFileNameGenerator::new(
+        "test".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        table.metadata().current_schema().clone(),
+    );
+    let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+        parquet_writer_builder,
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
+    let mut data_file_writer = data_file_writer_builder.build(None).await.unwrap();
+    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+    let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+    ])
+    .unwrap();
+    data_file_writer.write(batch.clone()).await.unwrap();
+    let data_file = data_file_writer.close().await.unwrap();
+
+    // Check parquet file schema has the expected field IDs
+    let content = table
+        .file_io()
+        .new_input(data_file[0].file_path())
+        .unwrap()
+        .read()
+        .await
+        .unwrap();
+    let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load(
+        &content,
+        ArrowReaderOptions::default(),
+    )
+    .unwrap();
+    let field_ids: Vec<i32> = parquet_reader
+        .parquet_schema()
+        .columns()
+        .iter()
+        .map(|col| col.self_type().get_basic_info().id())
+        .collect();
+    assert_eq!(field_ids, vec![1, 2, 3]);
+
+    // Commit the initial data
+    let tx = Transaction::new(&table);
+    let append_action = tx.fast_append().add_data_files(data_file.clone());
+    let tx = append_action.apply(tx).unwrap();
+    let table = tx.commit(&rest_catalog).await.unwrap();
+
+    // Verify the initial data is readable
+    let batch_stream = table
+        .scan()
+        .select_all()
+        .build()
+        .unwrap()
+        .to_arrow()
+        .await
+        .unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    assert_eq!(batches.len(), 1);
+    assert_eq!(batches[0], batch);
+
+    // Add a new optional field to the table
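+    // (id 4 is the next unused field id: the field-id assertion above shows the
+    // initial schema occupies ids 1-3, and reusing an existing id fails the
+    // commit, as covered by the unit tests)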
+    let tx = Transaction::new(&table);
+    let add_action = tx.add_fields(vec![iceberg::spec::NestedFieldRef::new(
+        iceberg::spec::NestedField::optional(
+            4,
+            "a",
+            iceberg::spec::Type::Primitive(iceberg::spec::PrimitiveType::Int),
+        ),
+    )]);
+    let tx = add_action.apply(tx).unwrap();
+    let table = tx.commit(&rest_catalog).await.unwrap();
+
+    // Verify existing data is still readable after schema evolution
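+    // (the scan binds to the schema of the current snapshot, and adding a field
+    // does not create a new snapshot, so the result still matches the original
+    // three-column batch)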
+    let batch_stream = table
+        .scan()
+        .select_all()
+        .build()
+        .unwrap()
+        .to_arrow()
+        .await
+        .unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    assert_eq!(batches.len(), 1);
+    assert_eq!(batches[0], batch);
+
+    // Create a new writer with the evolved schema and write data including the new field
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        table.metadata().current_schema().clone(),
+    );
+    let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+        parquet_writer_builder,
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
+    let mut data_file_writer = data_file_writer_builder.build(None).await.unwrap();
+    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+    let col4 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let evolved_arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let batch_with_new_field = RecordBatch::try_new(evolved_arrow_schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+        Arc::new(col4) as ArrayRef,
+    ])
+    .unwrap();
+    data_file_writer
+        .write(batch_with_new_field.clone())
+        .await
+        .unwrap();
+    let data_file = data_file_writer.close().await.unwrap();
+
+    // Commit the new data with evolved schema
+    let tx = Transaction::new(&table);
+    let append_action = tx.fast_append().add_data_files(data_file.clone());
+    let tx = append_action.apply(tx).unwrap();
+    let _table = tx.commit(&rest_catalog).await.unwrap();
+}