122 changes: 119 additions & 3 deletions datafusion/datasource-parquet/src/opener.rs
@@ -26,7 +26,7 @@ use crate::{
use arrow::array::RecordBatch;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};

@@ -47,14 +47,17 @@ use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};

#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_common_runtime::SpawnedTask;
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};

@@ -449,6 +452,9 @@ impl FileOpener for ParquetOpener {
.with_metrics(arrow_reader_metrics.clone())
.build()?;

// eagerly prefetch row groups
let stream = EagerRowGroupPrefetchStream::new(stream);

let files_ranges_pruned_statistics =
file_metrics.files_ranges_pruned_statistics.clone();
let predicate_cache_inner_records =
Expand Down Expand Up @@ -496,6 +502,116 @@ fn copy_arrow_reader_metrics(
}
}

/// Eagerly prefetches the next row group from the underlying stream while the
/// batches of the current row group are being read
struct EagerRowGroupPrefetchStream<T> {
    /// Outstanding prefetch state
    state: EagerPrefetchState<T>,
    /// Reader for the current row group, if any
    parquet_record_batch_reader: Option<ParquetRecordBatchReader>,
}

/// Result of prefetching the next row group
struct PrefetchResult<T> {
    /// The stream the row group came from, handed back so the next prefetch can reuse it
    stream: ParquetRecordBatchStream<T>,
    /// Reader for the prefetched row group, or `None` if the stream is exhausted
    reader: Option<ParquetRecordBatchReader>,
}

enum EagerPrefetchState<T> {
    /// Trying to open the next row group in a new task
    Prefetching(SpawnedTask<Result<PrefetchResult<T>>>),
    /// The underlying stream is exhausted
    Done,
}

impl<T> EagerPrefetchState<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
{
/// Begin fetching the next row group, if any
fn next_row_group(mut stream: ParquetRecordBatchStream<T>) -> Self {
let task = SpawnedTask::spawn(async move {
let reader = stream.next_row_group().await?;
let result = PrefetchResult { stream, reader };
Ok(result)
});
Self::Prefetching(task)
}
}

impl<T> EagerRowGroupPrefetchStream<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
{
pub fn new(stream: ParquetRecordBatchStream<T>) -> Self {
Self {
state: EagerPrefetchState::next_row_group(stream),

Review comment (Contributor Author): in real code, this shouldn't start until the first batch is requested, I suspect. (A lazy-start sketch follows this impl block.)

parquet_record_batch_reader: None,
}
}
}
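
A lazy-start variant along the lines of the review comment above could park the stream in an idle state and only spawn the first prefetch when the consumer polls for a batch. The sketch below is illustrative only: `LazyPrefetchState`, the `Idle` variant, and `start_if_idle` are assumed names that are not part of this PR, and it reuses the `PrefetchResult` and `SpawnedTask` types already shown in this diff.

// Sketch only (not part of this PR): defer the first prefetch until the
// stream is actually polled, as suggested in the review comment above.
enum LazyPrefetchState<T> {
    /// Stream opened, but no prefetch started yet
    Idle(ParquetRecordBatchStream<T>),
    /// A prefetch task is outstanding
    Prefetching(SpawnedTask<Result<PrefetchResult<T>>>),
    /// The underlying stream is exhausted
    Done,
}

impl<T> LazyPrefetchState<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Called at the top of `poll_next`: the prefetch task is only spawned the
    /// first time the consumer actually asks for a batch.
    fn start_if_idle(self) -> Self {
        match self {
            Self::Idle(mut stream) => Self::Prefetching(SpawnedTask::spawn(async move {
                let reader = stream.next_row_group().await?;
                Ok(PrefetchResult { stream, reader })
            })),
            other => other,
        }
    }
}

In `poll_next`, this would be invoked before polling the spawned task, leaving the existing `Prefetching`/`Done` handling unchanged.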

impl<T> Stream for EagerRowGroupPrefetchStream<T>
where
T: AsyncFileReader + Unpin + Send + 'static,
{
type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
// If we have an active reader, try to read from it first
if let Some(mut reader) = self.parquet_record_batch_reader.take() {
match reader.next() {
Some(result) => {
// Return the batch
self.parquet_record_batch_reader = Some(reader);
let result = result.map_err(DataFusionError::from);
return Poll::Ready(Some(result));
}
                    None => {
                        // Reader is exhausted; fall through and poll the
                        // prefetch of the next row group
                    }
}
}

use futures::Future;

match &mut self.state {
EagerPrefetchState::Prefetching(handle) => {
                    // Check whether the prefetch task has completed
let handle = pin!(handle);
match ready!(handle.poll(cx)) {
Ok(Ok(result)) => {
let PrefetchResult { stream, reader } = result;
// no reader means end of stream
if reader.is_none() {
self.state = EagerPrefetchState::Done;
} else {
                                // Immediately kick off a prefetch of the row group after this one
self.state = EagerPrefetchState::next_row_group(stream);

Review comment (Contributor): Might even make this configurable (i.e. prefetch >= 2 row groups?)

Reply (Contributor Author): Good call -- I will look into that. I am not sure buffering more than one row group will really help: if more than one reader is buffered, it means the rest of the plan can't consume the data fast enough, and we need to apply backpressure. (A configurable-depth sketch follows this Stream impl.)

}
self.parquet_record_batch_reader = reader;
}
Ok(Err(err)) => {
// error during prefetch, return it to the caller
return Poll::Ready(Some(Err(err)));
}
Err(e) => {
return Poll::Ready(Some(exec_err!(
"Eager prefetch task panicked: {e}"
)));
}
}
}
EagerPrefetchState::Done => {
// stream is exhausted
return Poll::Ready(None);
}
}
}
}
}
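
On the configurable-depth question above, one possible shape is to buffer up to N already-opened row-group readers and only spawn a new prefetch while under that limit. This is a sketch under assumed names (`DepthPrefetchStream`, `prefetch_depth`, and `maybe_spawn_prefetch` are not part of this PR); it reuses the types from the diff above.

// Sketch only (not part of this PR): buffer up to `prefetch_depth` opened
// row-group readers ahead of the consumer.
struct DepthPrefetchStream<T> {
    /// Row-group readers that have been prefetched but not yet drained
    ready: std::collections::VecDeque<ParquetRecordBatchReader>,
    /// Outstanding prefetch task, if any
    inflight: Option<SpawnedTask<Result<PrefetchResult<T>>>>,
    /// The stream to pull further row groups from; `None` while a task owns it
    /// or once it is exhausted
    stream: Option<ParquetRecordBatchStream<T>>,
    /// How many row groups to keep buffered ahead of the consumer
    prefetch_depth: usize,
}

impl<T> DepthPrefetchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    /// Spawn another prefetch if we are below the configured depth and no task
    /// is already in flight.
    fn maybe_spawn_prefetch(&mut self) {
        if self.inflight.is_some() || self.ready.len() >= self.prefetch_depth {
            return;
        }
        if let Some(mut stream) = self.stream.take() {
            self.inflight = Some(SpawnedTask::spawn(async move {
                let reader = stream.next_row_group().await?;
                Ok(PrefetchResult { stream, reader })
            }));
        }
    }
}

As the author notes, a depth greater than one mainly helps when the consumer is only intermittently slower than decoding; if it is persistently slower, buffering more readers just delays the backpressure that should apply.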

/// Wraps an inner RecordBatchStream and a [`FilePruner`]
///
/// This can terminate the scan early when some dynamic filter is updated after