From 34fa7e4d07b6951e6709a8267b2dbc99b4d6d420 Mon Sep 17 00:00:00 2001
From: B Vadlamani
Date: Wed, 7 Jan 2026 18:14:46 -0800
Subject: [PATCH 1/8] add_support_partition_overwrite_mode

---
 native/proto/src/proto/operator.proto         |  2 ++
 .../operator/CometDataWritingCommand.scala    | 12 ++++----
 .../parquet/CometParquetWriterSuite.scala     | 28 +++++++++++++++++++
 3 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 015b5d96b6..1511a107c8 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -245,6 +245,8 @@ message ParquetWriter {
   optional string job_id = 6;
   // Task attempt ID for this specific task
   optional int32 task_attempt_id = 7;
+  // set of partition columns
+  repeated string partition_columns = 8;
 }
 
 enum AggregateMode {
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
index 8349329841..6c1d9ce5b9 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 
 import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -62,7 +63,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
     }
 
     if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) {
-      return Unsupported(Some("Partitioned writes are not supported"))
+      return Incompatible(Some("Partitioned writes are not supported"))
     }
 
     if (cmd.query.output.exists(attr => DataTypeSupport.isComplexType(attr.dataType))) {
@@ -167,6 +168,9 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
       other
     }
 
+    val isDynamicOverWriteMode = cmd.partitionColumns.nonEmpty &&
+      SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+
     // Create FileCommitProtocol for atomic writes
     val jobId = java.util.UUID.randomUUID().toString
     val committer =
@@ -178,11 +182,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
           committerClass.getConstructor(classOf[String], classOf[String], classOf[Boolean])
         Some(
           constructor
-            .newInstance(
-              jobId,
-              outputPath,
-              java.lang.Boolean.FALSE // dynamicPartitionOverwrite = false for now
-            )
+            .newInstance(jobId, outputPath, isDynamicOverWriteMode)
             .asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol])
       } catch {
         case e: Exception =>
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index 3ae7f949ab..f1f5276aef 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -228,4 +228,32 @@ class CometParquetWriterSuite extends CometTestBase {
       }
     }
   }
