From 69d7d8872d9a5056297f4a5c58f9d41f137c5fc5 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 11 Feb 2026 16:40:03 -0500
Subject: [PATCH 1/2] Docs: add example of how to read parquet row groups in
 parallel

---
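Note: the new example polls all of the streams concurrently from a single
tokio task. If parallelism across cores is needed, each stream could
instead be driven to completion by its own task. A rough sketch, assuming
the `streams` vector built in the example below and `futures::TryStreamExt`
in scope:

    let mut handles = Vec::new();
    for stream in streams {
        // each task drives one stream; the tokio runtime can schedule
        // the tasks on separate worker threads
        handles.push(tokio::task::spawn(async move {
            stream.try_collect::<Vec<_>>().await
        }));
    }
    let mut results = Vec::new();
    for handle in handles {
        results.extend(handle.await.expect("task panicked")?);
    }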
 parquet/src/arrow/async_reader/mod.rs | 95 ++++++++++++++++++++++++++-
 1 file changed, 93 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 9e45a0c3168..776d259bc43 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -215,7 +215,14 @@ pub struct AsyncReader<T>(T);
 /// to use this information to select what specific columns, row groups, etc.
 /// they wish to be read by the resulting stream.
 ///
-/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`], including how to
+/// issue multiple I/O requests in parallel using multiple streams.
+///
+/// # See also:
+/// * [`ParquetPushDecoderBuilder`] for lower level control over buffering and
+/// decoding.
+/// * [`ParquetRecordBatchStream::next_row_group`] for I/O prefetching
+///
 ///
 /// See [`ArrowReaderBuilder`] for additional member functions
 pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
@@ -224,6 +231,11 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
     /// specified source.
     ///
+    /// # Examples:
+    /// * [Basic example reading from an async source](#example)
+    /// * [Configuring options and reading metadata](#example-configuring-options-and-reading-metadata)
+    /// * [Reading Row Groups in Parallel](#example-reading-row-groups-in-parallel)
+    ///
     /// # Example
     /// ```
     /// # #[tokio::main(flavor="current_thread")]
@@ -282,7 +294,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// # }
     /// ```
     ///
-    /// # Example configuring options and reading metadata
+    /// # Example Configuring Options and Reading Metadata
     ///
     /// There are many options that control the behavior of the reader, such as
     /// `with_batch_size`, `with_projection`, `with_filter`, etc...
@@ -351,6 +363,85 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// assert_eq!(results.len(), 3);
     /// # }
     /// ```
+    ///
+    /// # Example reading Row Groups in Parallel
+    ///
+    /// Each [`ParquetRecordBatchStream`] is independent and can be used to read
+    /// from the same underlying source in parallel. Use
+    /// [`ParquetRecordBatchStream::next_row_group`] with a single stream to
+    /// begin prefetching the next Row Group. To read in parallel, create a
+    /// stream for each subset of the file. For example, you can read each row
+    /// group in parallel by creating one stream per row group using the
+    /// [`ParquetRecordBatchStreamBuilder::with_row_groups`] API, as shown below.
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// # use futures::{StreamExt, TryStreamExt};
+    /// # use tempfile::NamedTempFile;
+    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
+    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
+    /// # use parquet::file::metadata::ParquetMetaDataReader;
+    /// # use parquet::file::properties::WriterProperties;
+    /// # // write to a temporary file with 10 RowGroups and read back with async API
+    /// # fn write_file() -> parquet::errors::Result<NamedTempFile> {
+    /// #     let mut file = NamedTempFile::new().unwrap();
+    /// #     let small_batch = RecordBatch::try_from_iter([
+    /// #         ("id", Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef),
+    /// #     ]).unwrap();
+    /// #     let props = WriterProperties::builder()
+    /// #         .set_max_row_group_row_count(Some(5))
+    /// #         .set_write_batch_size(5)
+    /// #         .build();
+    /// #     let mut writer = ArrowWriter::try_new(&mut file, small_batch.schema(), Some(props))?;
+    /// #     for _ in 0..10 {
+    /// #         writer.write(&small_batch)?;
+    /// #     }
+    /// #     writer.close()?;
+    /// #     Ok(file)
+    /// # }
+    /// # #[tokio::main(flavor="current_thread")]
+    /// # async fn main() -> parquet::errors::Result<()> {
+    /// # let t = write_file()?;
+    /// # let path = t.path();
+    /// // This example uses a tokio::fs::File as the async source, but it
+    /// // could be any async source such as an object store reader
+    /// let mut file = tokio::fs::File::open(path).await?;
+    /// // To read Row Groups in parallel, create a separate stream builder for each Row Group.
+    /// // First get the metadata to find the row group information
+    /// let file_size = file.metadata().await?.len();
+    /// let metadata = ParquetMetaDataReader::new().load_and_finish(&mut file, file_size).await?;
+    /// assert_eq!(metadata.num_row_groups(), 10); // file has 10 row groups with 5 rows each
+    /// // Create a stream reader for each row group
+    /// let reader_metadata = ArrowReaderMetadata::try_new(
+    ///     Arc::new(metadata),
+    ///     ArrowReaderOptions::new()
+    /// )?;
+    /// let mut streams = vec![];
+    /// for row_group_index in 0..10 {
+    ///     // each stream needs its own source instance to issue parallel
+    ///     // IO requests, so we clone the file for each stream
+    ///     let this_file = file.try_clone().await?;
+    ///     let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+    ///         this_file,
+    ///         reader_metadata.clone()
+    ///     )
+    ///     .with_row_groups(vec![row_group_index]) // read only this row group
+    ///     .build()?;
+    ///     streams.push(stream);
+    /// }
+    /// // Each stream can now be polled independently and in parallel, for
+    /// // example using StreamExt::flatten_unordered to read from up to 3
+    /// // streams at a time
+    /// let results = futures::stream::iter(streams)
+    ///     .flatten_unordered(3)
+    ///     .try_collect::<Vec<_>>().await?;
+    /// // read all 50 rows (10 row groups x 5 rows per group)
+    /// assert_eq!(50, results.iter().map(|s| s.num_rows()).sum::<usize>());
+    /// # Ok(())
+    /// # }
+    /// ```
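+    ///
+    /// For comparison, a single stream can also overlap I/O with decoding:
+    /// [`ParquetRecordBatchStream::next_row_group`] fetches all of the data
+    /// for the next row group and returns a synchronous reader that decodes
+    /// from the buffered bytes. A minimal sketch (assuming a Parquet file
+    /// exists at the illustrative path `data.parquet`):
+    ///
+    /// ```no_run
+    /// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
+    /// # #[tokio::main(flavor="current_thread")]
+    /// # async fn main() -> parquet::errors::Result<()> {
+    /// // open a Parquet file (the path here is illustrative)
+    /// let file = tokio::fs::File::open("data.parquet").await?;
+    /// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
+    /// // `next_row_group` performs the I/O for one row group; the returned
+    /// // reader then decodes the buffered bytes without further I/O
+    /// while let Some(reader) = stream.next_row_group().await? {
+    ///     for batch in reader {
+    ///         println!("read {} rows", batch?.num_rows());
+    ///     }
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```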
     pub async fn new(input: T) -> Result<Self> {
         Self::new_with_options(input, Default::default()).await
     }

From fe1d43b329f004d8c2885513bf66dcff773a96ff Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 11 Feb 2026 17:33:56 -0500
Subject: [PATCH 2/2] clippy, you cruel master

---
 parquet/src/arrow/async_reader/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 776d259bc43..f7f4f053d4e 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -220,7 +220,7 @@ pub struct AsyncReader<T>(T);
 ///
 /// # See also:
 /// * [`ParquetPushDecoderBuilder`] for lower level control over buffering and
-/// decoding.
+///   decoding.
 /// * [`ParquetRecordBatchStream::next_row_group`] for I/O prefetching
 ///
 ///
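A note on ordering for the new example: `flatten_unordered` yields batches
in completion order, interleaving the streams. If the original row group
order matters, tag each stream's output with its row group index and sort
afterwards. A minimal sketch, assuming the `streams` vector from the
example:

    use futures::{StreamExt, TryStreamExt};

    let mut indexed = futures::stream::iter(
            streams.into_iter().enumerate()
                .map(|(i, s)| s.map_ok(move |batch| (i, batch))),
        )
        .flatten_unordered(3)
        .try_collect::<Vec<_>>()
        .await?;
    // a stable sort restores row group order; each stream's batches
    // already arrive in order relative to one another
    indexed.sort_by_key(|(i, _)| *i);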