diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 840d1a5f16..50ac065f3e 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -27,7 +27,7 @@ use itertools::Itertools; use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; -use parquet::file::metadata::ParquetMetaData; +use parquet::file::metadata::{KeyValue, ParquetMetaData}; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; @@ -46,6 +46,11 @@ use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; +/// Key of the Parquet footer key-value metadata entry that holds the Iceberg +/// table schema serialized as JSON. Same key as iceberg-java's `Parquet.java`, so engines +/// that read the schema from the file footer can interpret iceberg-rust files (see #2184). 
+const ICEBERG_SCHEMA_PARQUET_KEY: &str = "iceberg.schema"; + /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone, Debug)] pub struct ParquetWriterBuilder { @@ -492,7 +497,7 @@ impl FileWriter for ParquetWriter { let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); let inner_writer = self.output_file.writer().await?; let async_writer = AsyncFileWriter::new(inner_writer); - let writer = AsyncArrowWriter::try_new( + let mut writer = AsyncArrowWriter::try_new( async_writer, arrow_schema.clone(), Some(self.writer_properties.clone()), @@ -501,6 +506,23 @@ impl FileWriter for ParquetWriter { Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.") .with_source(err) })?; + + // Serialize the Iceberg schema to JSON and attach it to the newly built + // writer as footer key-value metadata under `iceberg.schema`, matching + // iceberg-java (`Parquet.java`) so engines like Snowflake that resolve the + // schema from the footer can read files written by iceberg-rust (see #2184). + let schema_json = serde_json::to_string(self.schema.as_ref()).map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "Failed to serialize Iceberg schema to JSON for Parquet footer.", + ) + .with_source(err) + })?; + writer.append_key_value_metadata(KeyValue::new( + ICEBERG_SCHEMA_PARQUET_KEY.to_string(), + schema_json, + )); + self.inner_writer = Some(writer); self.inner_writer.as_mut().unwrap() }; @@ -861,6 +883,77 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_parquet_writer_embeds_iceberg_schema_in_footer() -> Result<()> { + // Writes one batch, closes the writer, then checks that the footer + // carries the Iceberg schema as JSON under the `iceberg.schema` key and + // that it deserializes back to the schema given to the builder (see #2184). 
+ let temp_dir = TempDir::new().unwrap(); + let file_io = FileIO::new_with_fs(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let iceberg_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema: ArrowSchemaRef = Arc::new(iceberg_schema.as_ref().try_into().unwrap()); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(Int64Array::from_iter_values(0..3)) as ArrayRef, + Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])) as ArrayRef, + ]) + .unwrap(); + + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + let file_path = output_file.location().to_string(); + + let mut pw = + ParquetWriterBuilder::new(WriterProperties::builder().build(), iceberg_schema.clone()) + .build(output_file) + .await?; + pw.write(&batch).await?; + pw.close().await?; + + // Re-open the written file, load its Parquet metadata and locate the `iceberg.schema` entry. 
+ let input_file = file_io.new_input(&file_path)?; + let file_read = input_file.reader().await?; + let file_metadata = input_file.metadata().await?; + let mut parquet_reader = ArrowFileReader::new(file_metadata, file_read); + let parquet_metadata = parquet_reader.get_metadata(None).await.unwrap(); + + let kv = parquet_metadata + .file_metadata() + .key_value_metadata() + .expect("footer should carry key-value metadata") + .iter() + .find(|kv| kv.key == ICEBERG_SCHEMA_PARQUET_KEY) + .expect("footer should contain an `iceberg.schema` entry"); + + let schema_json = kv + .value + .as_ref() + .expect("`iceberg.schema` value is present"); + let parsed: Schema = serde_json::from_str(schema_json).unwrap(); + assert_eq!( + parsed, *iceberg_schema, + "`iceberg.schema` footer value must round-trip to the written schema" + ); + + Ok(()) + } + #[tokio::test] async fn test_parquet_writer_with_complex_schema() -> Result<()> { let temp_dir = TempDir::new().unwrap();