Changes from all commits
23 commits
4c7318a
[EXPERIMENTAL] Split Iceberg FileScanTask serialization for reduced n…
andygrove Jan 27, 2026
183a74a
Change iceberg-rust CI jobs to trigger on [iceberg-rust] tag
andygrove Jan 27, 2026
4492d67
Allow clippy::large_enum_variant for spark_operator module
andygrove Jan 27, 2026
537f450
trigger CI
andygrove Jan 27, 2026
0af529d
Remove unused import
andygrove Jan 27, 2026
d78d9dd
Simplify split mode detection for Iceberg scan serialization
andygrove Jan 27, 2026
5066cc9
Revert "Simplify split mode detection for Iceberg scan serialization"
andygrove Jan 27, 2026
074b88c
Fix partition index handling in IcebergScanExec for split mode
andygrove Jan 27, 2026
894e02d
Keep legacy code path for IcebergScan as child of other operators
andygrove Jan 27, 2026
c76ddf3
Implement per-partition plan injection for Iceberg split serialization
andygrove Jan 27, 2026
2617d6c
Remove unused legacy apply method from CometIcebergNativeScanExec
andygrove Jan 27, 2026
89cd3f9
Remove split_mode flag and simplify IcebergScan protobuf
andygrove Jan 27, 2026
a5b03d7
Remove PR_DESCRIPTION.md
andygrove Jan 27, 2026
3624aaf
fix: Use perPartitionData.length for Iceberg partition count
andygrove Jan 27, 2026
3d53a56
fix: Override convertBlock() to preserve transient fields
andygrove Jan 27, 2026
1b78f75
style: Remove unused KeyGroupedPartitioning import
andygrove Jan 27, 2026
63e3ad6
trigger CI
andygrove Jan 27, 2026
76b6047
Handle partition index out of bounds for joins/multi-input scenarios
andygrove Jan 27, 2026
e20b09e
Revert "Handle partition index out of bounds for joins/multi-input sc…
andygrove Jan 27, 2026
6372c1c
fix: Handle joins over multiple Iceberg tables in split serialization
andygrove Jan 28, 2026
c2698fd
scalastyle
andygrove Jan 28, 2026
faafb15
chore: Remove debug logging from CometIcebergSplitRDD
andygrove Jan 28, 2026
0a02bb7
fix: Handle broadcast joins over Iceberg tables in split serialization
andygrove Jan 28, 2026
6 changes: 3 additions & 3 deletions .github/workflows/iceberg_spark_test.yml
@@ -164,7 +164,7 @@ jobs:
-Pquick=true -x javadoc

iceberg-spark-rust:
if: contains(github.event.pull_request.title, '[iceberg]')
if: contains(github.event.pull_request.title, '[iceberg-rust]')
strategy:
matrix:
os: [ubuntu-24.04]
@@ -203,7 +203,7 @@ jobs:
-Pquick=true -x javadoc

iceberg-spark-extensions-rust:
if: contains(github.event.pull_request.title, '[iceberg]')
if: contains(github.event.pull_request.title, '[iceberg-rust]')
strategy:
matrix:
os: [ubuntu-24.04]
@@ -242,7 +242,7 @@ jobs:
-Pquick=true -x javadoc

iceberg-spark-runtime-rust:
if: contains(github.event.pull_request.title, '[iceberg]')
if: contains(github.event.pull_request.title, '[iceberg-rust]')
strategy:
matrix:
os: [ubuntu-24.04]
14 changes: 12 additions & 2 deletions native/core/src/execution/operators/iceberg_scan.rs
@@ -130,8 +130,18 @@ impl ExecutionPlan for IcebergScanExec {
partition: usize,
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
if partition < self.file_task_groups.len() {
let tasks = &self.file_task_groups[partition];
// In split mode (single task group), always use index 0 regardless of requested partition.
// This is because in Comet's per-partition execution model, each task builds its own plan
// with only its partition's data. The parent operator may request partition N, but this
// IcebergScanExec already contains the correct data for partition N in task_groups[0].
let effective_partition = if self.file_task_groups.len() == 1 {
0
} else {
partition
};

if effective_partition < self.file_task_groups.len() {
let tasks = &self.file_task_groups[effective_partition];
self.execute_with_tasks(tasks.clone(), partition, context)
} else {
Err(DataFusionError::Execution(format!(
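The new comment in the hunk above explains Comet's per-partition execution model: in split mode each task's plan contains only that partition's data, so a single task group always resolves to index 0 regardless of the partition index the parent requests. A minimal standalone sketch of that remapping (`effective_partition` is a hypothetical helper, not code from this PR):

```rust
// Minimal sketch of the index remapping described in the diff above.
fn effective_partition(requested: usize, num_task_groups: usize) -> usize {
    if num_task_groups == 1 {
        // Split mode: the single task group already holds this task's data,
        // so any partition index requested by the parent resolves to 0.
        0
    } else {
        // Legacy mode: task groups are indexed by Spark partition.
        requested
    }
}

fn main() {
    assert_eq!(effective_partition(7, 1), 0); // split mode
    assert_eq!(effective_partition(7, 16), 7); // legacy multi-group mode
}
```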
65 changes: 31 additions & 34 deletions native/core/src/execution/planner.rs
@@ -1132,26 +1132,27 @@ impl PhysicalPlanner {
))
}
OpStruct::IcebergScan(scan) => {
let required_schema: SchemaRef =
convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
// Extract common data and single partition's file tasks
// Per-partition injection happens in Scala before sending to native
let common = scan
.common
.as_ref()
.ok_or_else(|| GeneralError("IcebergScan missing common data".into()))?;
let partition = scan
.partition
.as_ref()
.ok_or_else(|| GeneralError("IcebergScan missing partition data".into()))?;

let catalog_properties: HashMap<String, String> = scan
let required_schema =
convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
let catalog_properties: HashMap<String, String> = common
.catalog_properties
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let metadata_location = common.metadata_location.clone();
let tasks = parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?;

let metadata_location = scan.metadata_location.clone();

debug_assert!(
!scan.file_partitions.is_empty(),
"IcebergScan must have at least one file partition. This indicates a bug in Scala serialization."
);

let tasks = parse_file_scan_tasks(
scan,
&scan.file_partitions[self.partition as usize].file_scan_tasks,
)?;
let file_task_groups = vec![tasks];

let iceberg_scan = IcebergScanExec::new(
@@ -2743,15 +2744,14 @@ fn partition_data_to_struct(
/// Each task contains a residual predicate that is used for row-group level filtering
/// during Parquet scanning.
///
/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing
/// of schemas, partition specs, partition types, name mappings, and other repeated data.
fn parse_file_scan_tasks(
proto_scan: &spark_operator::IcebergScan,
/// This function uses deduplication pools from the IcebergScanCommon to avoid redundant
/// parsing of schemas, partition specs, partition types, name mappings, and other repeated data.
fn parse_file_scan_tasks_from_common(
proto_common: &spark_operator::IcebergScanCommon,
proto_tasks: &[spark_operator::IcebergFileScanTask],
) -> Result<Vec<iceberg::scan::FileScanTask>, ExecutionError> {
// Build caches upfront: for 10K tasks with 1 schema, this parses the schema
// once instead of 10K times, eliminating redundant JSON deserialization
let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_scan
// Build caches upfront from common data
let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_common
.schema_pool
.iter()
.map(|json| {
@@ -2764,7 +2764,7 @@
})
.collect::<Result<Vec<_>, _>>()?;

let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_common
.partition_spec_pool
.iter()
.map(|json| {
@@ -2774,7 +2774,7 @@
})
.collect();

let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_scan
let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_common
.name_mapping_pool
.iter()
.map(|json| {
@@ -2784,7 +2784,7 @@
})
.collect();

let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_scan
let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_common
.delete_files_pool
.iter()
.map(|list| {
@@ -2796,7 +2796,7 @@
"EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes,
other => {
return Err(GeneralError(format!(
"Invalid delete content type '{}'. This indicates a bug in Scala serialization.",
"Invalid delete content type '{}'",
other
)))
}
@@ -2817,7 +2817,6 @@
})
.collect::<Result<Vec<_>, _>>()?;

// Partition data pool is in protobuf messages
let results: Result<Vec<_>, _> = proto_tasks
.iter()
.map(|proto_task| {
@@ -2851,7 +2850,7 @@
};

let bound_predicate = if let Some(idx) = proto_task.residual_idx {
proto_scan
proto_common
.residual_pool
.get(idx as usize)
.and_then(convert_spark_expr_to_predicate)
@@ -2871,24 +2870,22 @@
};

let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
// Get partition data from protobuf pool
let partition_data_proto = proto_scan
let partition_data_proto = proto_common
.partition_data_pool
.get(partition_data_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid partition_data_idx: {} (pool size: {})",
partition_data_idx,
proto_scan.partition_data_pool.len()
proto_common.partition_data_pool.len()
))
})?;

// Convert protobuf PartitionData to iceberg Struct
match partition_data_to_struct(partition_data_proto) {
Ok(s) => Some(s),
Err(e) => {
return Err(ExecutionError::GeneralError(format!(
"Failed to deserialize partition data from protobuf: {}",
"Failed to deserialize partition data: {}",
e
)))
}
@@ -2907,14 +2904,14 @@
.and_then(|idx| name_mapping_cache.get(idx as usize))
.and_then(|opt| opt.clone());

let project_field_ids = proto_scan
let project_field_ids = proto_common
.project_field_ids_pool
.get(proto_task.project_field_ids_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid project_field_ids_idx: {} (pool size: {})",
proto_task.project_field_ids_idx,
proto_scan.project_field_ids_pool.len()
proto_common.project_field_ids_pool.len()
))
})?
.field_ids
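The planner changes above all follow the same pool/index pattern: values repeated across tasks (schemas, partition specs, name mappings, delete files, residuals) are stored once in a pool on IcebergScanCommon, parsed once into a cache, and then resolved per task by index. A self-contained sketch of the pattern, with hypothetical type names and a plain string standing in for the parsed schema:

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the protobuf types; only the pool/index shape
// matches the real messages.
struct Common {
    schema_pool: Vec<String>, // deduplicated JSON-encoded schemas
}

struct Task {
    schema_idx: usize, // index into Common::schema_pool
}

fn resolve_schemas(common: &Common, tasks: &[Task]) -> Result<Vec<Arc<String>>, String> {
    // Parse each pool entry exactly once (here "parsing" is just an Arc wrap).
    let cache: Vec<Arc<String>> = common
        .schema_pool
        .iter()
        .map(|json| Arc::new(json.clone()))
        .collect();
    // Each task then resolves to a cheap indexed lookup instead of re-parsing.
    tasks
        .iter()
        .map(|t| {
            cache.get(t.schema_idx).cloned().ok_or_else(|| {
                format!(
                    "Invalid schema_idx: {} (pool size: {})",
                    t.schema_idx,
                    cache.len()
                )
            })
        })
        .collect()
}

fn main() {
    let common = Common {
        schema_pool: vec!["{\"type\":\"struct\",\"fields\":[]}".to_string()],
    };
    // Three tasks, one unique schema: the JSON is handled once, not three times.
    let tasks = vec![Task { schema_idx: 0 }, Task { schema_idx: 0 }, Task { schema_idx: 0 }];
    assert_eq!(resolve_schemas(&common, &tasks).unwrap().len(), 3);
}
```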
1 change: 1 addition & 0 deletions native/proto/src/lib.rs
@@ -34,6 +34,7 @@ pub mod spark_partitioning {

// Include generated modules from .proto files.
#[allow(missing_docs)]
#[allow(clippy::large_enum_variant)]
pub mod spark_operator {
include!(concat!("generated", "/spark.spark_operator.rs"));
}
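For context on the new #[allow(clippy::large_enum_variant)]: the lint fires when one enum variant is much larger than the rest, which is common in prost-generated operator enums; allowing it for the generated module avoids boxing every large variant. An illustrative example (not the generated code) that would trigger the lint:

```rust
#![allow(dead_code)] // the variants exist only to illustrate the lint

enum Op {
    Small(u8),
    // One variant much larger than the others makes every `Op` value
    // as large as the biggest variant, which is what clippy warns about.
    Large([u8; 1024]),
}

fn main() {
    println!("size of Op: {} bytes", std::mem::size_of::<Op>());
}
```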
42 changes: 24 additions & 18 deletions native/proto/src/proto/operator.proto
@@ -156,28 +156,34 @@ message PartitionData {
repeated PartitionValue values = 1;
}

message IcebergScan {
// Common data shared by all partitions in split mode (sent once, captured in closure)
message IcebergScanCommon {
// Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.)
map<string, string> catalog_properties = 1;

// Table metadata file path for FileIO initialization
string metadata_location = 2;

// Schema to read
repeated SparkStructField required_schema = 1;
repeated SparkStructField required_schema = 3;

// Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.)
map<string, string> catalog_properties = 2;
// Deduplication pools (must contain ALL entries for cross-partition deduplication)
repeated string schema_pool = 4;
repeated string partition_type_pool = 5;
repeated string partition_spec_pool = 6;
repeated string name_mapping_pool = 7;
repeated ProjectFieldIdList project_field_ids_pool = 8;
repeated PartitionData partition_data_pool = 9;
repeated DeleteFileList delete_files_pool = 10;
repeated spark.spark_expression.Expr residual_pool = 11;
}

// Pre-planned file scan tasks grouped by Spark partition
repeated IcebergFilePartition file_partitions = 3;
message IcebergScan {
// Common data shared across partitions (pools, metadata, catalog props)
IcebergScanCommon common = 1;

// Table metadata file path for FileIO initialization
string metadata_location = 4;

// Deduplication pools - shared data referenced by index from tasks
repeated string schema_pool = 5;
repeated string partition_type_pool = 6;
repeated string partition_spec_pool = 7;
repeated string name_mapping_pool = 8;
repeated ProjectFieldIdList project_field_ids_pool = 9;
repeated PartitionData partition_data_pool = 10;
repeated DeleteFileList delete_files_pool = 11;
repeated spark.spark_expression.Expr residual_pool = 12;
// Single partition's file scan tasks
IcebergFilePartition partition = 2;
}

// Helper message for deduplicating field ID lists
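The restructured messages move every deduplication pool plus the table metadata into IcebergScanCommon, and each serialized IcebergScan now pairs that shared block with a single partition's tasks. A hedged Rust sketch of that shape (struct fields and the helper are illustrative; the real per-partition assembly happens on the Scala side before plans are sent to native):

```rust
#![allow(dead_code)] // fields exist only to mirror the proto shape

#[derive(Clone)]
struct IcebergScanCommon {
    metadata_location: String, // plus pools and catalog properties
}

struct IcebergFilePartition {
    file_scan_tasks: Vec<String>, // stand-in for IcebergFileScanTask entries
}

struct IcebergScan {
    common: IcebergScanCommon,
    partition: IcebergFilePartition,
}

fn split_per_partition(
    common: &IcebergScanCommon,
    partitions: Vec<IcebergFilePartition>,
) -> Vec<IcebergScan> {
    partitions
        .into_iter()
        .map(|partition| IcebergScan {
            common: common.clone(), // shared block paired with each partition
            partition,              // only this partition's tasks travel with it
        })
        .collect()
}

fn main() {
    let common = IcebergScanCommon {
        metadata_location: "metadata/v1.metadata.json".to_string(),
    };
    let partitions = vec![
        IcebergFilePartition { file_scan_tasks: vec!["task-0".into()] },
        IcebergFilePartition { file_scan_tasks: vec!["task-1".into()] },
    ];
    // One native IcebergScan message per Spark partition.
    assert_eq!(split_per_partition(&common, partitions).len(), 2);
}
```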