Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions crates/iceberg/src/writer/file_writer/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use itertools::Itertools;
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{KeyValue, ParquetMetaData};
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics;

Expand All @@ -46,6 +46,11 @@ use crate::transform::create_transform_function;
use crate::writer::{CurrentFileStatus, DataFile};
use crate::{Error, ErrorKind, Result};

/// Key of the Parquet footer key-value metadata entry that holds the Iceberg
/// table schema, serialized as JSON.
///
/// The name matches the one used by iceberg-java's `Parquet.java`, so engines
/// that read the schema from the file footer can interpret files written by
/// iceberg-rust (see #2184).
const ICEBERG_SCHEMA_PARQUET_KEY: &str = "iceberg.schema";

/// ParquetWriterBuilder is used to build a [`ParquetWriter`]
#[derive(Clone, Debug)]
pub struct ParquetWriterBuilder {
Expand Down Expand Up @@ -492,7 +497,7 @@ impl FileWriter for ParquetWriter {
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
let inner_writer = self.output_file.writer().await?;
let async_writer = AsyncFileWriter::new(inner_writer);
let writer = AsyncArrowWriter::try_new(
let mut writer = AsyncArrowWriter::try_new(
async_writer,
arrow_schema.clone(),
Some(self.writer_properties.clone()),
Expand All @@ -501,6 +506,23 @@ impl FileWriter for ParquetWriter {
Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
.with_source(err)
})?;

// Embed the Iceberg schema as Parquet footer metadata under the
// `iceberg.schema` key, matching iceberg-java (`Parquet.java`) so
// engines like Snowflake that resolve the schema from the footer can
// read files written by iceberg-rust (see #2184).
let schema_json = serde_json::to_string(self.schema.as_ref()).map_err(|err| {
Error::new(
ErrorKind::Unexpected,
"Failed to serialize Iceberg schema to JSON for Parquet footer.",
)
.with_source(err)
})?;
writer.append_key_value_metadata(KeyValue::new(
ICEBERG_SCHEMA_PARQUET_KEY.to_string(),
schema_json,
));

self.inner_writer = Some(writer);
self.inner_writer.as_mut().unwrap()
};
Expand Down Expand Up @@ -861,6 +883,77 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_parquet_writer_embeds_iceberg_schema_in_footer() -> Result<()> {
// The Parquet footer must carry the Iceberg schema as JSON under the
// `iceberg.schema` key, matching iceberg-java, so engines that resolve
// the schema from the footer can read iceberg-rust files (see #2184).
let temp_dir = TempDir::new().unwrap();
let file_io = FileIO::new_with_fs();
let location_gen = DefaultLocationGenerator::with_data_location(
temp_dir.path().to_str().unwrap().to_string(),
);
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

let iceberg_schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);

let arrow_schema: ArrowSchemaRef = Arc::new(iceberg_schema.as_ref().try_into().unwrap());
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
Arc::new(Int64Array::from_iter_values(0..3)) as ArrayRef,
Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
])
.unwrap();

let output_file = file_io.new_output(
location_gen.generate_location(None, &file_name_gen.generate_file_name()),
)?;
let file_path = output_file.location().to_string();

let mut pw =
ParquetWriterBuilder::new(WriterProperties::builder().build(), iceberg_schema.clone())
.build(output_file)
.await?;
pw.write(&batch).await?;
pw.close().await?;

// Read the footer back and locate the `iceberg.schema` key-value entry.
let input_file = file_io.new_input(&file_path)?;
let file_read = input_file.reader().await?;
let file_metadata = input_file.metadata().await?;
let mut parquet_reader = ArrowFileReader::new(file_metadata, file_read);
let parquet_metadata = parquet_reader.get_metadata(None).await.unwrap();

let kv = parquet_metadata
.file_metadata()
.key_value_metadata()
.expect("footer should carry key-value metadata")
.iter()
.find(|kv| kv.key == ICEBERG_SCHEMA_PARQUET_KEY)
.expect("footer should contain an `iceberg.schema` entry");

let schema_json = kv
.value
.as_ref()
.expect("`iceberg.schema` value is present");
let parsed: Schema = serde_json::from_str(schema_json).unwrap();
assert_eq!(
parsed, *iceberg_schema,
"`iceberg.schema` footer value must round-trip to the written schema"
);

Ok(())
}

#[tokio::test]
async fn test_parquet_writer_with_complex_schema() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
Expand Down
Loading