95 changes: 93 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
@@ -215,7 +215,14 @@ pub struct AsyncReader<T>(T);
/// to use this information to select what specific columns, row groups, etc.
/// they wish to be read by the resulting stream.
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`], including how to
/// issue multiple I/O requests in parallel using multiple streams.
///
/// # See also:
/// * [`ParquetPushDecoderBuilder`] for lower-level control over buffering
///   and decoding
/// * [`ParquetRecordBatchStream::next_row_group`] for I/O prefetching
///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
@@ -224,6 +231,11 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
/// specified source.
///
/// # Examples:
/// * [Basic example reading from an async source](#example)
/// * [Configuring options and reading metadata](#example-configuring-options-and-reading-metadata)
/// * [Reading Row Groups in Parallel](#example-reading-row-groups-in-parallel)
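/// * [Prefetching with `next_row_group`](#example-prefetching-with-next_row_group) (a minimal sketch)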
///
/// # Example
/// ```
/// # #[tokio::main(flavor="current_thread")]
Expand Down Expand Up @@ -282,7 +294,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// # }
/// ```
///
/// # Example Configuring Options and Reading Metadata
///
/// There are many options that control the behavior of the reader, such as
/// `with_batch_size`, `with_projection`, `with_filter`, etc...
Expand Down Expand Up @@ -351,6 +363,85 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
/// assert_eq!(results.len(), 3);
/// # }
/// ```
///
/// # Example Reading Row Groups in Parallel
///
/// Each [`ParquetRecordBatchStream`] is independent and can read from the
/// same underlying source in parallel. With a single stream, use
/// [`ParquetRecordBatchStream::next_row_group`] to begin prefetching the
/// next Row Group (see the prefetching example further below). To read in
/// parallel, create a stream for each subset of the file: for example, you
/// can read each row group in parallel by creating one stream per row group
/// with the [`ParquetRecordBatchStreamBuilder::with_row_groups`] API, as
/// shown below.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use arrow::util::pretty::pretty_format_batches;
/// # use futures::{StreamExt, TryStreamExt};
/// # use tempfile::NamedTempFile;
/// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
/// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// # use parquet::file::properties::{WriterProperties};
/// # // write to a temporary file with 10 RowGroups and read back with async API
/// # fn write_file() -> parquet::errors::Result<NamedTempFile> {
/// # let mut file = NamedTempFile::new().unwrap();
/// # let small_batch = RecordBatch::try_from_iter([
/// # ("id", Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef),
/// # ]).unwrap();
/// # let props = WriterProperties::builder()
/// # .set_max_row_group_row_count(Some(5))
/// # .set_write_batch_size(5)
/// # .build();
/// # let mut writer = ArrowWriter::try_new(&mut file, small_batch.schema(), Some(props))?;
/// # for _ in 0..10 {
/// #     writer.write(&small_batch)?;
/// # }
/// # writer.close()?;
/// # Ok(file)
/// # }
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() -> parquet::errors::Result<()> {
/// # let t = write_file()?;
/// # let path = t.path();
/// // This example uses a tokio::fs::File as the async source, but it
/// // could be any async source, such as an object store reader
/// let mut file = tokio::fs::File::open(path).await?;
/// // To read Row Groups in parallel, create a separate stream builder for each Row Group.
/// // First get the metadata to find the row group information
/// let file_size = file.metadata().await?.len();
/// let metadata = ParquetMetaDataReader::new().load_and_finish(&mut file, file_size).await?;
/// assert_eq!(metadata.num_row_groups(), 10); // file has 10 row groups with 5 rows each
/// // Prepare the metadata so it can be cheaply cloned into each stream builder
/// let reader_metadata = ArrowReaderMetadata::try_new(
/// Arc::new(metadata),
/// ArrowReaderOptions::new()
/// )?;
/// let mut streams = vec![];
/// for row_group_index in 0..10 {
/// // each stream needs its own source instance to issue parallel IO requests, so we clone the file for each stream
/// let this_file = file.try_clone().await?;
/// let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
/// this_file,
/// reader_metadata.clone()
/// )
/// .with_row_groups(vec![row_group_index]) // read only this row group
/// .build()?;
/// streams.push(stream);
/// }
/// // Each stream can now be polled independently and in parallel, for
/// // example using StreamExt::buffered to drive up to 3 streams at a time.
/// // Mapping each stream to a future that collects its batches ensures the
/// // I/O and decoding of up to 3 row groups proceeds concurrently
/// // (`buffer_unordered` could be used instead if output order does not matter)
/// let results = futures::stream::iter(streams)
///     .map(|stream| stream.try_collect::<Vec<_>>())
///     .buffered(3)
///     .try_concat()
///     .await?;
/// // read all 50 rows (10 row groups x 5 rows per group)
/// assert_eq!(50, results.iter().map(|s| s.num_rows()).sum::<usize>());
/// # Ok(())
/// # }
/// ```
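///
/// # Example Prefetching with `next_row_group`
///
/// As an alternative to one stream per row group, a single stream can
/// prefetch: [`ParquetRecordBatchStream::next_row_group`] performs the I/O
/// for the next row group and returns a synchronous reader over the
/// buffered data, which can be decoded while the following request is
/// issued. A minimal sketch (the `data.parquet` path is illustrative, so
/// the example is `no_run`):
///
/// ```no_run
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() -> parquet::errors::Result<()> {
/// let file = tokio::fs::File::open("data.parquet").await?;
/// let mut stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
/// // `next_row_group` fetches the data for the next row group and returns
/// // a synchronous `ParquetRecordBatchReader` over the fetched bytes
/// while let Some(reader) = stream.next_row_group().await? {
///     for batch in reader {
///         println!("read {} rows", batch?.num_rows());
///     }
/// }
/// # Ok(())
/// # }
/// ```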
pub async fn new(input: T) -> Result<Self> {
Self::new_with_options(input, Default::default()).await
}