+
+  test("parquet write with mode overwrite") {
+    withTempPath { dir =>
+      val outputPath = new File(dir, "output.parquet").getAbsolutePath
"output.parquet").getAbsolutePath + + withTempPath { inputDir => + val inputPath = createTestData(inputDir) + + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + + val df = spark.read.parquet(inputPath) + + // First write + df.repartition(2).write.parquet(outputPath) + // verifyWrittenFile(outputPath) + // Second write (with overwrite mode and a different record count to make sure we are not reading the same data) + df.limit(500).repartition(2).write.mode("overwrite").parquet(outputPath) + // // Verify the data was written + val resultDf = spark.read.parquet(outputPath) + assert(resultDf.count() == 500, "Expected 1000 rows after overwrite") + } + } + } + } } From 859489e87c838c3c23b2a9590813ae47ada65e09 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 13 Jan 2026 08:43:53 -0800 Subject: [PATCH 2/8] parquet_writes_2 --- .../src/execution/operators/parquet_writer.rs | 238 +++++++++++++----- native/core/src/execution/planner.rs | 1 + .../operator/CometDataWritingCommand.scala | 3 +- 3 files changed, 185 insertions(+), 57 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 2ca1e9cfd5..1d2b9b27c6 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -25,11 +25,13 @@ use std::{ io::Cursor, sync::Arc, }; - +use std::collections::HashMap; +use arrow::array::{ArrayRef, AsArray}; +use arrow::compute::cast; use opendal::{services::Hdfs, Operator}; use url::Url; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::{ @@ -69,6 +71,53 @@ enum ParquetWriter { ), } +fn needs_escaping(c: char) -> bool { + matches!(c, '"' | '#' | '%' | '\'' | '*' | '/' | ':' | '=' | '?' 
+        || c.is_control()
+}
+
+fn escape_partition_value(value: &str) -> String {
+    let mut result = String::with_capacity(value.len());
+    for c in value.chars() {
+        if needs_escaping(c) {
+            result.push_str(&format!("%{:02X}", c as u32));
+        } else {
+            result.push(c);
+        }
+    }
+    result
+}
+
+fn build_partition_path(
+    batch: &RecordBatch,
+    row: usize,
+    partition_columns: &[String],
+    partition_indices: &[usize],
+) -> Result<String> {
+    let mut path = String::new();
+    for (name, &idx) in partition_columns.iter().zip(partition_indices.iter()) {
+        let value = get_partition_value(batch.column(idx), row)?;
+        if !path.is_empty() {
+            path.push('/');
+        }
+        path.push_str(name);
+        path.push('=');
+        path.push_str(&escape_partition_value(&value));
+    }
+    Ok(path)
+}
+
+fn get_partition_value(array: &ArrayRef, row: usize) -> Result<String> {
+    if array.is_null(row) {
+        return Ok("__HIVE_DEFAULT_PARTITION__".to_string());
+    }
+    else{
+        // relying on arrow's cast op to get string representation
+        let string_array = cast(array, &DataType::Utf8)?;
+        Ok(string_array.as_string::<i32>().value(row).to_string())
+    }
+}
+
 impl ParquetWriter {
     /// Write a RecordBatch to the underlying writer
     async fn write(
@@ -204,6 +253,8 @@ pub struct ParquetWriterExec {
     metrics: ExecutionPlanMetricsSet,
     /// Cache for plan properties
     cache: PlanProperties,
+    // partition columns
+    partition_columns: Vec<String>,
 }
 
 impl ParquetWriterExec {
@@ -218,6 +269,7 @@ impl ParquetWriterExec {
         compression: CompressionCodec,
         partition_id: i32,
         column_names: Vec<String>,
+        partition_columns: Vec<String>,
     ) -> Result<Self> {
         // Preserve the input's partitioning so each partition writes its own file
         let input_partitioning = input.output_partitioning().clone();
@@ -240,9 +292,33 @@ impl ParquetWriterExec {
             column_names,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
+            partition_columns
         })
     }
 
+    fn get_partition_value(array: &ArrayRef, row: usize) -> Result<String> {
+        use arrow::array::*;
+        use arrow::datatypes::DataType;
+
+        if array.is_null(row) {
+            return Ok("__HIVE_DEFAULT_PARTITION__".to_string());
+        }
+
+        let value = match array.data_type() {
+            DataType::Utf8 => array.as_string::<i32>().value(row).to_string(),
+            DataType::Int64 => array.as_primitive::<Int64Type>().value(row).to_string(),
+            DataType::Int32 => array.as_primitive::<Int32Type>().value(row).to_string(),
+            DataType::Int16 => array.as_primitive::<Int16Type>().value(row).to_string(),
+            DataType::Int8 => array.as_primitive::<Int8Type>().value(row).to_string(),
+            DataType::Boolean => array.as_boolean().value(row).to_string(),
+            DataType::Utf8 => array.value(row).to_string(),
+            dt => return Err(DataFusionError::Execution(
+                format!("Unsupported partition column type: {:?}", dt)
+            )),
+        };
+        Ok(value)
+    }
+
     fn compression_to_parquet(&self) -> Result<Compression> {
         match self.compression {
             CompressionCodec::None => Ok(Compression::UNCOMPRESSED),
@@ -435,6 +511,7 @@ impl ExecutionPlan for ParquetWriterExec {
                 self.compression.clone(),
                 self.partition_id,
                 self.column_names.clone(),
+                self.partition_columns.clone()
             )?)),
             _ => Err(DataFusionError::Internal(
                 "ParquetWriterExec requires exactly one child".to_string(),
@@ -472,6 +549,11 @@ impl ExecutionPlan for ParquetWriterExec {
             .collect();
         let output_schema = Arc::new(arrow::datatypes::Schema::new(fields));
 
+        for partition_name in &self.partition_columns {
+            let index = input_schema.index_of(partition_name)?;
+            let data_type = input_schema.field(index).data_type().clone();
+        }
+
         // Generate part file name for this partition
         // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename
         let part_file = if let 
Some(attempt_id) = task_attempt_id { @@ -488,69 +570,111 @@ impl ExecutionPlan for ParquetWriterExec { .set_compression(compression) .build(); - let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?; - // Clone schema for use in async closure let schema_for_write = Arc::clone(&output_schema); - // Write batches - let write_task = async move { - let mut stream = input; - let mut total_rows = 0i64; - - while let Some(batch_result) = stream.try_next().await.transpose() { - let batch = batch_result?; - - // Track row count - total_rows += batch.num_rows() as i64; - - // Rename columns in the batch to match output schema - let renamed_batch = if !column_names.is_empty() { - RecordBatch::try_new(Arc::clone(&schema_for_write), batch.columns().to_vec()) - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to rename batch columns: {}", - e - )) - })? - } else { - batch - }; - - writer.write(&renamed_batch).await.map_err(|e| { - DataFusionError::Execution(format!("Failed to write batch: {}", e)) - })?; - } + if !self.partition_columns.is_empty() { + let write_task = async move { + let mut stream = input; + let mut total_rows = 0i64; + let mut writers: HashMap = HashMap::new(); + let mut partition_files: Vec = Vec::new(); - writer.close().await.map_err(|e| { - DataFusionError::Execution(format!("Failed to close writer: {}", e)) - })?; + while let Some(batch_result) = stream.try_next().await.transpose() { + let batch = batch_result?; + total_rows += batch.num_rows() as i64; - // Get file size - let file_size = std::fs::metadata(&part_file) - .map(|m| m.len() as i64) - .unwrap_or(0); + // Step 1: Rename columns + let renamed_batch = RecordBatch::try_new( + Arc::clone(&schema_for_write), + batch.columns().to_vec() + )?; - // Update metrics with write statistics - files_written.add(1); - bytes_written.add(file_size as usize); - rows_written.add(total_rows as usize); + // Step 2: Sort by partition columns + // ... (next step) - // Log metadata for debugging - eprintln!( - "Wrote Parquet file: path={}, size={}, rows={}", - part_file, file_size, total_rows - ); + // Step 3: Get partition ranges + // ... (next step) - // Return empty stream to indicate completion - Ok::<_, DataFusionError>(futures::stream::empty()) - }; + // Step 4: Write each partition slice + // ... (next step) + } + + // Step 5: Close all writers + for (_, writer) in writers { + writer.close().await?; + } + + Ok::<_, DataFusionError>(futures::stream::empty()) + }; + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(write_task).try_flatten(), + ))) - // Execute the write task and create a stream that does not return any batches - Ok(Box::pin(RecordBatchStreamAdapter::new( - self.schema(), - futures::stream::once(write_task).try_flatten(), - ))) + } else { + let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?; + + // Write batches + let write_task = async move { + let mut stream = input; + let mut total_rows = 0i64; + + while let Some(batch_result) = stream.try_next().await.transpose() { + let batch = batch_result?; + + // Track row count + total_rows += batch.num_rows() as i64; + + // Rename columns in the batch to match output schema + let renamed_batch = if !column_names.is_empty() { + RecordBatch::try_new(Arc::clone(&schema_for_write), batch.columns().to_vec()) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to rename batch columns: {}", + e + )) + })? 
+ } else { + batch + }; + + writer.write(&renamed_batch).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch: {}", e)) + })?; + } + + writer.close().await.map_err(|e| { + DataFusionError::Execution(format!("Failed to close writer: {}", e)) + })?; + + // Get file size + let file_size = std::fs::metadata(&part_file) + .map(|m| m.len() as i64) + .unwrap_or(0); + + // Update metrics with write statistics + files_written.add(1); + bytes_written.add(file_size as usize); + rows_written.add(total_rows as usize); + + // Log metadata for debugging + eprintln!( + "Wrote Parquet file: path={}, size={}, rows={}", + part_file, file_size, total_rows + ); + + // Return empty stream to indicate completion + Ok::<_, DataFusionError>(futures::stream::empty()) + }; + + // Execute the write task and create a stream that does not return any batches + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(write_task).try_flatten(), + ))) + } } } @@ -792,6 +916,7 @@ mod tests { let output_path = "unused".to_string(); let work_dir = "hdfs://namenode:9000/user/test_parquet_writer_exec".to_string(); let column_names = vec!["id".to_string(), "name".to_string()]; + let partition_columns = vec!["id".to_string(), "name".to_string()]; let parquet_writer = ParquetWriterExec::try_new( memory_exec, @@ -802,6 +927,7 @@ mod tests { CompressionCodec::None, 0, // partition_id column_names, + partition_columns )?; // Create a session context and execute the plan diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 93fbb59c11..844d3c5ccb 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1261,6 +1261,7 @@ impl PhysicalPlanner { codec, self.partition, writer.column_names.clone(), + writer.partition_columns.clone(), )?); Ok(( diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 6c1d9ce5b9..b50e589b58 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -63,7 +63,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec } if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) { - return Incompatible(Some("Partitioned writes are not supported")) + return Incompatible(Some("Partitioned writes are highly experimental")) } if (cmd.query.output.exists(attr => DataTypeSupport.isComplexType(attr.dataType))) { @@ -132,6 +132,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec .setOutputPath(outputPath) .setCompression(codec) .addAllColumnNames(cmd.query.output.map(_.name).asJava) + .addAllPartitionColumns(cmd.partitionColumns.map(_.name).asJava) // Note: work_dir, job_id, and task_attempt_id will be set at execution time // in CometNativeWriteExec, as they depend on the Spark task context .build() From c8eb2d49aca7659ef3074fcaf2944c47f85ab830 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 14 Jan 2026 14:38:03 -0800 Subject: [PATCH 3/8] parquet_writer --- .../src/execution/operators/parquet_writer.rs | 219 +++++++++++------- .../operator/CometDataWritingCommand.scala | 2 +- 2 files changed, 141 insertions(+), 80 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 
1d2b9b27c6..2ce5c482e1 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -17,6 +17,12 @@ //! Parquet writer operator for writing RecordBatches to Parquet files +use arrow::array::{ArrayRef, AsArray}; +use arrow::compute::{ + cast, lexsort_to_indices, partition, take, Partitions, SortColumn, SortOptions, +}; +use opendal::{services::Hdfs, Operator}; +use std::collections::HashMap; use std::{ any::Any, fmt, @@ -25,10 +31,6 @@ use std::{ io::Cursor, sync::Arc, }; -use std::collections::HashMap; -use arrow::array::{ArrayRef, AsArray}; -use arrow::compute::cast; -use opendal::{services::Hdfs, Operator}; use url::Url; use arrow::datatypes::{DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema, SchemaRef}; @@ -72,8 +74,10 @@ enum ParquetWriter { } fn needs_escaping(c: char) -> bool { - matches!(c, '"' | '#' | '%' | '\'' | '*' | '/' | ':' | '=' | '?' | '\\' | '\x7F') - || c.is_control() + matches!( + c, + '"' | '#' | '%' | '\'' | '*' | '/' | ':' | '=' | '?' | '\\' | '\x7F' + ) || c.is_control() } fn escape_partition_value(value: &str) -> String { @@ -93,25 +97,24 @@ fn build_partition_path( row: usize, partition_columns: &[String], partition_indices: &[usize], -) -> Result { +) -> String { let mut path = String::new(); for (name, &idx) in partition_columns.iter().zip(partition_indices.iter()) { - let value = get_partition_value(batch.column(idx), row)?; + let value = get_partition_value(batch.column(idx), row); if !path.is_empty() { path.push('/'); } path.push_str(name); path.push('='); - path.push_str(&escape_partition_value(&value)); + path.push_str(&escape_partition_value(&value.unwrap())); } - Ok(path) + path } fn get_partition_value(array: &ArrayRef, row: usize) -> Result { if array.is_null(row) { return Ok("__HIVE_DEFAULT_PARTITION__".to_string()); - } - else{ + } else { // relying on arrow's cast op to get string representation let string_array = cast(array, &DataType::Utf8)?; Ok(string_array.as_string::().value(row).to_string()) @@ -292,33 +295,10 @@ impl ParquetWriterExec { column_names, metrics: ExecutionPlanMetricsSet::new(), cache, - partition_columns + partition_columns, }) } - fn get_partition_value(array: &ArrayRef, row: usize) -> Result { - use arrow::array::*; - use arrow::datatypes::DataType; - - if array.is_null(row) { - return Ok("__HIVE_DEFAULT_PARTITION__".to_string()); - } - - let value = match array.data_type() { - DataType::Utf8 => array.as_string::().value(row).to_string(), - DataType::Int64 => array.as_primitive::().value(row).to_string(), - DataType::Int32 => array.as_primitive::().value(row).to_string(), - DataType::Int16 => array.as_primitive::().value(row).to_string(), - DataType::Int8 => array.as_primitive::().value(row).to_string(), - DataType::Boolean => array.as_boolean().value(row).to_string(), - DataType::Utf8 => array.value(row).to_string(), - dt => return Err(DataFusionError::Execution( - format!("Unsupported partition column type: {:?}", dt) - )), - }; - Ok(value) - } - fn compression_to_parquet(&self) -> Result { match self.compression { CompressionCodec::None => Ok(Compression::UNCOMPRESSED), @@ -511,7 +491,7 @@ impl ExecutionPlan for ParquetWriterExec { self.compression.clone(), self.partition_id, self.column_names.clone(), - self.partition_columns.clone() + self.partition_columns.clone(), )?)), _ => Err(DataFusionError::Internal( "ParquetWriterExec requires exactly one child".to_string(), @@ -521,22 +501,27 @@ impl ExecutionPlan for ParquetWriterExec { fn execute( 
&self, - partition: usize, + partition_size: usize, context: Arc, ) -> Result { use datafusion::physical_plan::metrics::MetricBuilder; // Create metrics for tracking write statistics - let files_written = MetricBuilder::new(&self.metrics).counter("files_written", partition); - let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition); - let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition); - - let input = self.input.execute(partition, context)?; + let files_written = + MetricBuilder::new(&self.metrics).counter("files_written", partition_size); + let bytes_written = + MetricBuilder::new(&self.metrics).counter("bytes_written", partition_size); + let rows_written = + MetricBuilder::new(&self.metrics).counter("rows_written", partition_size); + + let input = self.input.execute(partition_size, context)?; let input_schema = self.input.schema(); let work_dir = self.work_dir.clone(); let task_attempt_id = self.task_attempt_id; let compression = self.compression_to_parquet()?; let column_names = self.column_names.clone(); + let partition_cols = self.partition_columns.clone(); + let partition_id = self.partition_id; assert_eq!(input_schema.fields().len(), column_names.len()); @@ -549,22 +534,6 @@ impl ExecutionPlan for ParquetWriterExec { .collect(); let output_schema = Arc::new(arrow::datatypes::Schema::new(fields)); - for partition_name in &self.partition_columns { - let index = input_schema.index_of(partition_name)?; - let data_type = input_schema.field(index).data_type().clone(); - } - - // Generate part file name for this partition - // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename - let part_file = if let Some(attempt_id) = task_attempt_id { - format!( - "{}/part-{:05}-{:05}.parquet", - work_dir, self.partition_id, attempt_id - ) - } else { - format!("{}/part-{:05}.parquet", work_dir, self.partition_id) - }; - // Configure writer properties let props = WriterProperties::builder() .set_compression(compression) @@ -574,33 +543,112 @@ impl ExecutionPlan for ParquetWriterExec { let schema_for_write = Arc::clone(&output_schema); if !self.partition_columns.is_empty() { + // get partition col idx + let partition_indices: Vec = self + .partition_columns + .iter() + .map(|name| schema_for_write.index_of(name).unwrap()) + .collect(); + + // get all other col idx + let non_partition_indices: Vec = (0..schema_for_write.fields().len()) + .filter(|i| !partition_indices.contains(i)) + .collect(); + + let props = props.clone(); + let write_task = async move { let mut stream = input; let mut total_rows = 0i64; let mut writers: HashMap = HashMap::new(); - let mut partition_files: Vec = Vec::new(); while let Some(batch_result) = stream.try_next().await.transpose() { let batch = batch_result?; + total_rows += batch.num_rows() as i64; - // Step 1: Rename columns let renamed_batch = RecordBatch::try_new( Arc::clone(&schema_for_write), - batch.columns().to_vec() + batch.columns().to_vec(), )?; - // Step 2: Sort by partition columns - // ... 
(next step) + // sort batch by the partition columns and split them later to write into separate files + let sort_columns: Vec = partition_indices + .iter() + .map(|&idx| SortColumn { + values: Arc::clone(renamed_batch.column(idx)), + options: Some(SortOptions::default()), + }) + .collect(); + + // TODO : benchmark against row comparator + let sorted_indices = lexsort_to_indices(&sort_columns, None)?; + let sorted_batch = RecordBatch::try_new( + Arc::clone(&schema_for_write), + renamed_batch + .columns() + .iter() + .map(|col| take(col.as_ref(), &sorted_indices, None).unwrap()) + .collect(), + )?; + + let partition_columns: Vec = partition_indices + .iter() + .map(|&idx| Arc::clone(sorted_batch.column(idx))) + .collect(); - // Step 3: Get partition ranges - // ... (next step) + let partition_ranges: Partitions = partition(&partition_columns)?; - // Step 4: Write each partition slice - // ... (next step) + for partition_batch in partition_ranges.ranges() { + let record_batch: RecordBatch = sorted_batch + .slice( + partition_batch.start, + partition_batch.end - partition_batch.start, + ) + .project(&non_partition_indices) + .expect("cannot project partition columns"); + let partition_path: String = build_partition_path( + &record_batch, + 0, + &partition_cols, + &partition_indices, + ); + eprintln!("Partition path: {:?}", partition_path); + + let full_path_part_file = if let Some(attempt_id) = task_attempt_id { + format!( + "{}/{}/part-{:05}-{:05}.parquet", + work_dir, partition_path, partition_id, attempt_id + ) + } else { + format!( + "{}/{}/part-{:05}.parquet", + work_dir, partition_path, partition_id + ) + }; + eprintln!("full path: {:?}", full_path_part_file); + + if !writers.contains_key(&partition_path) { + let writer = Self::create_arrow_writer( + &full_path_part_file, + Arc::clone(&output_schema), + props.clone(), + )?; + writers.insert(partition_path.clone(), writer); + } + + // write data now + writers + .get_mut(&partition_path) + .unwrap() + .write(&record_batch) + .await + .map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch: {}", e)) + })?; + } } - // Step 5: Close all writers for (_, writer) in writers { writer.close().await?; } @@ -612,9 +660,19 @@ impl ExecutionPlan for ParquetWriterExec { self.schema(), futures::stream::once(write_task).try_flatten(), ))) - } else { - let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?; + // Generate part file name for this partition + // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename + let part_file = if let Some(attempt_id) = task_attempt_id { + format!( + "{}/part-{:05}-{:05}.parquet", + work_dir, self.partition_id, attempt_id + ) + } else { + format!("{}/part-{:05}.parquet", work_dir, self.partition_id) + }; + let mut writer = + Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?; // Write batches let write_task = async move { @@ -629,13 +687,16 @@ impl ExecutionPlan for ParquetWriterExec { // Rename columns in the batch to match output schema let renamed_batch = if !column_names.is_empty() { - RecordBatch::try_new(Arc::clone(&schema_for_write), batch.columns().to_vec()) - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to rename batch columns: {}", - e - )) - })? + RecordBatch::try_new( + Arc::clone(&schema_for_write), + batch.columns().to_vec(), + ) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to rename batch columns: {}", + e + )) + })? 
} else { batch }; @@ -927,7 +988,7 @@ mod tests { CompressionCodec::None, 0, // partition_id column_names, - partition_columns + partition_columns, )?; // Create a session context and execute the plan diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index b50e589b58..253991948b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -183,7 +183,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec committerClass.getConstructor(classOf[String], classOf[String], classOf[Boolean]) Some( constructor - .newInstance(jobId, outputPath, isDynamicOverWriteMode) + .newInstance(jobId, outputPath, isDynamicOverWriteMode: java.lang.Boolean) .asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol]) } catch { case e: Exception => From f249f9708ef9f7b9b530553d68441d46df9f6669 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 14 Jan 2026 17:14:22 -0800 Subject: [PATCH 4/8] parquet_writer --- .../src/execution/operators/parquet_writer.rs | 29 +++++++++---- .../operator/CometDataWritingCommand.scala | 6 +-- .../parquet/CometParquetWriterSuite.scala | 41 +++++++++++++------ 3 files changed, 51 insertions(+), 25 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 2ce5c482e1..8f36732eeb 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -33,7 +33,7 @@ use std::{ }; use url::Url; -use arrow::datatypes::{DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::{ @@ -113,7 +113,7 @@ fn build_partition_path( fn get_partition_value(array: &ArrayRef, row: usize) -> Result { if array.is_null(row) { - return Ok("__HIVE_DEFAULT_PARTITION__".to_string()); + Ok("__HIVE_DEFAULT_PARTITION__".to_string()) } else { // relying on arrow's cast op to get string representation let string_array = cast(array, &DataType::Utf8)?; @@ -600,6 +600,13 @@ impl ExecutionPlan for ParquetWriterExec { let partition_ranges: Partitions = partition(&partition_columns)?; for partition_batch in partition_ranges.ranges() { + let partition_path: String = build_partition_path( + &sorted_batch, + partition_batch.start, + &partition_cols, + &partition_indices, + ); + let record_batch: RecordBatch = sorted_batch .slice( partition_batch.start, @@ -607,12 +614,6 @@ impl ExecutionPlan for ParquetWriterExec { ) .project(&non_partition_indices) .expect("cannot project partition columns"); - let partition_path: String = build_partition_path( - &record_batch, - 0, - &partition_cols, - &partition_indices, - ); eprintln!("Partition path: {:?}", partition_path); let full_path_part_file = if let Some(attempt_id) = task_attempt_id { @@ -627,17 +628,19 @@ impl ExecutionPlan for ParquetWriterExec { ) }; eprintln!("full path: {:?}", full_path_part_file); + let write_schema = Arc::new(output_schema.project(&non_partition_indices)?); if !writers.contains_key(&partition_path) { let writer = Self::create_arrow_writer( &full_path_part_file, - Arc::clone(&output_schema), + Arc::clone(&write_schema), props.clone(), )?; writers.insert(partition_path.clone(), 
writer); } // write data now + // TODO : Write success file in base dir after the writes are completed and also support dynamic partition overwrite writers .get_mut(&partition_path) .unwrap() @@ -646,6 +649,14 @@ impl ExecutionPlan for ParquetWriterExec { .map_err(|e| { DataFusionError::Execution(format!("Failed to write batch: {}", e)) })?; + + let file_size = std::fs::metadata(&full_path_part_file) + .map(|m| m.len() as i64) + .unwrap_or(0); + + files_written.add(1); + bytes_written.add(file_size as usize); + rows_written.add(total_rows as usize); } } diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 253991948b..548c88697d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport} import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -169,8 +168,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec other } - val isDynamicOverWriteMode = cmd.partitionColumns.nonEmpty && - SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC + val isDynamicOverWriteMode = cmd.partitionColumns.nonEmpty // Create FileCommitProtocol for atomic writes val jobId = java.util.UUID.randomUUID().toString @@ -183,7 +181,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec committerClass.getConstructor(classOf[String], classOf[String], classOf[Boolean]) Some( constructor - .newInstance(jobId, outputPath, isDynamicOverWriteMode: java.lang.Boolean) + .newInstance(jobId, outputPath, false: java.lang.Boolean) .asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol]) } catch { case e: Exception => diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index f1f5276aef..f19888b38a 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame} import org.apache.spark.sql.comet.{CometNativeScanExec, CometNativeWriteExec} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf @@ -229,10 +230,9 @@ class CometParquetWriterSuite extends CometTestBase { } } - test("parquet write with mode overwrite") { + test("partitioned parquet write") { withTempPath { dir => - val outputPath = new File(dir, "output.parquet").getAbsolutePath - + val outputPath = dir.getAbsolutePath withTempPath { inputDir => val inputPath = createTestData(inputDir) @@ -243,15 +243,32 @@ class CometParquetWriterSuite extends CometTestBase { CometConf.COMET_EXEC_ENABLED.key -> "true") { val df = 
spark.read.parquet(inputPath) - - // First write - df.repartition(2).write.parquet(outputPath) - // verifyWrittenFile(outputPath) - // Second write (with overwrite mode and a different record count to make sure we are not reading the same data) - df.limit(500).repartition(2).write.mode("overwrite").parquet(outputPath) - // // Verify the data was written - val resultDf = spark.read.parquet(outputPath) - assert(resultDf.count() == 500, "Expected 1000 rows after overwrite") + // Pick first column to partition by + val partCols = df.columns.take(3) + + val uniquePartitions = df.select(partCols.map(col): _*).distinct().collect() + val expectedPaths = uniquePartitions.map { row => + partCols.zipWithIndex + .map { case (colName, i) => + val value = + if (row.isNullAt(i)) "__HIVE_DEFAULT_PARTITION__" else row.get(i).toString + s"$colName=$value" + } + .mkString("/") + }.toSet + + df.write.partitionBy(partCols: _*).parquet(outputPath) + + val result = spark.read.parquet(outputPath) + val actualFiles = result.inputFiles + actualFiles.foreach { filePath => + val matchesPartition = expectedPaths.exists(p => filePath.contains(p)) + assert( + matchesPartition, + s"File $filePath doesn't match any expected partition: $expectedPaths") + } + // Verify data + assert(result.count() == 1000) } } } From 47bc3bc9b02de696a2e94279e3347f7a32d2d7f8 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 15 Jan 2026 14:50:28 -0800 Subject: [PATCH 5/8] parquet_writer --- .../operator/CometDataWritingCommand.scala | 16 +++++- .../parquet/CometParquetWriterSuite.scala | 50 ++++++++++++++++++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 548c88697d..0409106e0d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -24,11 +24,13 @@ import java.util.Locale import scala.jdk.CollectionConverters._ import org.apache.spark.SparkException +import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.comet.{CometNativeExec, CometNativeWriteExec} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport} import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -61,6 +63,10 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec return Unsupported(Some("Bucketed writes are not supported")) } + if (SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC) { + return Unsupported(Some("Dynamic partition overwrite is not supported")) + } + if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) { return Incompatible(Some("Partitioned writes are highly experimental")) } @@ -158,6 +164,14 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec val cmd = op.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand] val outputPath = cmd.outputPath.toString +// TODO : support dynamic partition overwrite + if (cmd.mode == SaveMode.Overwrite) { + val fs = 
cmd.outputPath.getFileSystem(SparkSession.active.sparkContext.hadoopConfiguration) + if (fs.exists(cmd.outputPath)) { + fs.delete(cmd.outputPath, true) + } + } + // Get the child plan from the WriteFilesExec or use the child directly val childPlan = op.child match { case writeFiles: WriteFilesExec => @@ -168,8 +182,6 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec other } - val isDynamicOverWriteMode = cmd.partitionColumns.nonEmpty - // Create FileCommitProtocol for atomic writes val jobId = java.util.UUID.randomUUID().toString val committer = diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index f19888b38a..ef4ec0435f 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame} import org.apache.spark.sql.comet.{CometNativeScanExec, CometNativeWriteExec} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.DataWritingCommandExec -import org.apache.spark.sql.functions.col +import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf @@ -273,4 +273,52 @@ class CometParquetWriterSuite extends CometTestBase { } } } + + test("partitioned write - data correctness per partition") { + withTempPath { dir => + val outputPath = new File(dir, "output").getAbsolutePath + + withTempPath { inputDir => + val inputPath = createTestData(inputDir) + + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey( + classOf[DataWritingCommandExec]) -> "true") { + + val inputDf = spark.read.parquet(inputPath).filter(col("c1") <= lit(10)) + val partCols = inputDf.columns.take(2) + val col1 = partCols(0) + val col2 = partCols(1) + + inputDf.write.partitionBy(partCols: _*).parquet(outputPath) + + // unique combinations + val combinations = inputDf + .select(partCols.head, partCols.last) + .distinct() + .collect() + .map(r => (r.getBoolean(0), r.getByte(1))) + + combinations.foreach { tuple => + val val1 = tuple._1 + val val2 = tuple._2 + + val partitionPath = s"$outputPath/${partCols.head}=$val1/${partCols.last}=$val2" + + val actualDf = spark.read.parquet(partitionPath) + val expectedDf = inputDf + .filter(col(col1) === val1) + .filter(col(col2) === val2) + .drop(col1, col2) + + checkAnswer(actualDf, expectedDf) + } + + // Verify total count as well + checkAnswer(spark.read.parquet(outputPath), inputDf) + } + } + } + } } From 82b27d08f3c90f2a547527ee98602c5bc6f34eb8 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Fri, 16 Jan 2026 10:59:29 -0800 Subject: [PATCH 6/8] parquet_writer --- .../src/execution/operators/parquet_writer.rs | 32 ++++--------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index f923dee2b5..2f93c0b93a 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -17,12 +17,6 @@ //! 
Parquet writer operator for writing RecordBatches to Parquet files -use arrow::array::{ArrayRef, AsArray}; -use arrow::compute::{ - cast, lexsort_to_indices, partition, take, Partitions, SortColumn, SortOptions, -}; -use opendal::{services::Hdfs, Operator}; -use std::collections::HashMap; use std::{ any::Any, collections::HashMap, @@ -32,20 +26,15 @@ use std::{ io::Cursor, sync::Arc, }; - - -use url::Url; - -use arrow::datatypes::{DataType, Schema, SchemaRef}; - - +use arrow::array::{ArrayRef, AsArray}; +use arrow::compute::{cast, lexsort_to_indices, partition, take, Partitions, SortColumn, SortOptions}; use opendal::Operator; use crate::execution::shuffle::CompressionCodec; use crate::parquet::parquet_support::{ create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs, }; - +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::{ @@ -516,12 +505,12 @@ impl ExecutionPlan for ParquetWriterExec { use datafusion::physical_plan::metrics::MetricBuilder; // Create metrics for tracking write statistics - let files_written = MetricBuilder::new(&self.metrics).counter("files_written", partition); - let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition); - let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition); + let files_written = MetricBuilder::new(&self.metrics).counter("files_written", partition_size); + let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition_size); + let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition_size); let runtime_env = context.runtime_env(); - let input = self.input.execute(partition, context)?; + let input = self.input.execute(partition_size, context)?; let input_schema = self.input.schema(); let work_dir = self.work_dir.clone(); let task_attempt_id = self.task_attempt_id; @@ -547,13 +536,6 @@ impl ExecutionPlan for ParquetWriterExec { .build(); let object_store_options = self.object_store_options.clone(); - let mut writer = Self::create_arrow_writer( - &part_file, - Arc::clone(&output_schema), - props, - runtime_env, - &object_store_options, - )?; // Clone schema for use in async closure let schema_for_write = Arc::clone(&output_schema); From 252f47cb702fdbdadde75d9e81af1da2dc267c4a Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Fri, 16 Jan 2026 11:00:10 -0800 Subject: [PATCH 7/8] parquet_writer --- .../src/execution/operators/parquet_writer.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 2f93c0b93a..e0dd754083 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -17,6 +17,11 @@ //! 
Parquet writer operator for writing RecordBatches to Parquet files +use arrow::array::{ArrayRef, AsArray}; +use arrow::compute::{ + cast, lexsort_to_indices, partition, take, Partitions, SortColumn, SortOptions, +}; +use opendal::Operator; use std::{ any::Any, collections::HashMap, @@ -26,9 +31,6 @@ use std::{ io::Cursor, sync::Arc, }; -use arrow::array::{ArrayRef, AsArray}; -use arrow::compute::{cast, lexsort_to_indices, partition, take, Partitions, SortColumn, SortOptions}; -use opendal::Operator; use crate::execution::shuffle::CompressionCodec; use crate::parquet::parquet_support::{ @@ -505,9 +507,12 @@ impl ExecutionPlan for ParquetWriterExec { use datafusion::physical_plan::metrics::MetricBuilder; // Create metrics for tracking write statistics - let files_written = MetricBuilder::new(&self.metrics).counter("files_written", partition_size); - let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition_size); - let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition_size); + let files_written = + MetricBuilder::new(&self.metrics).counter("files_written", partition_size); + let bytes_written = + MetricBuilder::new(&self.metrics).counter("bytes_written", partition_size); + let rows_written = + MetricBuilder::new(&self.metrics).counter("rows_written", partition_size); let runtime_env = context.runtime_env(); let input = self.input.execute(partition_size, context)?; From 52ca86c92bde751586ff18205187f7d6d2ba382d Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 25 Jan 2026 11:27:32 -0800 Subject: [PATCH 8/8] fix_build_failures --- native/core/src/execution/operators/parquet_writer.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index e0dd754083..b7b45aef99 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -539,9 +539,7 @@ impl ExecutionPlan for ParquetWriterExec { let props = WriterProperties::builder() .set_compression(compression) .build(); - - let object_store_options = self.object_store_options.clone(); - + // Clone schema for use in async closure let schema_for_write = Arc::clone(&output_schema); @@ -638,6 +636,8 @@ impl ExecutionPlan for ParquetWriterExec { &full_path_part_file, Arc::clone(&write_schema), props.clone(), + runtime_env.clone(), + &HashMap::new(), )?; writers.insert(partition_path.clone(), writer); } @@ -686,7 +686,8 @@ impl ExecutionPlan for ParquetWriterExec { format!("{}/part-{:05}.parquet", work_dir, self.partition_id) }; let mut writer = - Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?; + Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props,runtime_env, + &HashMap::new(),)?; // Write batches let write_task = async move {