From 26fd1b935a53d877f01f8da615510c706c240bae Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 27 Jun 2026 09:19:46 -0700
Subject: [PATCH] feat(writer): embed iceberg.schema in Parquet footer metadata

Engines such as Snowflake resolve an Iceberg table's schema from the
`iceberg.schema` key in a Parquet file's footer key-value metadata.
iceberg-rust did not write this key, so Parquet files it produced (or
files produced by nimtable compaction on top of it) were rejected by
those engines.

Write the Iceberg schema as JSON under the `iceberg.schema` footer key
when the Parquet writer is initialized, matching iceberg-java
(`Parquet.java`). The value is the same schema JSON that appears in
table metadata's `schemas`, produced via `serde_json::to_string`.

Scope is the Parquet writer only. iceberg-java writes the same key from
its Avro writer too, but in iceberg-rust the Avro writer produces
manifests (metadata), not the data files these engines query, so that
is left as a follow-up.

Closes #2184
---
 .../src/writer/file_writer/parquet_writer.rs | 97 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 95 insertions(+), 2 deletions(-)

diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 840d1a5f16..50ac065f3e 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -27,7 +27,7 @@ use itertools::Itertools;
 use parquet::arrow::AsyncArrowWriter;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{KeyValue, ParquetMetaData};
 use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::Statistics;
 
@@ -46,6 +46,11 @@ use crate::transform::create_transform_function;
 use crate::writer::{CurrentFileStatus, DataFile};
 use crate::{Error, ErrorKind, Result};
 
+/// Parquet footer metadata key under which the Iceberg table schema is stored,
+/// as JSON. Matches iceberg-java's `Parquet.java`, allowing engines that read
+/// the schema from the file footer to interpret iceberg-rust files (see #2184).
+const ICEBERG_SCHEMA_PARQUET_KEY: &str = "iceberg.schema";
+
 /// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
 #[derive(Clone, Debug)]
 pub struct ParquetWriterBuilder {
@@ -492,7 +497,7 @@ impl FileWriter for ParquetWriter {
             let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
             let inner_writer = self.output_file.writer().await?;
             let async_writer = AsyncFileWriter::new(inner_writer);
-            let writer = AsyncArrowWriter::try_new(
+            let mut writer = AsyncArrowWriter::try_new(
                 async_writer,
                 arrow_schema.clone(),
                 Some(self.writer_properties.clone()),
@@ -501,6 +506,23 @@ impl FileWriter for ParquetWriter {
                 Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
                     .with_source(err)
             })?;
+
+            // Embed the Iceberg schema as Parquet footer metadata under the
+            // `iceberg.schema` key, matching iceberg-java (`Parquet.java`) so
+            // engines like Snowflake that resolve the schema from the footer can
+            // read files written by iceberg-rust (see #2184).
+            let schema_json = serde_json::to_string(self.schema.as_ref()).map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Failed to serialize Iceberg schema to JSON for Parquet footer.",
+                )
+                .with_source(err)
+            })?;
+            writer.append_key_value_metadata(KeyValue::new(
+                ICEBERG_SCHEMA_PARQUET_KEY.to_string(),
+                schema_json,
+            ));
+
             self.inner_writer = Some(writer);
             self.inner_writer.as_mut().unwrap()
         };
@@ -861,6 +883,77 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_parquet_writer_embeds_iceberg_schema_in_footer() -> Result<()> {
+        // The Parquet footer must carry the Iceberg schema as JSON under the
+        // `iceberg.schema` key, matching iceberg-java, so engines that resolve
+        // the schema from the footer can read iceberg-rust files (see #2184).
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIO::new_with_fs();
+        let location_gen = DefaultLocationGenerator::with_data_location(
+            temp_dir.path().to_str().unwrap().to_string(),
+        );
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        let iceberg_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema: ArrowSchemaRef = Arc::new(iceberg_schema.as_ref().try_into().unwrap());
+        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
+            Arc::new(Int64Array::from_iter_values(0..3)) as ArrayRef,
+            Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
+        ])
+        .unwrap();
+
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+        )?;
+        let file_path = output_file.location().to_string();
+
+        let mut pw =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), iceberg_schema.clone())
+                .build(output_file)
+                .await?;
+        pw.write(&batch).await?;
+        pw.close().await?;
+
+        // Read the footer back and locate the `iceberg.schema` key-value entry.
+        let input_file = file_io.new_input(&file_path)?;
+        let file_read = input_file.reader().await?;
+        let file_metadata = input_file.metadata().await?;
+        let mut parquet_reader = ArrowFileReader::new(file_metadata, file_read);
+        let parquet_metadata = parquet_reader.get_metadata(None).await.unwrap();
+
+        let kv = parquet_metadata
+            .file_metadata()
+            .key_value_metadata()
+            .expect("footer should carry key-value metadata")
+            .iter()
+            .find(|kv| kv.key == ICEBERG_SCHEMA_PARQUET_KEY)
+            .expect("footer should contain an `iceberg.schema` entry");
+
+        let schema_json = kv
+            .value
+            .as_ref()
+            .expect("`iceberg.schema` value is present");
+        let parsed: Schema = serde_json::from_str(schema_json).unwrap();
+        assert_eq!(
+            parsed, *iceberg_schema,
+            "`iceberg.schema` footer value must round-trip to the written schema"
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_parquet_writer_with_complex_schema() -> Result<()> {
         let temp_dir = TempDir::new().unwrap();