Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2007,6 +2007,7 @@ pub fn iceberg::spec::ManifestFile::has_deleted_files(&self) -> bool
pub fn iceberg::spec::ManifestFile::has_existing_files(&self) -> bool
impl iceberg::spec::ManifestFile
pub async fn iceberg::spec::ManifestFile::load_manifest(&self, file_io: &iceberg::io::FileIO) -> iceberg::Result<iceberg::spec::Manifest>
pub async fn iceberg::spec::ManifestFile::load_manifest_with(&self, file_io: &iceberg::io::FileIO, table_metadata: core::option::Option<&iceberg::spec::TableMetadataRef>) -> iceberg::Result<iceberg::spec::Manifest>
impl core::clone::Clone for iceberg::spec::ManifestFile
pub fn iceberg::spec::ManifestFile::clone(&self) -> iceberg::spec::ManifestFile
impl core::cmp::Eq for iceberg::spec::ManifestFile
Expand Down Expand Up @@ -2052,6 +2053,7 @@ impl iceberg::spec::ManifestMetadata
pub fn iceberg::spec::ManifestMetadata::content(&self) -> &iceberg::spec::ManifestContentType
pub fn iceberg::spec::ManifestMetadata::format_version(&self) -> &iceberg::spec::FormatVersion
pub fn iceberg::spec::ManifestMetadata::parse(meta: &std::collections::hash::map::HashMap<alloc::string::String, alloc::vec::Vec<u8>>) -> iceberg::Result<Self>
pub fn iceberg::spec::ManifestMetadata::parse_with(meta: &std::collections::hash::map::HashMap<alloc::string::String, alloc::vec::Vec<u8>>, table_metadata: core::option::Option<&iceberg::spec::TableMetadataRef>) -> iceberg::Result<Self>
pub fn iceberg::spec::ManifestMetadata::partition_spec(&self) -> &iceberg::spec::PartitionSpec
pub fn iceberg::spec::ManifestMetadata::schema(&self) -> &iceberg::spec::SchemaRef
pub fn iceberg::spec::ManifestMetadata::schema_id(&self) -> iceberg::spec::SchemaId
Expand Down
112 changes: 104 additions & 8 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,14 @@ impl ObjectCache {

/// Retrieves an Arc [`Manifest`] from the cache
/// or retrieves one from FileIO and parses it if not present
pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
pub(crate) async fn get_manifest(
&self,
manifest_file: &ManifestFile,
table_metadata: &TableMetadataRef,
) -> Result<Arc<Manifest>> {
if self.cache_disabled {
return manifest_file
.load_manifest(&self.file_io)
.load_manifest_with(&self.file_io, Some(table_metadata))
.await
.map(Arc::new);
}
Expand All @@ -110,7 +114,7 @@ impl ObjectCache {
let cache_entry = self
.cache
.entry_by_ref(&key)
.or_try_insert_with(self.fetch_and_parse_manifest(manifest_file))
.or_try_insert_with(self.fetch_and_parse_manifest(manifest_file, table_metadata))
.await
.map_err(|err| {
Error::new(
Expand Down Expand Up @@ -179,8 +183,14 @@ impl ObjectCache {
}
}

async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result<CachedItem> {
let manifest = manifest_file.load_manifest(&self.file_io).await?;
async fn fetch_and_parse_manifest(
&self,
manifest_file: &ManifestFile,
table_metadata: &TableMetadataRef,
) -> Result<CachedItem> {
let manifest = manifest_file
.load_manifest_with(&self.file_io, Some(table_metadata))
.await?;

Ok(CachedItem::Manifest(Arc::new(manifest)))
}
Expand Down Expand Up @@ -358,7 +368,10 @@ mod tests {
assert_eq!(result_manifest_list.entries().len(), 1);

let manifest_file = result_manifest_list.entries().first().unwrap();
let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap();
let result_manifest = object_cache
.get_manifest(manifest_file, &fixture.table.metadata_ref())
.await
.unwrap();

assert_eq!(
result_manifest
Expand Down Expand Up @@ -405,7 +418,10 @@ mod tests {
let manifest_file = result_manifest_list.entries().first().unwrap();

// not in cache
let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap();
let result_manifest = object_cache
.get_manifest(manifest_file, &fixture.table.metadata_ref())
.await
.unwrap();

assert_eq!(
result_manifest
Expand All @@ -420,7 +436,10 @@ mod tests {
);

// retrieve cached version
let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap();
let result_manifest = object_cache
.get_manifest(manifest_file, &fixture.table.metadata_ref())
.await
.unwrap();

assert_eq!(
result_manifest
Expand All @@ -434,4 +453,81 @@ mod tests {
"1.parquet"
);
}

#[test]
fn test_manifest_metadata_parse_prefers_table_metadata_over_bad_schema() {
use std::collections::HashMap;

use crate::spec::ManifestMetadata;

let fixture = TableTestFixture::new();
let table_metadata = fixture.table.metadata_ref();
let schema_id = table_metadata.current_schema().schema_id();
let spec_id = table_metadata.default_partition_spec().spec_id();

// Manifest key-value metadata whose `schema` value is non-conformant
// (as written by some engines, e.g. duckdb-iceberg, which serialize the
// manifest_entry Avro schema there using Avro type names like `array`),
// but whose `schema-id` / `partition-spec-id` are valid.
let mut meta: HashMap<String, Vec<u8>> = HashMap::new();
meta.insert("schema-id".to_string(), schema_id.to_string().into_bytes());
meta.insert(
"partition-spec-id".to_string(),
spec_id.to_string().into_bytes(),
);
meta.insert("format-version".to_string(), b"2".to_vec());
meta.insert("content".to_string(), b"data".to_vec());
meta.insert(
"schema".to_string(),
br#"{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"x","required":true,"type":{"type":"array","items":"int"}}]}"#
.to_vec(),
);

// Parsing from the manifest's own metadata rejects the non-conformant schema.
assert!(ManifestMetadata::parse(&meta).is_err());

// With table metadata available, the authoritative schema/spec are used
// (looked up by id) and the manifest's `schema` key is not parsed.
let parsed = ManifestMetadata::parse_with(&meta, Some(&table_metadata)).unwrap();
assert_eq!(parsed.schema.schema_id(), schema_id);
assert_eq!(parsed.partition_spec.spec_id(), spec_id);
}

#[test]
fn test_manifest_metadata_parse_self_describes_when_ids_not_recorded() {
use std::collections::HashMap;

use crate::spec::{ManifestMetadata, Type};

let fixture = TableTestFixture::new();
let table_metadata = fixture.table.metadata_ref();

// A manifest written WITHOUT `schema-id` / `partition-spec-id` keys
// (some writers omit them). Its self-described schema — a single long
// column that does NOT match the table's schemas — must win: assuming
// the default id 0 and looking that up in the table metadata would
// mistype this manifest's column bounds.
let mut meta: HashMap<String, Vec<u8>> = HashMap::new();
meta.insert("format-version".to_string(), b"2".to_vec());
meta.insert("content".to_string(), b"data".to_vec());
meta.insert(
"schema".to_string(),
br#"{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"foo","required":false,"type":"long"}]}"#
.to_vec(),
);
meta.insert("partition-spec".to_string(), b"[]".to_vec());

let parsed = ManifestMetadata::parse_with(&meta, Some(&table_metadata)).unwrap();
let field = parsed.schema.field_by_id(1).unwrap();
assert_eq!(field.name, "foo");
assert_eq!(
*field.field_type,
Type::Primitive(crate::spec::PrimitiveType::Long)
);
assert_ne!(
parsed.schema.as_ref().as_struct(),
table_metadata.current_schema().as_struct(),
"must not silently adopt a table schema the manifest never referenced"
);
}
}
7 changes: 6 additions & 1 deletion crates/iceberg/src/scan/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub(crate) struct ManifestFileContext {
bound_predicates: Option<Arc<BoundPredicates>>,
object_cache: Arc<ObjectCache>,
snapshot_schema: SchemaRef,
table_metadata: TableMetadataRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
delete_file_index: DeleteFileIndex,
name_mapping: Option<Arc<NameMapping>>,
Expand Down Expand Up @@ -74,6 +75,7 @@ impl ManifestFileContext {
manifest_file,
bound_predicates,
snapshot_schema,
table_metadata,
field_ids,
mut sender,
expression_evaluator_cache,
Expand All @@ -82,7 +84,9 @@ impl ManifestFileContext {
case_sensitive,
} = self;

let manifest = object_cache.get_manifest(&manifest_file).await?;
let manifest = object_cache
.get_manifest(&manifest_file, &table_metadata)
.await?;

for manifest_entry in manifest.entries() {
let manifest_entry_context = ManifestEntryContext {
Expand Down Expand Up @@ -279,6 +283,7 @@ impl PlanContext {
sender,
object_cache: self.object_cache.clone(),
snapshot_schema: self.snapshot_schema.clone(),
table_metadata: self.table_metadata.clone(),
field_ids: self.field_ids.clone(),
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
delete_file_index,
Expand Down
151 changes: 103 additions & 48 deletions crates/iceberg/src/spec/manifest/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use typed_builder::TypedBuilder;

use super::{FormatVersion, ManifestContentType, PartitionSpec, Schema};
use crate::error::Result;
use crate::spec::{PartitionField, SchemaId, SchemaRef};
use crate::spec::{PartitionField, SchemaId, SchemaRef, TableMetadataRef};
use crate::{Error, ErrorKind};

/// Meta data of a manifest that is stored in the key-value metadata of the Avro file
Expand All @@ -44,22 +44,40 @@ pub struct ManifestMetadata {
impl ManifestMetadata {
/// Parse from metadata in avro file.
pub fn parse(meta: &HashMap<String, Vec<u8>>) -> Result<Self> {
let schema = Arc::new({
let bs = meta.get("schema").ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"schema is required in manifest metadata but not found",
)
})?;
serde_json::from_slice::<Schema>(bs).map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse schema in manifest metadata",
)
.with_source(err)
})?
});
let schema_id: i32 = meta
Self::parse_with(meta, None)
}

/// Parse from the avro file's key-value metadata, preferring the table
/// metadata's schema and partition spec (looked up by the manifest's
/// **recorded** `schema-id` / `partition-spec-id`) over the manifest's
/// self-described `schema` / `partition-spec` keys.
///
/// A manifest's embedded `schema` key is redundant with the authoritative
/// table metadata, and some writers (e.g. duckdb-iceberg) store a
/// non-conformant value there (the manifest_entry Avro record schema rather
/// than the Iceberg table schema). When the manifest records a `schema-id`
/// and `table_metadata` contains that schema, the table's schema is used
/// and the manifest's own `schema` key is not parsed — mirroring
/// iceberg-java's `ManifestReader(specsById)`, whose reading of the schema
/// from manifest file metadata is deprecated. The same applies to
/// `partition-spec-id` and the partition spec.
///
/// The lookup happens ONLY for ids the manifest actually records. A writer
/// that omits the `schema-id` key (some engines do) may have written the
/// manifest under any historical schema — assuming the default id 0 would
/// mistype column bounds after a type promotion (e.g. 8-byte long bounds
/// decoded as int), so the manifest's self-described schema is the only
/// reliable description of its bytes and is parsed instead. When
/// `table_metadata` is `None` (or does not contain a recorded id) the
/// manifest's own metadata is likewise parsed, preserving the previous
/// self-describing behaviour.
pub fn parse_with(
meta: &HashMap<String, Vec<u8>>,
table_metadata: Option<&TableMetadataRef>,
) -> Result<Self> {
// `None` when the writer omitted the key — deliberately NOT defaulted
// before the table-metadata lookup below.
let recorded_schema_id: Option<i32> = meta
.get("schema-id")
.map(|bs| {
String::from_utf8_lossy(bs).parse().map_err(|err| {
Expand All @@ -70,42 +88,21 @@ impl ManifestMetadata {
.with_source(err)
})
})
.transpose()?
.unwrap_or(0);
let partition_spec = {
let fields = {
let bs = meta.get("partition-spec").ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"partition-spec is required in manifest metadata but not found",
)
})?;
serde_json::from_slice::<Vec<PartitionField>>(bs).map_err(|err| {
.transpose()?;
let recorded_spec_id: Option<i32> = meta
.get("partition-spec-id")
.map(|bs| {
String::from_utf8_lossy(bs).parse().map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse partition spec in manifest metadata",
"Fail to parse partition spec id in manifest metadata",
)
.with_source(err)
})?
};
let spec_id = meta
.get("partition-spec-id")
.map(|bs| {
String::from_utf8_lossy(bs).parse().map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse partition spec id in manifest metadata",
)
.with_source(err)
})
})
.transpose()?
.unwrap_or(0);
PartitionSpec::builder(schema.clone())
.with_spec_id(spec_id)
.add_unbound_fields(fields.into_iter().map(|f| f.into_unbound()))?
.build()?
};
})
.transpose()?;
let schema_id = recorded_schema_id.unwrap_or(0);
let spec_id = recorded_spec_id.unwrap_or(0);
let format_version = if let Some(bs) = meta.get("format-version") {
serde_json::from_slice::<FormatVersion>(bs).map_err(|err| {
Error::new(
Expand All @@ -123,6 +120,64 @@ impl ManifestMetadata {
} else {
ManifestContentType::Data
};

// Prefer the authoritative table schema + partition spec when the
// manifest RECORDS the ids and the table metadata contains them,
// bypassing the manifest's redundant (and sometimes non-conformant)
// `schema` / `partition-spec` metadata keys. Manifests that omit the
// ids stay on the self-describing path below.
if let Some(table_metadata) = table_metadata
&& let (Some(schema), Some(partition_spec)) = (
recorded_schema_id.and_then(|id| table_metadata.schema_by_id(id)),
recorded_spec_id.and_then(|id| table_metadata.partition_spec_by_id(id)),
)
{
return Ok(ManifestMetadata {
schema: schema.clone(),
schema_id,
partition_spec: partition_spec.as_ref().clone(),
format_version,
content,
});
}

// Fallback: parse the schema + partition spec from the manifest's own
// key-value metadata (the manifest is self-describing).
let schema = Arc::new({
let bs = meta.get("schema").ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"schema is required in manifest metadata but not found",
)
})?;
serde_json::from_slice::<Schema>(bs).map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse schema in manifest metadata",
)
.with_source(err)
})?
});
let fields = {
let bs = meta.get("partition-spec").ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"partition-spec is required in manifest metadata but not found",
)
})?;
serde_json::from_slice::<Vec<PartitionField>>(bs).map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse partition spec in manifest metadata",
)
.with_source(err)
})?
};
let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(spec_id)
.add_unbound_fields(fields.into_iter().map(|f| f.into_unbound()))?
.build()?;

Ok(ManifestMetadata {
schema,
schema_id,
Expand Down
Loading
Loading