From e1107a513a4dd43c3d412da8f6858efb6c4eae96 Mon Sep 17 00:00:00 2001 From: nathanmetzger Date: Thu, 29 Jan 2026 19:46:30 +0100 Subject: [PATCH] feat(inspect): add core metadata tables (refs, history, metadata_log_entries, files, all_manifests, all_files) Implement 10 new metadata table types for the inspect module: - Group A (metadata-only): RefsTable, HistoryTable, MetadataLogEntriesTable - Group B (file-level): FilesTable, DataFilesTable, DeleteFilesTable - Group C (all-snapshot): AllManifestsTable, AllFilesTable, AllDataFilesTable, AllDeleteFilesTable Shared infrastructure in files.rs handles dynamic partition structs, content filtering, and manifest deduplication across snapshots. DataFusion integration updated with all new variants. Part of #823. --- crates/iceberg/src/inspect/all_files.rs | 683 +++++++++ crates/iceberg/src/inspect/all_manifests.rs | 409 ++++++ crates/iceberg/src/inspect/files.rs | 1253 +++++++++++++++++ crates/iceberg/src/inspect/history.rs | 159 +++ .../src/inspect/metadata_log_entries.rs | 157 +++ crates/iceberg/src/inspect/metadata_table.rs | 96 +- crates/iceberg/src/inspect/mod.rs | 12 + crates/iceberg/src/inspect/refs.rs | 178 +++ crates/iceberg/src/scan/mod.rs | 23 + .../datafusion/src/table/metadata_table.rs | 22 + .../tests/integration_datafusion_test.rs | 10 + .../testdata/slts/df_test/show_tables.slt | 20 + 12 files changed, 3021 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/inspect/all_files.rs create mode 100644 crates/iceberg/src/inspect/all_manifests.rs create mode 100644 crates/iceberg/src/inspect/files.rs create mode 100644 crates/iceberg/src/inspect/history.rs create mode 100644 crates/iceberg/src/inspect/metadata_log_entries.rs create mode 100644 crates/iceberg/src/inspect/refs.rs diff --git a/crates/iceberg/src/inspect/all_files.rs b/crates/iceberg/src/inspect/all_files.rs new file mode 100644 index 0000000000..a981d75436 --- /dev/null +++ b/crates/iceberg/src/inspect/all_files.rs @@ -0,0 +1,683 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::files::{ContentFilter, files_schema, scan_all_files}; +use crate::Result; +use crate::scan::ArrowRecordBatchStream; +use crate::table::Table; + +/// All files metadata table. +/// +/// Shows all data and delete files from all unique manifests across all snapshots. +pub struct AllFilesTable<'a> { + table: &'a Table, +} + +/// All data files metadata table. +/// +/// Shows only data files from all unique manifests across all snapshots. +pub struct AllDataFilesTable<'a> { + table: &'a Table, +} + +/// All delete files metadata table. +/// +/// Shows only delete files from all unique manifests across all snapshots. 
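+///
+/// A minimal usage sketch (assumes an async context, a loaded `Table` named `table`,
+/// and `futures::TryStreamExt` in scope; error handling elided):
+///
+/// ```ignore
+/// let stream = table.inspect().all_delete_files().scan().await?;
+/// let batches: Vec<_> = stream.try_collect().await?;
+/// ```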
+pub struct AllDeleteFilesTable<'a> { + table: &'a Table, +} + +impl<'a> AllFilesTable<'a> { + /// Create a new AllFiles table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the all files table. + pub fn schema(&self) -> crate::spec::Schema { + files_schema(self.table.metadata().default_partition_type()) + } + + /// Scans the all files table. + pub async fn scan(&self) -> Result { + scan_all_files(self.table, ContentFilter::All).await + } +} + +impl<'a> AllDataFilesTable<'a> { + /// Create a new AllDataFiles table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the all data files table. + pub fn schema(&self) -> crate::spec::Schema { + files_schema(self.table.metadata().default_partition_type()) + } + + /// Scans the all data files table. + pub async fn scan(&self) -> Result { + scan_all_files(self.table, ContentFilter::DataOnly).await + } +} + +impl<'a> AllDeleteFilesTable<'a> { + /// Create a new AllDeleteFiles table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the all delete files table. + pub fn schema(&self) -> crate::spec::Schema { + files_schema(self.table.metadata().default_partition_type()) + } + + /// Scans the all delete files table. + pub async fn scan(&self) -> Result { + scan_all_files(self.table, ContentFilter::DeletesOnly).await + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + use futures::TryStreamExt; + + use crate::scan::tests::TableTestFixture; + use crate::test_utils::check_record_batches; + + #[tokio::test] + async fn test_all_files_table() { + let mut fixture = TableTestFixture::new(); + fixture.setup_all_snapshot_manifest_files().await; + + let batch_stream = fixture.table.inspect().all_files().scan().await.unwrap(); + + check_record_batches( + batch_stream.try_collect::>().await.unwrap(), + expect![[r#" + Field { "content": Int32, metadata: {"PARQUET:field_id": "134"} }, + Field { "file_path": Utf8, metadata: {"PARQUET:field_id": "100"} }, + Field { "file_format": Utf8, metadata: {"PARQUET:field_id": "101"} }, + Field { "partition": Struct("x": Int64, metadata: {"PARQUET:field_id": "1000"}), metadata: {"PARQUET:field_id": "102"} }, + Field { "record_count": Int64, metadata: {"PARQUET:field_id": "103"} }, + Field { "file_size_in_bytes": Int64, metadata: {"PARQUET:field_id": "104"} }, + Field { "column_sizes": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "201"}, "value": Int64, metadata: {"PARQUET:field_id": "202"}), unsorted), metadata: {"PARQUET:field_id": "108"} }, + Field { "value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "203"}, "value": Int64, metadata: {"PARQUET:field_id": "204"}), unsorted), metadata: {"PARQUET:field_id": "109"} }, + Field { "null_value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "205"}, "value": Int64, metadata: {"PARQUET:field_id": "206"}), unsorted), metadata: {"PARQUET:field_id": "110"} }, + Field { "nan_value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "207"}, "value": Int64, metadata: {"PARQUET:field_id": "208"}), unsorted), metadata: {"PARQUET:field_id": "137"} }, + Field { "lower_bounds": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "209"}, "value": 
LargeBinary, metadata: {"PARQUET:field_id": "210"}), unsorted), metadata: {"PARQUET:field_id": "125"} }, + Field { "upper_bounds": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "211"}, "value": LargeBinary, metadata: {"PARQUET:field_id": "212"}), unsorted), metadata: {"PARQUET:field_id": "128"} }, + Field { "key_metadata": nullable LargeBinary, metadata: {"PARQUET:field_id": "131"} }, + Field { "split_offsets": nullable List(Int64, field: 'element', metadata: {"PARQUET:field_id": "213"}), metadata: {"PARQUET:field_id": "132"} }, + Field { "equality_ids": nullable List(Int32, field: 'element', metadata: {"PARQUET:field_id": "214"}), metadata: {"PARQUET:field_id": "135"} }, + Field { "sort_order_id": nullable Int32, metadata: {"PARQUET:field_id": "140"} }, + Field { "spec_id": nullable Int32, metadata: {"PARQUET:field_id": "141"} }"#]], + expect![[r#" + content: PrimitiveArray + [ + 0, + 0, + ], + file_path: (skipped), + file_format: StringArray + [ + "parquet", + "parquet", + ], + partition: StructArray + -- validity: + [ + valid, + valid, + ] + [ + -- child 0: "x" (Int64) + PrimitiveArray + [ + 100, + 300, + ] + ], + record_count: PrimitiveArray + [ + 1, + 1, + ], + file_size_in_bytes: PrimitiveArray + [ + 100, + 100, + ], + column_sizes: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + null_value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + nan_value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + lower_bounds: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + ], + upper_bounds: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + ], + key_metadata: LargeBinaryArray + [ + null, + null, + ], + split_offsets: ListArray + [ + null, + null, + ], + equality_ids: ListArray + [ + null, + null, + ], + 
sort_order_id: PrimitiveArray + [ + null, + null, + ], + spec_id: PrimitiveArray + [ + 0, + 0, + ]"#]], + &["file_path"], + Some("file_path"), + ); + } + + #[tokio::test] + async fn test_all_data_files_table() { + let mut fixture = TableTestFixture::new(); + fixture.setup_all_snapshot_manifest_files().await; + + let batch_stream = fixture + .table + .inspect() + .all_data_files() + .scan() + .await + .unwrap(); + + check_record_batches( + batch_stream.try_collect::>().await.unwrap(), + expect![[r#" + Field { "content": Int32, metadata: {"PARQUET:field_id": "134"} }, + Field { "file_path": Utf8, metadata: {"PARQUET:field_id": "100"} }, + Field { "file_format": Utf8, metadata: {"PARQUET:field_id": "101"} }, + Field { "partition": Struct("x": Int64, metadata: {"PARQUET:field_id": "1000"}), metadata: {"PARQUET:field_id": "102"} }, + Field { "record_count": Int64, metadata: {"PARQUET:field_id": "103"} }, + Field { "file_size_in_bytes": Int64, metadata: {"PARQUET:field_id": "104"} }, + Field { "column_sizes": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "201"}, "value": Int64, metadata: {"PARQUET:field_id": "202"}), unsorted), metadata: {"PARQUET:field_id": "108"} }, + Field { "value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "203"}, "value": Int64, metadata: {"PARQUET:field_id": "204"}), unsorted), metadata: {"PARQUET:field_id": "109"} }, + Field { "null_value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "205"}, "value": Int64, metadata: {"PARQUET:field_id": "206"}), unsorted), metadata: {"PARQUET:field_id": "110"} }, + Field { "nan_value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "207"}, "value": Int64, metadata: {"PARQUET:field_id": "208"}), unsorted), metadata: {"PARQUET:field_id": "137"} }, + Field { "lower_bounds": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "209"}, "value": LargeBinary, metadata: {"PARQUET:field_id": "210"}), unsorted), metadata: {"PARQUET:field_id": "125"} }, + Field { "upper_bounds": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "211"}, "value": LargeBinary, metadata: {"PARQUET:field_id": "212"}), unsorted), metadata: {"PARQUET:field_id": "128"} }, + Field { "key_metadata": nullable LargeBinary, metadata: {"PARQUET:field_id": "131"} }, + Field { "split_offsets": nullable List(Int64, field: 'element', metadata: {"PARQUET:field_id": "213"}), metadata: {"PARQUET:field_id": "132"} }, + Field { "equality_ids": nullable List(Int32, field: 'element', metadata: {"PARQUET:field_id": "214"}), metadata: {"PARQUET:field_id": "135"} }, + Field { "sort_order_id": nullable Int32, metadata: {"PARQUET:field_id": "140"} }, + Field { "spec_id": nullable Int32, metadata: {"PARQUET:field_id": "141"} }"#]], + expect![[r#" + content: PrimitiveArray + [ + 0, + 0, + ], + file_path: (skipped), + file_format: StringArray + [ + "parquet", + "parquet", + ], + partition: StructArray + -- validity: + [ + valid, + valid, + ] + [ + -- child 0: "x" (Int64) + PrimitiveArray + [ + 100, + 300, + ] + ], + record_count: PrimitiveArray + [ + 1, + 1, + ], + file_size_in_bytes: PrimitiveArray + [ + 100, + 100, + ], + column_sizes: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + 
PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + null_value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + nan_value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + lower_bounds: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + ], + upper_bounds: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + ], + key_metadata: LargeBinaryArray + [ + null, + null, + ], + split_offsets: ListArray + [ + null, + null, + ], + equality_ids: ListArray + [ + null, + null, + ], + sort_order_id: PrimitiveArray + [ + null, + null, + ], + spec_id: PrimitiveArray + [ + 0, + 0, + ]"#]], + &["file_path"], + Some("file_path"), + ); + } + + #[tokio::test] + async fn test_all_delete_files_table_empty() { + let mut fixture = TableTestFixture::new(); + fixture.setup_all_snapshot_manifest_files().await; + + let batch_stream = fixture + .table + .inspect() + .all_delete_files() + .scan() + .await + .unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches[0].num_rows(), 0); + } +} diff --git a/crates/iceberg/src/inspect/all_manifests.rs b/crates/iceberg/src/inspect/all_manifests.rs new file mode 100644 index 0000000000..213e0cd80c --- /dev/null +++ b/crates/iceberg/src/inspect/all_manifests.rs @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_array::builder::{ + BooleanBuilder, GenericListBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, +}; +use arrow_array::types::{Int32Type, Int64Type}; +use arrow_schema::{DataType, Field, Fields}; +use futures::{StreamExt, stream}; + +use crate::Result; +use crate::arrow::schema_to_arrow_schema; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::{Datum, FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type}; +use crate::table::Table; + +/// All manifests metadata table. +/// +/// Shows all manifests from all snapshots with the referencing snapshot ID. +pub struct AllManifestsTable<'a> { + table: &'a Table, +} + +impl<'a> AllManifestsTable<'a> { + /// Create a new AllManifests table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the all manifests table. + /// + /// Same as the manifests table schema plus a `reference_snapshot_id` field. + pub fn schema(&self) -> crate::spec::Schema { + let fields = vec![ + NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true), + NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true), + NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true), + NestedField::new( + 3, + "partition_spec_id", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 4, + "added_snapshot_id", + Type::Primitive(PrimitiveType::Long), + true, + ), + NestedField::new( + 5, + "added_data_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 6, + "existing_data_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 7, + "deleted_data_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 15, + "added_delete_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 16, + "existing_delete_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 17, + "deleted_delete_files_count", + Type::Primitive(PrimitiveType::Int), + true, + ), + NestedField::new( + 8, + "partition_summaries", + Type::List(ListType { + element_field: Arc::new(NestedField::new( + 9, + "item", + Type::Struct(StructType::new(vec![ + Arc::new(NestedField::new( + 10, + "contains_null", + Type::Primitive(PrimitiveType::Boolean), + true, + )), + Arc::new(NestedField::new( + 11, + "contains_nan", + Type::Primitive(PrimitiveType::Boolean), + false, + )), + Arc::new(NestedField::new( + 12, + "lower_bound", + Type::Primitive(PrimitiveType::String), + false, + )), + Arc::new(NestedField::new( + 13, + "upper_bound", + Type::Primitive(PrimitiveType::String), + false, + )), + ])), + true, + )), + }), + true, + ), + NestedField::new( + 18, + "reference_snapshot_id", + Type::Primitive(PrimitiveType::Long), + true, + ), + ]; + + crate::spec::Schema::builder() + .with_fields(fields.into_iter().map(|f| f.into())) + .build() + .unwrap() + } + + /// Scans the all manifests table. 
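+    ///
+    /// A manifest list entry is emitted once per snapshot that references it (no
+    /// deduplication), with that snapshot's ID recorded in `reference_snapshot_id`.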
+ pub async fn scan(&self) -> Result { + let schema = schema_to_arrow_schema(&self.schema())?; + + let mut content = PrimitiveBuilder::::new(); + let mut path = StringBuilder::new(); + let mut length = PrimitiveBuilder::::new(); + let mut partition_spec_id = PrimitiveBuilder::::new(); + let mut added_snapshot_id = PrimitiveBuilder::::new(); + let mut added_data_files_count = PrimitiveBuilder::::new(); + let mut existing_data_files_count = PrimitiveBuilder::::new(); + let mut deleted_data_files_count = PrimitiveBuilder::::new(); + let mut added_delete_files_count = PrimitiveBuilder::::new(); + let mut existing_delete_files_count = PrimitiveBuilder::::new(); + let mut deleted_delete_files_count = PrimitiveBuilder::::new(); + let mut partition_summaries = self.partition_summary_builder()?; + let mut reference_snapshot_id = PrimitiveBuilder::::new(); + + for snapshot in self.table.metadata().snapshots() { + let manifest_list = snapshot + .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) + .await?; + for manifest in manifest_list.entries() { + content.append_value(manifest.content as i32); + path.append_value(manifest.manifest_path.clone()); + length.append_value(manifest.manifest_length); + partition_spec_id.append_value(manifest.partition_spec_id); + added_snapshot_id.append_value(manifest.added_snapshot_id); + added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_data_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_data_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + added_delete_files_count + .append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_delete_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_delete_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + + let spec = self + .table + .metadata() + .partition_spec_by_id(manifest.partition_spec_id) + .unwrap(); + let spec_struct = spec + .partition_type(self.table.metadata().current_schema()) + .unwrap(); + self.append_partition_summaries( + &mut partition_summaries, + &manifest.partitions.clone().unwrap_or_default(), + spec_struct, + ); + + reference_snapshot_id.append_value(snapshot.snapshot_id()); + } + } + + let batch = RecordBatch::try_new(Arc::new(schema), vec![ + Arc::new(content.finish()), + Arc::new(path.finish()), + Arc::new(length.finish()), + Arc::new(partition_spec_id.finish()), + Arc::new(added_snapshot_id.finish()), + Arc::new(added_data_files_count.finish()), + Arc::new(existing_data_files_count.finish()), + Arc::new(deleted_data_files_count.finish()), + Arc::new(added_delete_files_count.finish()), + Arc::new(existing_delete_files_count.finish()), + Arc::new(deleted_delete_files_count.finish()), + Arc::new(partition_summaries.finish()), + Arc::new(reference_snapshot_id.finish()), + ])?; + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } + + fn partition_summary_builder(&self) -> Result> { + let schema = schema_to_arrow_schema(&self.schema())?; + let partition_summary_fields = + match schema.field_with_name("partition_summaries")?.data_type() { + DataType::List(list_type) => match list_type.data_type() { + DataType::Struct(fields) => fields.to_vec(), + _ => unreachable!(), + }, + _ => unreachable!(), + }; + + let partition_summaries = ListBuilder::new(StructBuilder::from_fields( + Fields::from(partition_summary_fields.clone()), + 0, + )) + .with_field(Arc::new( + Field::new_struct("item", 
partition_summary_fields, false).with_metadata( + HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]), + ), + )); + + Ok(partition_summaries) + } + + fn append_partition_summaries( + &self, + builder: &mut GenericListBuilder, + partitions: &[FieldSummary], + partition_struct: StructType, + ) { + let partition_summaries_builder = builder.values(); + for (summary, field) in partitions.iter().zip(partition_struct.fields()) { + partition_summaries_builder + .field_builder::(0) + .unwrap() + .append_value(summary.contains_null); + partition_summaries_builder + .field_builder::(1) + .unwrap() + .append_option(summary.contains_nan); + + partition_summaries_builder + .field_builder::(2) + .unwrap() + .append_option(summary.lower_bound.as_ref().map(|v| { + Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone()) + .unwrap() + .to_string() + })); + partition_summaries_builder + .field_builder::(3) + .unwrap() + .append_option(summary.upper_bound.as_ref().map(|v| { + Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone()) + .unwrap() + .to_string() + })); + partition_summaries_builder.append(true); + } + builder.append(true); + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + use futures::TryStreamExt; + + use crate::scan::tests::TableTestFixture; + use crate::test_utils::check_record_batches; + + #[tokio::test] + async fn test_all_manifests_table() { + let mut fixture = TableTestFixture::new(); + fixture.setup_all_snapshot_manifest_files().await; + + let record_batch = fixture + .table + .inspect() + .all_manifests() + .scan() + .await + .unwrap(); + + check_record_batches( + record_batch.try_collect::>().await.unwrap(), + expect![[r#" + Field { "content": Int32, metadata: {"PARQUET:field_id": "14"} }, + Field { "path": Utf8, metadata: {"PARQUET:field_id": "1"} }, + Field { "length": Int64, metadata: {"PARQUET:field_id": "2"} }, + Field { "partition_spec_id": Int32, metadata: {"PARQUET:field_id": "3"} }, + Field { "added_snapshot_id": Int64, metadata: {"PARQUET:field_id": "4"} }, + Field { "added_data_files_count": Int32, metadata: {"PARQUET:field_id": "5"} }, + Field { "existing_data_files_count": Int32, metadata: {"PARQUET:field_id": "6"} }, + Field { "deleted_data_files_count": Int32, metadata: {"PARQUET:field_id": "7"} }, + Field { "added_delete_files_count": Int32, metadata: {"PARQUET:field_id": "15"} }, + Field { "existing_delete_files_count": Int32, metadata: {"PARQUET:field_id": "16"} }, + Field { "deleted_delete_files_count": Int32, metadata: {"PARQUET:field_id": "17"} }, + Field { "partition_summaries": List(non-null Struct("contains_null": non-null Boolean, metadata: {"PARQUET:field_id": "10"}, "contains_nan": Boolean, metadata: {"PARQUET:field_id": "11"}, "lower_bound": Utf8, metadata: {"PARQUET:field_id": "12"}, "upper_bound": Utf8, metadata: {"PARQUET:field_id": "13"}), metadata: {"PARQUET:field_id": "9"}), metadata: {"PARQUET:field_id": "8"} }, + Field { "reference_snapshot_id": Int64, metadata: {"PARQUET:field_id": "18"} }"#]], + expect![[r#" + content: PrimitiveArray + [ + 0, + ], + path: (skipped), + length: (skipped), + partition_spec_id: PrimitiveArray + [ + 0, + ], + added_snapshot_id: PrimitiveArray + [ + 3055729675574597004, + ], + added_data_files_count: PrimitiveArray + [ + 1, + ], + existing_data_files_count: PrimitiveArray + [ + 1, + ], + deleted_data_files_count: PrimitiveArray + [ + 1, + ], + added_delete_files_count: PrimitiveArray + [ + 1, + ], + existing_delete_files_count: PrimitiveArray + [ 
+ 1, + ], + deleted_delete_files_count: PrimitiveArray + [ + 1, + ], + partition_summaries: ListArray + [ + StructArray + -- validity: + [ + valid, + ] + [ + -- child 0: "contains_null" (Boolean) + BooleanArray + [ + false, + ] + -- child 1: "contains_nan" (Boolean) + BooleanArray + [ + false, + ] + -- child 2: "lower_bound" (Utf8) + StringArray + [ + "100", + ] + -- child 3: "upper_bound" (Utf8) + StringArray + [ + "300", + ] + ], + ], + reference_snapshot_id: PrimitiveArray + [ + 3055729675574597004, + ]"#]], + &["path", "length"], + Some("reference_snapshot_id"), + ); + } +} diff --git a/crates/iceberg/src/inspect/files.rs b/crates/iceberg/src/inspect/files.rs new file mode 100644 index 0000000000..305123b06d --- /dev/null +++ b/crates/iceberg/src/inspect/files.rs @@ -0,0 +1,1253 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_array::builder::{ + Int32Builder, Int64Builder, LargeBinaryBuilder, ListBuilder, MapBuilder, MapFieldNames, + PrimitiveBuilder, StringBuilder, StructBuilder, +}; +use arrow_array::types::Int64Type; +use arrow_schema::{DataType, Field, Fields}; +use futures::{StreamExt, stream}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::Result; +use crate::arrow::schema_to_arrow_schema; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::{ + DataContentType, ListType, MapType, NestedField, PrimitiveType, StructType, Type, +}; +use crate::table::Table; + +/// Files metadata table. +/// +/// Shows all data and delete files referenced by the current snapshot. +pub struct FilesTable<'a> { + table: &'a Table, +} + +/// Data files metadata table. +/// +/// Shows only data files referenced by the current snapshot. +pub struct DataFilesTable<'a> { + table: &'a Table, +} + +/// Delete files metadata table. +/// +/// Shows only delete files referenced by the current snapshot. +pub struct DeleteFilesTable<'a> { + table: &'a Table, +} + +/// Content type filter for file-level metadata tables. +#[derive(Debug, Clone, Copy)] +pub(crate) enum ContentFilter { + /// All files (data + delete). + All, + /// Data files only. + DataOnly, + /// Delete files only (position + equality). + DeletesOnly, +} + +impl<'a> FilesTable<'a> { + /// Create a new Files table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the files table. + pub fn schema(&self) -> crate::spec::Schema { + files_schema(self.table.metadata().default_partition_type()) + } + + /// Scans the files table. + pub async fn scan(&self) -> Result { + scan_files(self.table, ContentFilter::All).await + } +} + +impl<'a> DataFilesTable<'a> { + /// Create a new DataFiles table instance. 
+ pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the data files table. + pub fn schema(&self) -> crate::spec::Schema { + files_schema(self.table.metadata().default_partition_type()) + } + + /// Scans the data files table. + pub async fn scan(&self) -> Result { + scan_files(self.table, ContentFilter::DataOnly).await + } +} + +impl<'a> DeleteFilesTable<'a> { + /// Create a new DeleteFiles table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the delete files table. + pub fn schema(&self) -> crate::spec::Schema { + files_schema(self.table.metadata().default_partition_type()) + } + + /// Scans the delete files table. + pub async fn scan(&self) -> Result { + scan_files(self.table, ContentFilter::DeletesOnly).await + } +} + +/// Build the Iceberg schema for file-level metadata tables. +pub(crate) fn files_schema(partition_type: &StructType) -> crate::spec::Schema { + let partition_fields: Vec> = partition_type + .fields() + .iter() + .map(|f| { + Arc::new(NestedField::optional( + f.id, + f.name.as_str(), + (*f.field_type).clone(), + )) + }) + .collect(); + + let fields = vec![ + NestedField::required(134, "content", Type::Primitive(PrimitiveType::Int)), + NestedField::required(100, "file_path", Type::Primitive(PrimitiveType::String)), + NestedField::required(101, "file_format", Type::Primitive(PrimitiveType::String)), + NestedField::required( + 102, + "partition", + Type::Struct(StructType::new(partition_fields)), + ), + NestedField::required(103, "record_count", Type::Primitive(PrimitiveType::Long)), + NestedField::required( + 104, + "file_size_in_bytes", + Type::Primitive(PrimitiveType::Long), + ), + NestedField::optional( + 108, + "column_sizes", + Type::Map(MapType { + key_field: Arc::new(NestedField::map_key_element( + 201, + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::map_value_element( + 202, + Type::Primitive(PrimitiveType::Long), + false, + )), + }), + ), + NestedField::optional( + 109, + "value_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::map_key_element( + 203, + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::map_value_element( + 204, + Type::Primitive(PrimitiveType::Long), + false, + )), + }), + ), + NestedField::optional( + 110, + "null_value_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::map_key_element( + 205, + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::map_value_element( + 206, + Type::Primitive(PrimitiveType::Long), + false, + )), + }), + ), + NestedField::optional( + 137, + "nan_value_counts", + Type::Map(MapType { + key_field: Arc::new(NestedField::map_key_element( + 207, + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::map_value_element( + 208, + Type::Primitive(PrimitiveType::Long), + false, + )), + }), + ), + NestedField::optional( + 125, + "lower_bounds", + Type::Map(MapType { + key_field: Arc::new(NestedField::map_key_element( + 209, + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::map_value_element( + 210, + Type::Primitive(PrimitiveType::Binary), + false, + )), + }), + ), + NestedField::optional( + 128, + "upper_bounds", + Type::Map(MapType { + key_field: Arc::new(NestedField::map_key_element( + 211, + Type::Primitive(PrimitiveType::Int), + )), + value_field: Arc::new(NestedField::map_value_element( + 212, + Type::Primitive(PrimitiveType::Binary), + 
false, + )), + }), + ), + NestedField::optional(131, "key_metadata", Type::Primitive(PrimitiveType::Binary)), + NestedField::optional( + 132, + "split_offsets", + Type::List(ListType { + element_field: Arc::new(NestedField::list_element( + 213, + Type::Primitive(PrimitiveType::Long), + false, + )), + }), + ), + NestedField::optional( + 135, + "equality_ids", + Type::List(ListType { + element_field: Arc::new(NestedField::list_element( + 214, + Type::Primitive(PrimitiveType::Int), + false, + )), + }), + ), + NestedField::optional(140, "sort_order_id", Type::Primitive(PrimitiveType::Int)), + NestedField::optional(141, "spec_id", Type::Primitive(PrimitiveType::Int)), + ]; + crate::spec::Schema::builder() + .with_fields(fields.into_iter().map(|f| f.into())) + .build() + .unwrap() +} + +/// Shared scan implementation for file-level metadata tables. +async fn scan_files(table: &Table, filter: ContentFilter) -> Result { + let partition_type = table.metadata().default_partition_type(); + let schema = schema_to_arrow_schema(&files_schema(partition_type))?; + + let mut content = Int32Builder::new(); + let mut file_path = StringBuilder::new(); + let mut file_format = StringBuilder::new(); + let mut partition_builder = build_partition_struct_builder(&schema)?; + let mut record_count = Int64Builder::new(); + let mut file_size_in_bytes = Int64Builder::new(); + let mut column_sizes = new_int_long_map_builder(201, 202); + let mut value_counts = new_int_long_map_builder(203, 204); + let mut null_value_counts = new_int_long_map_builder(205, 206); + let mut nan_value_counts = new_int_long_map_builder(207, 208); + let mut lower_bounds = new_int_binary_map_builder(209, 210); + let mut upper_bounds = new_int_binary_map_builder(211, 212); + let mut key_metadata = LargeBinaryBuilder::new(); + let mut split_offsets = ListBuilder::new(PrimitiveBuilder::::new()).with_field( + Arc::new(Field::new("element", DataType::Int64, true).with_metadata(field_id_meta(213))), + ); + let mut equality_ids = ListBuilder::new(Int32Builder::new()).with_field(Arc::new( + Field::new("element", DataType::Int32, true).with_metadata(field_id_meta(214)), + )); + let mut sort_order_id = Int32Builder::new(); + let mut spec_id = Int32Builder::new(); + + if let Some(snapshot) = table.metadata().current_snapshot() { + let manifest_list = snapshot + .load_manifest_list(table.file_io(), &table.metadata_ref()) + .await?; + + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(table.file_io()).await?; + for entry in manifest.entries() { + if !entry.is_alive() { + continue; + } + + let data_file = entry.data_file(); + let ct = data_file.content_type(); + + match filter { + ContentFilter::All => {} + ContentFilter::DataOnly => { + if ct != DataContentType::Data { + continue; + } + } + ContentFilter::DeletesOnly => { + if ct == DataContentType::Data { + continue; + } + } + } + + content.append_value(ct as i32); + file_path.append_value(data_file.file_path()); + file_format.append_value(data_file.file_format().to_string()); + append_partition_value( + &mut partition_builder, + data_file.partition(), + partition_type, + ); + record_count.append_value(data_file.record_count() as i64); + file_size_in_bytes.append_value(data_file.file_size_in_bytes() as i64); + + append_int_long_map(&mut column_sizes, data_file.column_sizes()); + append_int_long_map(&mut value_counts, data_file.value_counts()); + append_int_long_map(&mut null_value_counts, data_file.null_value_counts()); + append_int_long_map(&mut nan_value_counts, 
data_file.nan_value_counts()); + append_datum_map(&mut lower_bounds, data_file.lower_bounds())?; + append_datum_map(&mut upper_bounds, data_file.upper_bounds())?; + + match data_file.key_metadata() { + Some(km) => key_metadata.append_value(km), + None => key_metadata.append_null(), + } + + match data_file.split_offsets() { + Some(offsets) => { + let list_builder = split_offsets.values(); + for offset in offsets { + list_builder.append_value(*offset); + } + split_offsets.append(true); + } + None => split_offsets.append_null(), + } + + match data_file.equality_ids() { + Some(ids) => { + let list_builder = equality_ids.values(); + for id in ids { + list_builder.append_value(id); + } + equality_ids.append(true); + } + None => equality_ids.append_null(), + } + + sort_order_id.append_option(data_file.sort_order_id()); + spec_id.append_value(data_file.partition_spec_id); + } + } + } + + let batch = RecordBatch::try_new(Arc::new(schema), vec![ + Arc::new(content.finish()), + Arc::new(file_path.finish()), + Arc::new(file_format.finish()), + Arc::new(partition_builder.finish()), + Arc::new(record_count.finish()), + Arc::new(file_size_in_bytes.finish()), + Arc::new(column_sizes.finish()), + Arc::new(value_counts.finish()), + Arc::new(null_value_counts.finish()), + Arc::new(nan_value_counts.finish()), + Arc::new(lower_bounds.finish()), + Arc::new(upper_bounds.finish()), + Arc::new(key_metadata.finish()), + Arc::new(split_offsets.finish()), + Arc::new(equality_ids.finish()), + Arc::new(sort_order_id.finish()), + Arc::new(spec_id.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) +} + +fn field_id_meta(id: i32) -> HashMap { + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())]) +} + +fn new_int_long_map_builder( + key_field_id: i32, + value_field_id: i32, +) -> MapBuilder { + use crate::arrow::DEFAULT_MAP_FIELD_NAME; + use crate::spec::{MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME}; + + MapBuilder::new( + Some(MapFieldNames { + entry: DEFAULT_MAP_FIELD_NAME.to_string(), + key: MAP_KEY_FIELD_NAME.to_string(), + value: MAP_VALUE_FIELD_NAME.to_string(), + }), + Int32Builder::new(), + Int64Builder::new(), + ) + .with_keys_field(Arc::new( + Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false) + .with_metadata(field_id_meta(key_field_id)), + )) + .with_values_field(Arc::new( + Field::new(MAP_VALUE_FIELD_NAME, DataType::Int64, true) + .with_metadata(field_id_meta(value_field_id)), + )) +} + +fn new_int_binary_map_builder( + key_field_id: i32, + value_field_id: i32, +) -> MapBuilder { + use crate::arrow::DEFAULT_MAP_FIELD_NAME; + use crate::spec::{MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME}; + + MapBuilder::new( + Some(MapFieldNames { + entry: DEFAULT_MAP_FIELD_NAME.to_string(), + key: MAP_KEY_FIELD_NAME.to_string(), + value: MAP_VALUE_FIELD_NAME.to_string(), + }), + Int32Builder::new(), + LargeBinaryBuilder::new(), + ) + .with_keys_field(Arc::new( + Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false) + .with_metadata(field_id_meta(key_field_id)), + )) + .with_values_field(Arc::new( + Field::new(MAP_VALUE_FIELD_NAME, DataType::LargeBinary, true) + .with_metadata(field_id_meta(value_field_id)), + )) +} + +fn append_int_long_map( + builder: &mut MapBuilder, + map: &HashMap, +) { + for (&k, &v) in map { + builder.keys().append_value(k); + builder.values().append_value(v as i64); + } + builder.append(true).unwrap(); +} + +fn append_datum_map( + builder: &mut MapBuilder, + map: &HashMap, +) -> Result<()> { + for (&k, v) in map { + builder.keys().append_value(k); + 
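+        // Bound values are serialized to bytes for the LargeBinary map values.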
builder.values().append_value(v.to_bytes()?); + } + builder.append(true).unwrap(); + Ok(()) +} + +fn build_partition_struct_builder(schema: &arrow_schema::Schema) -> Result { + let partition_fields = match schema.field_with_name("partition")?.data_type() { + DataType::Struct(fields) => fields.to_vec(), + _ => unreachable!(), + }; + Ok(StructBuilder::from_fields( + Fields::from(partition_fields), + 0, + )) +} + +fn append_partition_value( + builder: &mut StructBuilder, + partition: &crate::spec::Struct, + partition_type: &StructType, +) { + let fields = partition_type.fields(); + for (i, field) in fields.iter().enumerate() { + let literal: Option<&crate::spec::Literal> = + partition.fields().get(i).and_then(|opt| opt.as_ref()); + match &*field.field_type { + Type::Primitive(PrimitiveType::Long) => { + let b = builder + .field_builder::>(i) + .unwrap(); + match literal.and_then(|l| l.as_primitive_literal()) { + Some(crate::spec::PrimitiveLiteral::Long(v)) => b.append_value(v), + Some(crate::spec::PrimitiveLiteral::Int(v)) => b.append_value(v as i64), + _ => b.append_null(), + } + } + Type::Primitive(PrimitiveType::Int) => { + let b = builder.field_builder::(i).unwrap(); + match literal.and_then(|l| l.as_primitive_literal()) { + Some(crate::spec::PrimitiveLiteral::Int(v)) => b.append_value(v), + _ => b.append_null(), + } + } + Type::Primitive(PrimitiveType::String) => { + let b = builder.field_builder::(i).unwrap(); + match literal.and_then(|l| l.as_primitive_literal()) { + Some(crate::spec::PrimitiveLiteral::String(s)) => b.append_value(&s), + _ => b.append_null(), + } + } + _ => { + // For other partition types, we cannot easily append. + // This is a best-effort approach supporting common types. + } + } + } + builder.append(true); +} + +/// Scan files across all snapshots with manifest deduplication. 
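+/// Each manifest path is tracked in a `HashSet` and skipped on repeat visits, so a
+/// file is reported once per unique manifest rather than once per referencing snapshot.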
+pub(crate) async fn scan_all_files( + table: &Table, + filter: ContentFilter, +) -> Result { + use std::collections::HashSet; + + let partition_type = table.metadata().default_partition_type(); + let schema = schema_to_arrow_schema(&files_schema(partition_type))?; + + let mut content = Int32Builder::new(); + let mut file_path = StringBuilder::new(); + let mut file_format = StringBuilder::new(); + let mut partition_builder = build_partition_struct_builder(&schema)?; + let mut record_count = Int64Builder::new(); + let mut file_size_in_bytes = Int64Builder::new(); + let mut column_sizes = new_int_long_map_builder(201, 202); + let mut value_counts = new_int_long_map_builder(203, 204); + let mut null_value_counts = new_int_long_map_builder(205, 206); + let mut nan_value_counts = new_int_long_map_builder(207, 208); + let mut lower_bounds = new_int_binary_map_builder(209, 210); + let mut upper_bounds = new_int_binary_map_builder(211, 212); + let mut key_metadata = LargeBinaryBuilder::new(); + let mut split_offsets = ListBuilder::new(PrimitiveBuilder::::new()).with_field( + Arc::new(Field::new("element", DataType::Int64, true).with_metadata(field_id_meta(213))), + ); + let mut equality_ids = ListBuilder::new(Int32Builder::new()).with_field(Arc::new( + Field::new("element", DataType::Int32, true).with_metadata(field_id_meta(214)), + )); + let mut sort_order_id = Int32Builder::new(); + let mut spec_id = Int32Builder::new(); + + let mut seen_manifests = HashSet::new(); + + for snapshot in table.metadata().snapshots() { + let manifest_list = snapshot + .load_manifest_list(table.file_io(), &table.metadata_ref()) + .await?; + + for manifest_file in manifest_list.entries() { + if !seen_manifests.insert(manifest_file.manifest_path.clone()) { + continue; + } + + let manifest = manifest_file.load_manifest(table.file_io()).await?; + for entry in manifest.entries() { + if !entry.is_alive() { + continue; + } + + let data_file = entry.data_file(); + let ct = data_file.content_type(); + + match filter { + ContentFilter::All => {} + ContentFilter::DataOnly => { + if ct != DataContentType::Data { + continue; + } + } + ContentFilter::DeletesOnly => { + if ct == DataContentType::Data { + continue; + } + } + } + + content.append_value(ct as i32); + file_path.append_value(data_file.file_path()); + file_format.append_value(data_file.file_format().to_string()); + append_partition_value( + &mut partition_builder, + data_file.partition(), + partition_type, + ); + record_count.append_value(data_file.record_count() as i64); + file_size_in_bytes.append_value(data_file.file_size_in_bytes() as i64); + + append_int_long_map(&mut column_sizes, data_file.column_sizes()); + append_int_long_map(&mut value_counts, data_file.value_counts()); + append_int_long_map(&mut null_value_counts, data_file.null_value_counts()); + append_int_long_map(&mut nan_value_counts, data_file.nan_value_counts()); + append_datum_map(&mut lower_bounds, data_file.lower_bounds())?; + append_datum_map(&mut upper_bounds, data_file.upper_bounds())?; + + match data_file.key_metadata() { + Some(km) => key_metadata.append_value(km), + None => key_metadata.append_null(), + } + + match data_file.split_offsets() { + Some(offsets) => { + let list_builder = split_offsets.values(); + for offset in offsets { + list_builder.append_value(*offset); + } + split_offsets.append(true); + } + None => split_offsets.append_null(), + } + + match data_file.equality_ids() { + Some(ids) => { + let list_builder = equality_ids.values(); + for id in ids { + 
list_builder.append_value(id); + } + equality_ids.append(true); + } + None => equality_ids.append_null(), + } + + sort_order_id.append_option(data_file.sort_order_id()); + spec_id.append_value(data_file.partition_spec_id); + } + } + } + + let batch = RecordBatch::try_new(Arc::new(schema), vec![ + Arc::new(content.finish()), + Arc::new(file_path.finish()), + Arc::new(file_format.finish()), + Arc::new(partition_builder.finish()), + Arc::new(record_count.finish()), + Arc::new(file_size_in_bytes.finish()), + Arc::new(column_sizes.finish()), + Arc::new(value_counts.finish()), + Arc::new(null_value_counts.finish()), + Arc::new(nan_value_counts.finish()), + Arc::new(lower_bounds.finish()), + Arc::new(upper_bounds.finish()), + Arc::new(key_metadata.finish()), + Arc::new(split_offsets.finish()), + Arc::new(equality_ids.finish()), + Arc::new(sort_order_id.finish()), + Arc::new(spec_id.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + use futures::TryStreamExt; + + use crate::scan::tests::TableTestFixture; + use crate::test_utils::check_record_batches; + + #[tokio::test] + async fn test_files_table() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let batch_stream = fixture.table.inspect().files().scan().await.unwrap(); + + check_record_batches( + batch_stream.try_collect::>().await.unwrap(), + expect![[r#" + Field { "content": Int32, metadata: {"PARQUET:field_id": "134"} }, + Field { "file_path": Utf8, metadata: {"PARQUET:field_id": "100"} }, + Field { "file_format": Utf8, metadata: {"PARQUET:field_id": "101"} }, + Field { "partition": Struct("x": Int64, metadata: {"PARQUET:field_id": "1000"}), metadata: {"PARQUET:field_id": "102"} }, + Field { "record_count": Int64, metadata: {"PARQUET:field_id": "103"} }, + Field { "file_size_in_bytes": Int64, metadata: {"PARQUET:field_id": "104"} }, + Field { "column_sizes": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "201"}, "value": Int64, metadata: {"PARQUET:field_id": "202"}), unsorted), metadata: {"PARQUET:field_id": "108"} }, + Field { "value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "203"}, "value": Int64, metadata: {"PARQUET:field_id": "204"}), unsorted), metadata: {"PARQUET:field_id": "109"} }, + Field { "null_value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "205"}, "value": Int64, metadata: {"PARQUET:field_id": "206"}), unsorted), metadata: {"PARQUET:field_id": "110"} }, + Field { "nan_value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "207"}, "value": Int64, metadata: {"PARQUET:field_id": "208"}), unsorted), metadata: {"PARQUET:field_id": "137"} }, + Field { "lower_bounds": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "209"}, "value": LargeBinary, metadata: {"PARQUET:field_id": "210"}), unsorted), metadata: {"PARQUET:field_id": "125"} }, + Field { "upper_bounds": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "211"}, "value": LargeBinary, metadata: {"PARQUET:field_id": "212"}), unsorted), metadata: {"PARQUET:field_id": "128"} }, + Field { "key_metadata": nullable LargeBinary, metadata: {"PARQUET:field_id": "131"} }, + Field { "split_offsets": nullable List(Int64, field: 'element', metadata: 
{"PARQUET:field_id": "213"}), metadata: {"PARQUET:field_id": "132"} }, + Field { "equality_ids": nullable List(Int32, field: 'element', metadata: {"PARQUET:field_id": "214"}), metadata: {"PARQUET:field_id": "135"} }, + Field { "sort_order_id": nullable Int32, metadata: {"PARQUET:field_id": "140"} }, + Field { "spec_id": nullable Int32, metadata: {"PARQUET:field_id": "141"} }"#]], + expect![[r#" + content: PrimitiveArray + [ + 0, + 0, + ], + file_path: (skipped), + file_format: StringArray + [ + "parquet", + "parquet", + ], + partition: StructArray + -- validity: + [ + valid, + valid, + ] + [ + -- child 0: "x" (Int64) + PrimitiveArray + [ + 100, + 300, + ] + ], + record_count: PrimitiveArray + [ + 1, + 1, + ], + file_size_in_bytes: PrimitiveArray + [ + 100, + 100, + ], + column_sizes: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + null_value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + nan_value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + lower_bounds: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + ], + upper_bounds: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + ], + key_metadata: LargeBinaryArray + [ + null, + null, + ], + split_offsets: ListArray + [ + null, + null, + ], + equality_ids: ListArray + [ + null, + null, + ], + sort_order_id: PrimitiveArray + [ + null, + null, + ], + spec_id: PrimitiveArray + [ + 0, + 0, + ]"#]], + &["file_path"], + Some("file_path"), + ); + } + + #[tokio::test] + async fn test_data_files_table() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let batch_stream = fixture.table.inspect().data_files().scan().await.unwrap(); + + check_record_batches( + batch_stream.try_collect::>().await.unwrap(), + expect![[r#" + Field { "content": Int32, metadata: 
{"PARQUET:field_id": "134"} }, + Field { "file_path": Utf8, metadata: {"PARQUET:field_id": "100"} }, + Field { "file_format": Utf8, metadata: {"PARQUET:field_id": "101"} }, + Field { "partition": Struct("x": Int64, metadata: {"PARQUET:field_id": "1000"}), metadata: {"PARQUET:field_id": "102"} }, + Field { "record_count": Int64, metadata: {"PARQUET:field_id": "103"} }, + Field { "file_size_in_bytes": Int64, metadata: {"PARQUET:field_id": "104"} }, + Field { "column_sizes": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "201"}, "value": Int64, metadata: {"PARQUET:field_id": "202"}), unsorted), metadata: {"PARQUET:field_id": "108"} }, + Field { "value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "203"}, "value": Int64, metadata: {"PARQUET:field_id": "204"}), unsorted), metadata: {"PARQUET:field_id": "109"} }, + Field { "null_value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "205"}, "value": Int64, metadata: {"PARQUET:field_id": "206"}), unsorted), metadata: {"PARQUET:field_id": "110"} }, + Field { "nan_value_counts": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "207"}, "value": Int64, metadata: {"PARQUET:field_id": "208"}), unsorted), metadata: {"PARQUET:field_id": "137"} }, + Field { "lower_bounds": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "209"}, "value": LargeBinary, metadata: {"PARQUET:field_id": "210"}), unsorted), metadata: {"PARQUET:field_id": "125"} }, + Field { "upper_bounds": nullable Map("key_value": non-null Struct("key": non-null Int32, metadata: {"PARQUET:field_id": "211"}, "value": LargeBinary, metadata: {"PARQUET:field_id": "212"}), unsorted), metadata: {"PARQUET:field_id": "128"} }, + Field { "key_metadata": nullable LargeBinary, metadata: {"PARQUET:field_id": "131"} }, + Field { "split_offsets": nullable List(Int64, field: 'element', metadata: {"PARQUET:field_id": "213"}), metadata: {"PARQUET:field_id": "132"} }, + Field { "equality_ids": nullable List(Int32, field: 'element', metadata: {"PARQUET:field_id": "214"}), metadata: {"PARQUET:field_id": "135"} }, + Field { "sort_order_id": nullable Int32, metadata: {"PARQUET:field_id": "140"} }, + Field { "spec_id": nullable Int32, metadata: {"PARQUET:field_id": "141"} }"#]], + expect![[r#" + content: PrimitiveArray + [ + 0, + 0, + ], + file_path: (skipped), + file_format: StringArray + [ + "parquet", + "parquet", + ], + partition: StructArray + -- validity: + [ + valid, + valid, + ] + [ + -- child 0: "x" (Int64) + PrimitiveArray + [ + 100, + 300, + ] + ], + record_count: PrimitiveArray + [ + 1, + 1, + ], + file_size_in_bytes: PrimitiveArray + [ + 100, + 100, + ], + column_sizes: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + null_value_counts: 
MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + nan_value_counts: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (Int64) + PrimitiveArray + [ + ] + ], + ], + lower_bounds: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + ], + upper_bounds: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "key" (Int32) + PrimitiveArray + [ + ] + -- child 1: "value" (LargeBinary) + LargeBinaryArray + [ + ] + ], + ], + key_metadata: LargeBinaryArray + [ + null, + null, + ], + split_offsets: ListArray + [ + null, + null, + ], + equality_ids: ListArray + [ + null, + null, + ], + sort_order_id: PrimitiveArray + [ + null, + null, + ], + spec_id: PrimitiveArray + [ + 0, + 0, + ]"#]], + &["file_path"], + Some("file_path"), + ); + } + + #[tokio::test] + async fn test_delete_files_table_empty() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let batch_stream = fixture.table.inspect().delete_files().scan().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches[0].num_rows(), 0); + } +} diff --git a/crates/iceberg/src/inspect/history.rs b/crates/iceberg/src/inspect/history.rs new file mode 100644 index 0000000000..7757bdf23e --- /dev/null +++ b/crates/iceberg/src/inspect/history.rs @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_array::builder::{BooleanBuilder, PrimitiveBuilder}; +use arrow_array::types::{Int64Type, TimestampMicrosecondType}; +use futures::{StreamExt, stream}; + +use crate::Result; +use crate::arrow::schema_to_arrow_schema; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::{NestedField, PrimitiveType, Type}; +use crate::table::Table; + +/// History metadata table. 
+/// +/// Shows the table's snapshot history log with ancestry information. +pub struct HistoryTable<'a> { + table: &'a Table, +} + +impl<'a> HistoryTable<'a> { + /// Create a new History table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the history table. + pub fn schema(&self) -> crate::spec::Schema { + let fields = vec![ + NestedField::required( + 1, + "made_current_at", + Type::Primitive(PrimitiveType::Timestamptz), + ), + NestedField::required(2, "snapshot_id", Type::Primitive(PrimitiveType::Long)), + NestedField::optional(3, "parent_id", Type::Primitive(PrimitiveType::Long)), + NestedField::required( + 4, + "is_current_ancestor", + Type::Primitive(PrimitiveType::Boolean), + ), + ]; + crate::spec::Schema::builder() + .with_fields(fields.into_iter().map(|f| f.into())) + .build() + .unwrap() + } + + /// Scans the history table. + pub async fn scan(&self) -> Result<ArrowRecordBatchStream> { + let schema = schema_to_arrow_schema(&self.schema())?; + let metadata = self.table.metadata(); + + // Build set of ancestor snapshot IDs by walking back from current snapshot + let mut ancestor_ids = HashSet::new(); + if let Some(current) = metadata.current_snapshot() { + let mut snapshot_id = Some(current.snapshot_id()); + while let Some(id) = snapshot_id { + ancestor_ids.insert(id); + snapshot_id = metadata + .snapshot_by_id(id) + .and_then(|s| s.parent_snapshot_id()); + } + } + + let mut made_current_at = + PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new(); + let mut parent_id = PrimitiveBuilder::<Int64Type>::new(); + let mut is_current_ancestor = BooleanBuilder::new(); + + for log_entry in metadata.history() { + made_current_at.append_value(log_entry.timestamp_ms * 1000); + snapshot_id.append_value(log_entry.snapshot_id); + + let parent = metadata + .snapshot_by_id(log_entry.snapshot_id) + .and_then(|s| s.parent_snapshot_id()); + parent_id.append_option(parent); + + is_current_ancestor.append_value(ancestor_ids.contains(&log_entry.snapshot_id)); + } + + let batch = RecordBatch::try_new(Arc::new(schema), vec![ + Arc::new(made_current_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(is_current_ancestor.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + use futures::TryStreamExt; + + use crate::scan::tests::TableTestFixture; + use crate::test_utils::check_record_batches; + + #[tokio::test] + async fn test_history_table() { + let table = TableTestFixture::new().table; + + let batch_stream = table.inspect().history().scan().await.unwrap(); + + check_record_batches( + batch_stream.try_collect::<Vec<_>>().await.unwrap(), + expect![[r#" + Field { "made_current_at": Timestamp(µs, "+00:00"), metadata: {"PARQUET:field_id": "1"} }, + Field { "snapshot_id": Int64, metadata: {"PARQUET:field_id": "2"} }, + Field { "parent_id": nullable Int64, metadata: {"PARQUET:field_id": "3"} }, + Field { "is_current_ancestor": Boolean, metadata: {"PARQUET:field_id": "4"} }"#]], + expect![[r#" + made_current_at: PrimitiveArray + [ + 2018-01-04T21:22:35.770+00:00, + 2019-04-12T20:29:15.770+00:00, + ], + snapshot_id: PrimitiveArray + [ + 3051729675574597004, + 3055729675574597004, + ], + parent_id: PrimitiveArray + [ + null, + 3051729675574597004, + ], + is_current_ancestor: BooleanArray + [ + true, + true, + ]"#]], + &[], + Some("made_current_at"), + ); + } +} diff --git a/crates/iceberg/src/inspect/metadata_log_entries.rs
b/crates/iceberg/src/inspect/metadata_log_entries.rs new file mode 100644 index 0000000000..ba79d8c6c2 --- /dev/null +++ b/crates/iceberg/src/inspect/metadata_log_entries.rs @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_array::builder::{PrimitiveBuilder, StringBuilder}; +use arrow_array::types::{Int32Type, Int64Type, TimestampMicrosecondType}; +use futures::{StreamExt, stream}; + +use crate::Result; +use crate::arrow::schema_to_arrow_schema; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::{NestedField, PrimitiveType, Type}; +use crate::table::Table; + +/// Metadata log entries table. +/// +/// Shows the table's metadata log history. +pub struct MetadataLogEntriesTable<'a> { + table: &'a Table, +} + +impl<'a> MetadataLogEntriesTable<'a> { + /// Create a new MetadataLogEntries table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the metadata log entries table. + pub fn schema(&self) -> crate::spec::Schema { + let fields = vec![ + NestedField::required(1, "timestamp", Type::Primitive(PrimitiveType::Timestamptz)), + NestedField::required(2, "file", Type::Primitive(PrimitiveType::String)), + NestedField::optional( + 3, + "latest_snapshot_id", + Type::Primitive(PrimitiveType::Long), + ), + NestedField::optional(4, "latest_schema_id", Type::Primitive(PrimitiveType::Int)), + NestedField::optional( + 5, + "latest_sequence_number", + Type::Primitive(PrimitiveType::Long), + ), + ]; + crate::spec::Schema::builder() + .with_fields(fields.into_iter().map(|f| f.into())) + .build() + .unwrap() + } + + /// Scans the metadata log entries table. 
+ pub async fn scan(&self) -> Result<ArrowRecordBatchStream> { + let schema = schema_to_arrow_schema(&self.schema())?; + let metadata = self.table.metadata(); + + let mut timestamp = + PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00"); + let mut file = StringBuilder::new(); + let mut latest_snapshot_id = PrimitiveBuilder::<Int64Type>::new(); + let mut latest_schema_id = PrimitiveBuilder::<Int32Type>::new(); + let mut latest_sequence_number = PrimitiveBuilder::<Int64Type>::new(); + + // Historical entries from the metadata log + for log_entry in metadata.metadata_log() { + timestamp.append_value(log_entry.timestamp_ms * 1000); + file.append_value(&log_entry.metadata_file); + latest_snapshot_id.append_null(); + latest_schema_id.append_null(); + latest_sequence_number.append_null(); + } + + // Current metadata entry + if let Some(metadata_location) = self.table.metadata_location() { + timestamp.append_value(metadata.last_updated_ms * 1000); + file.append_value(metadata_location); + latest_snapshot_id.append_option(metadata.current_snapshot_id); + latest_schema_id.append_value(metadata.current_schema_id); + latest_sequence_number.append_value(metadata.last_sequence_number); + } + + let batch = RecordBatch::try_new(Arc::new(schema), vec![ + Arc::new(timestamp.finish()), + Arc::new(file.finish()), + Arc::new(latest_snapshot_id.finish()), + Arc::new(latest_schema_id.finish()), + Arc::new(latest_sequence_number.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + use futures::TryStreamExt; + + use crate::scan::tests::TableTestFixture; + use crate::test_utils::check_record_batches; + + #[tokio::test] + async fn test_metadata_log_entries_table() { + let table = TableTestFixture::new().table; + + let batch_stream = table.inspect().metadata_log_entries().scan().await.unwrap(); + + check_record_batches( + batch_stream.try_collect::<Vec<_>>().await.unwrap(), + expect![[r#" + Field { "timestamp": Timestamp(µs, "+00:00"), metadata: {"PARQUET:field_id": "1"} }, + Field { "file": Utf8, metadata: {"PARQUET:field_id": "2"} }, + Field { "latest_snapshot_id": nullable Int64, metadata: {"PARQUET:field_id": "3"} }, + Field { "latest_schema_id": nullable Int32, metadata: {"PARQUET:field_id": "4"} }, + Field { "latest_sequence_number": nullable Int64, metadata: {"PARQUET:field_id": "5"} }"#]], + expect![[r#" + timestamp: PrimitiveArray + [ + 1970-01-01T00:25:15.100+00:00, + 2020-10-14T01:22:53.590+00:00, + ], + file: (skipped), + latest_snapshot_id: PrimitiveArray + [ + null, + 3055729675574597004, + ], + latest_schema_id: PrimitiveArray + [ + null, + 1, + ], + latest_sequence_number: PrimitiveArray + [ + null, + 34, + ]"#]], + &["file"], + Some("timestamp"), + ); + } +} diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs index d5e9d60869..2d8ecf7feb 100644 --- a/crates/iceberg/src/inspect/metadata_table.rs +++ b/crates/iceberg/src/inspect/metadata_table.rs @@ -15,7 +15,11 @@ // specific language governing permissions and limitations // under the License. -use super::{ManifestsTable, SnapshotsTable}; +use super::{ + AllDataFilesTable, AllDeleteFilesTable, AllFilesTable, AllManifestsTable, DataFilesTable, + DeleteFilesTable, FilesTable, HistoryTable, ManifestsTable, MetadataLogEntriesTable, RefsTable, + SnapshotsTable, +}; use crate::table::Table; /// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table.
@@ -34,6 +38,26 @@ pub enum MetadataTableType { Snapshots, /// [`ManifestsTable`] Manifests, + /// [`HistoryTable`] + History, + /// [`MetadataLogEntriesTable`] + MetadataLogEntries, + /// [`RefsTable`] + Refs, + /// [`FilesTable`] + Files, + /// [`DataFilesTable`] + DataFiles, + /// [`DeleteFilesTable`] + DeleteFiles, + /// [`AllManifestsTable`] + AllManifests, + /// [`AllFilesTable`] + AllFiles, + /// [`AllDataFilesTable`] + AllDataFiles, + /// [`AllDeleteFilesTable`] + AllDeleteFiles, } impl MetadataTableType { @@ -42,6 +66,16 @@ impl MetadataTableType { match self { MetadataTableType::Snapshots => "snapshots", MetadataTableType::Manifests => "manifests", + MetadataTableType::History => "history", + MetadataTableType::MetadataLogEntries => "metadata_log_entries", + MetadataTableType::Refs => "refs", + MetadataTableType::Files => "files", + MetadataTableType::DataFiles => "data_files", + MetadataTableType::DeleteFiles => "delete_files", + MetadataTableType::AllManifests => "all_manifests", + MetadataTableType::AllFiles => "all_files", + MetadataTableType::AllDataFiles => "all_data_files", + MetadataTableType::AllDeleteFiles => "all_delete_files", } } @@ -59,6 +93,16 @@ impl TryFrom<&str> for MetadataTableType { match value { "snapshots" => Ok(Self::Snapshots), "manifests" => Ok(Self::Manifests), + "history" => Ok(Self::History), + "metadata_log_entries" => Ok(Self::MetadataLogEntries), + "refs" => Ok(Self::Refs), + "files" => Ok(Self::Files), + "data_files" => Ok(Self::DataFiles), + "delete_files" => Ok(Self::DeleteFiles), + "all_manifests" => Ok(Self::AllManifests), + "all_files" => Ok(Self::AllFiles), + "all_data_files" => Ok(Self::AllDataFiles), + "all_delete_files" => Ok(Self::AllDeleteFiles), _ => Err(format!("invalid metadata table type: {value}")), } } @@ -79,4 +123,54 @@ impl<'a> MetadataTable<'a> { pub fn manifests(&self) -> ManifestsTable<'_> { ManifestsTable::new(self.0) } + + /// Get the history table. + pub fn history(&self) -> HistoryTable<'_> { + HistoryTable::new(self.0) + } + + /// Get the metadata log entries table. + pub fn metadata_log_entries(&self) -> MetadataLogEntriesTable<'_> { + MetadataLogEntriesTable::new(self.0) + } + + /// Get the refs table. + pub fn refs(&self) -> RefsTable<'_> { + RefsTable::new(self.0) + } + + /// Get the files table. + pub fn files(&self) -> FilesTable<'_> { + FilesTable::new(self.0) + } + + /// Get the data files table. + pub fn data_files(&self) -> DataFilesTable<'_> { + DataFilesTable::new(self.0) + } + + /// Get the delete files table. + pub fn delete_files(&self) -> DeleteFilesTable<'_> { + DeleteFilesTable::new(self.0) + } + + /// Get the all manifests table. + pub fn all_manifests(&self) -> AllManifestsTable<'_> { + AllManifestsTable::new(self.0) + } + + /// Get the all files table. + pub fn all_files(&self) -> AllFilesTable<'_> { + AllFilesTable::new(self.0) + } + + /// Get the all data files table. + pub fn all_data_files(&self) -> AllDataFilesTable<'_> { + AllDataFilesTable::new(self.0) + } + + /// Get the all delete files table. + pub fn all_delete_files(&self) -> AllDeleteFilesTable<'_> { + AllDeleteFilesTable::new(self.0) + } } diff --git a/crates/iceberg/src/inspect/mod.rs b/crates/iceberg/src/inspect/mod.rs index b64420ea11..f44e53518b 100644 --- a/crates/iceberg/src/inspect/mod.rs +++ b/crates/iceberg/src/inspect/mod.rs @@ -17,10 +17,22 @@ //! Metadata table APIs. 
+mod all_files; +mod all_manifests; +mod files; +mod history; mod manifests; +mod metadata_log_entries; mod metadata_table; +mod refs; mod snapshots; +pub use all_files::{AllDataFilesTable, AllDeleteFilesTable, AllFilesTable}; +pub use all_manifests::AllManifestsTable; +pub use files::{DataFilesTable, DeleteFilesTable, FilesTable}; +pub use history::HistoryTable; pub use manifests::ManifestsTable; +pub use metadata_log_entries::MetadataLogEntriesTable; pub use metadata_table::*; +pub use refs::RefsTable; pub use snapshots::SnapshotsTable; diff --git a/crates/iceberg/src/inspect/refs.rs b/crates/iceberg/src/inspect/refs.rs new file mode 100644 index 0000000000..55ec5939e8 --- /dev/null +++ b/crates/iceberg/src/inspect/refs.rs @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_array::builder::{PrimitiveBuilder, StringBuilder}; +use arrow_array::types::Int64Type; +use futures::{StreamExt, stream}; + +use crate::Result; +use crate::arrow::schema_to_arrow_schema; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::{NestedField, PrimitiveType, SnapshotRetention, Type}; +use crate::table::Table; + +/// Refs metadata table. +/// +/// Shows all named references (branches and tags) for the table. +pub struct RefsTable<'a> { + table: &'a Table, +} + +impl<'a> RefsTable<'a> { + /// Create a new Refs table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the refs table. + pub fn schema(&self) -> crate::spec::Schema { + let fields = vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)), + NestedField::required(2, "type", Type::Primitive(PrimitiveType::String)), + NestedField::required(3, "snapshot_id", Type::Primitive(PrimitiveType::Long)), + NestedField::optional( + 4, + "max_reference_age_in_ms", + Type::Primitive(PrimitiveType::Long), + ), + NestedField::optional( + 5, + "min_snapshots_to_keep", + Type::Primitive(PrimitiveType::Long), + ), + NestedField::optional( + 6, + "max_snapshot_age_in_ms", + Type::Primitive(PrimitiveType::Long), + ), + ]; + crate::spec::Schema::builder() + .with_fields(fields.into_iter().map(|f| f.into())) + .build() + .unwrap() + } + + /// Scans the refs table. 
+ pub async fn scan(&self) -> Result<ArrowRecordBatchStream> { + let schema = schema_to_arrow_schema(&self.schema())?; + + let mut name_builder = StringBuilder::new(); + let mut type_builder = StringBuilder::new(); + let mut snapshot_id_builder = PrimitiveBuilder::<Int64Type>::new(); + let mut max_ref_age_builder = PrimitiveBuilder::<Int64Type>::new(); + let mut min_snapshots_builder = PrimitiveBuilder::<Int64Type>::new(); + let mut max_snapshot_age_builder = PrimitiveBuilder::<Int64Type>::new(); + + for (ref_name, snapshot_ref) in &self.table.metadata().refs { + name_builder.append_value(ref_name); + snapshot_id_builder.append_value(snapshot_ref.snapshot_id); + + match &snapshot_ref.retention { + SnapshotRetention::Branch { + min_snapshots_to_keep, + max_snapshot_age_ms, + max_ref_age_ms, + } => { + type_builder.append_value("branch"); + max_ref_age_builder.append_option(*max_ref_age_ms); + min_snapshots_builder.append_option(min_snapshots_to_keep.map(|v| v as i64)); + max_snapshot_age_builder.append_option(*max_snapshot_age_ms); + } + SnapshotRetention::Tag { max_ref_age_ms } => { + type_builder.append_value("tag"); + max_ref_age_builder.append_option(*max_ref_age_ms); + min_snapshots_builder.append_null(); + max_snapshot_age_builder.append_null(); + } + } + } + + let batch = RecordBatch::try_new(Arc::new(schema), vec![ + Arc::new(name_builder.finish()), + Arc::new(type_builder.finish()), + Arc::new(snapshot_id_builder.finish()), + Arc::new(max_ref_age_builder.finish()), + Arc::new(min_snapshots_builder.finish()), + Arc::new(max_snapshot_age_builder.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + use futures::TryStreamExt; + + use crate::scan::tests::TableTestFixture; + use crate::test_utils::check_record_batches; + + #[tokio::test] + async fn test_refs_table() { + let table = TableTestFixture::new().table; + + let batch_stream = table.inspect().refs().scan().await.unwrap(); + + check_record_batches( + batch_stream.try_collect::<Vec<_>>().await.unwrap(), + expect![[r#" + Field { "name": Utf8, metadata: {"PARQUET:field_id": "1"} }, + Field { "type": Utf8, metadata: {"PARQUET:field_id": "2"} }, + Field { "snapshot_id": Int64, metadata: {"PARQUET:field_id": "3"} }, + Field { "max_reference_age_in_ms": nullable Int64, metadata: {"PARQUET:field_id": "4"} }, + Field { "min_snapshots_to_keep": nullable Int64, metadata: {"PARQUET:field_id": "5"} }, + Field { "max_snapshot_age_in_ms": nullable Int64, metadata: {"PARQUET:field_id": "6"} }"#]], + expect![[r#" + name: StringArray + [ + "main", + "test", + ], + type: StringArray + [ + "branch", + "tag", + ], + snapshot_id: PrimitiveArray + [ + 3055729675574597004, + 3051729675574597004, + ], + max_reference_age_in_ms: PrimitiveArray + [ + null, + 10000000, + ], + min_snapshots_to_keep: PrimitiveArray + [ + null, + null, + ], + max_snapshot_age_in_ms: PrimitiveArray + [ + null, + null, + ]"#]], + &[], + Some("name"), + ); + } +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index c055c12c9a..13ef50d87a 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -960,6 +960,29 @@ pub mod tests { } } + /// Sets up manifest files for all snapshots (including parent). + /// Use this for all_* metadata table tests that iterate all snapshots.
+ pub async fn setup_all_snapshot_manifest_files(&mut self) { + self.setup_manifest_files().await; + + // Also write an empty manifest list for the parent snapshot + let current_snapshot = self.table.metadata().current_snapshot().unwrap(); + let parent_snapshot = current_snapshot + .parent_snapshot(self.table.metadata()) + .unwrap(); + + let manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(parent_snapshot.manifest_list()) + .unwrap(), + parent_snapshot.snapshot_id(), + parent_snapshot.parent_snapshot_id(), + parent_snapshot.sequence_number(), + ); + manifest_list_write.close().await.unwrap(); + } + pub async fn setup_unpartitioned_manifest_files(&mut self) { let current_snapshot = self.table.metadata().current_snapshot().unwrap(); let parent_snapshot = current_snapshot diff --git a/crates/integrations/datafusion/src/table/metadata_table.rs b/crates/integrations/datafusion/src/table/metadata_table.rs index 38148b4084..d41bab6e34 100644 --- a/crates/integrations/datafusion/src/table/metadata_table.rs +++ b/crates/integrations/datafusion/src/table/metadata_table.rs @@ -54,6 +54,16 @@ impl TableProvider for IcebergMetadataTableProvider { let schema = match self.r#type { MetadataTableType::Snapshots => metadata_table.snapshots().schema(), MetadataTableType::Manifests => metadata_table.manifests().schema(), + MetadataTableType::History => metadata_table.history().schema(), + MetadataTableType::MetadataLogEntries => metadata_table.metadata_log_entries().schema(), + MetadataTableType::Refs => metadata_table.refs().schema(), + MetadataTableType::Files => metadata_table.files().schema(), + MetadataTableType::DataFiles => metadata_table.data_files().schema(), + MetadataTableType::DeleteFiles => metadata_table.delete_files().schema(), + MetadataTableType::AllManifests => metadata_table.all_manifests().schema(), + MetadataTableType::AllFiles => metadata_table.all_files().schema(), + MetadataTableType::AllDataFiles => metadata_table.all_data_files().schema(), + MetadataTableType::AllDeleteFiles => metadata_table.all_delete_files().schema(), }; schema_to_arrow_schema(&schema).unwrap().into() } @@ -79,6 +89,18 @@ impl IcebergMetadataTableProvider { let stream = match self.r#type { MetadataTableType::Snapshots => metadata_table.snapshots().scan().await, MetadataTableType::Manifests => metadata_table.manifests().scan().await, + MetadataTableType::History => metadata_table.history().scan().await, + MetadataTableType::MetadataLogEntries => { + metadata_table.metadata_log_entries().scan().await + } + MetadataTableType::Refs => metadata_table.refs().scan().await, + MetadataTableType::Files => metadata_table.files().scan().await, + MetadataTableType::DataFiles => metadata_table.data_files().scan().await, + MetadataTableType::DeleteFiles => metadata_table.delete_files().scan().await, + MetadataTableType::AllManifests => metadata_table.all_manifests().scan().await, + MetadataTableType::AllFiles => metadata_table.all_files().scan().await, + MetadataTableType::AllDataFiles => metadata_table.all_data_files().scan().await, + MetadataTableType::AllDeleteFiles => metadata_table.all_delete_files().scan().await, } .map_err(to_datafusion_error)?; let stream = stream.map_err(to_datafusion_error); diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 6f8898abb8..6fe404f860 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ 
b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -171,6 +171,16 @@ async fn test_provider_list_table_names() -> Result<()> { "my_table", "my_table$snapshots", "my_table$manifests", + "my_table$history", + "my_table$metadata_log_entries", + "my_table$refs", + "my_table$files", + "my_table$data_files", + "my_table$delete_files", + "my_table$all_manifests", + "my_table$all_files", + "my_table$all_data_files", + "my_table$all_delete_files", ] "#]] .assert_debug_eq(&result); diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt index a0f0e55b5b..c81272e04b 100644 --- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt +++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt @@ -26,10 +26,30 @@ datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW default default test_binary_table BASE TABLE +default default test_binary_table$all_data_files BASE TABLE +default default test_binary_table$all_delete_files BASE TABLE +default default test_binary_table$all_files BASE TABLE +default default test_binary_table$all_manifests BASE TABLE +default default test_binary_table$data_files BASE TABLE +default default test_binary_table$delete_files BASE TABLE +default default test_binary_table$files BASE TABLE +default default test_binary_table$history BASE TABLE default default test_binary_table$manifests BASE TABLE +default default test_binary_table$metadata_log_entries BASE TABLE +default default test_binary_table$refs BASE TABLE default default test_binary_table$snapshots BASE TABLE default default test_partitioned_table BASE TABLE +default default test_partitioned_table$all_data_files BASE TABLE +default default test_partitioned_table$all_delete_files BASE TABLE +default default test_partitioned_table$all_files BASE TABLE +default default test_partitioned_table$all_manifests BASE TABLE +default default test_partitioned_table$data_files BASE TABLE +default default test_partitioned_table$delete_files BASE TABLE +default default test_partitioned_table$files BASE TABLE +default default test_partitioned_table$history BASE TABLE default default test_partitioned_table$manifests BASE TABLE +default default test_partitioned_table$metadata_log_entries BASE TABLE +default default test_partitioned_table$refs BASE TABLE default default test_partitioned_table$snapshots BASE TABLE default information_schema columns VIEW default information_schema df_settings VIEW
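
A minimal usage sketch of the API added above (not part of the patch, and the print_history helper name is illustrative only): each metadata table is reached through Table::inspect(), exposes schema(), and has an async scan() that yields Arrow record batches. The sketch assumes a Table handle already loaded from a catalog; catalog setup is elided.

use futures::TryStreamExt;
use iceberg::Result;
use iceberg::table::Table;

// Collect the "history" metadata table into Arrow RecordBatches and print their shape.
async fn print_history(table: &Table) -> Result<()> {
    let batches: Vec<_> = table
        .inspect()
        .history()
        .scan()
        .await?
        .try_collect()
        .await?;
    for batch in &batches {
        println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}

Through the DataFusion integration, the same tables are addressed with the $-suffixed names registered by the provider (as listed in the provider test and show_tables.slt above), for example: SELECT * FROM my_table$refs.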