Commit e773e91

TEST prefetching Row Groups using next_reader API in parquet-rs

1 parent 607325a
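
For reference: the parquet-rs API exercised here is `ParquetRecordBatchStream::next_row_group`, which performs the async I/O for a single row group and returns a synchronous `ParquetRecordBatchReader` over the fetched bytes (`None` once the file is exhausted). Below is a minimal sketch of driving that API by hand, under the same trait bounds the commit uses; `drain_stream` is a hypothetical helper, not part of this change:

use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use parquet::errors::{ParquetError, Result};

// Count rows by decoding one row group at a time: each call to
// `next_row_group().await` fetches the column chunks for the next row
// group, then yields a synchronous reader over the buffered bytes.
async fn drain_stream<T>(mut stream: ParquetRecordBatchStream<T>) -> Result<usize>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
    let mut rows = 0;
    while let Some(reader) = stream.next_row_group().await? {
        for batch in reader {
            // The reader is a plain Iterator of RecordBatches; map its
            // ArrowError into ParquetError for a uniform return type
            let batch = batch.map_err(|e| ParquetError::External(Box::new(e)))?;
            rows += batch.num_rows();
        }
    }
    Ok(rows)
}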

1 file changed: +119 −3 lines


datafusion/datasource-parquet/src/opener.rs

@@ -26,7 +26,7 @@ use crate::{
 use arrow::array::RecordBatch;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
-use std::pin::Pin;
+use std::pin::{pin, Pin};
 use std::sync::Arc;
 use std::task::{Context, Poll};

@@ -47,14 +47,17 @@ use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};

 #[cfg(feature = "parquet_encryption")]
 use datafusion_common::config::EncryptionFactoryOptions;
+use datafusion_common_runtime::SpawnedTask;
 #[cfg(feature = "parquet_encryption")]
 use datafusion_execution::parquet_encryption::EncryptionFactory;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use log::debug;
 use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
-use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
-use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::arrow_reader::{
+    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
+};
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};

@@ -449,6 +452,9 @@ impl FileOpener for ParquetOpener {
             .with_metrics(arrow_reader_metrics.clone())
             .build()?;

+        // eagerly prefetch row groups
+        let stream = EagerRowGroupPrefetchStream::new(stream);
+
         let files_ranges_pruned_statistics =
             file_metrics.files_ranges_pruned_statistics.clone();
         let predicate_cache_inner_records =
@@ -496,6 +502,116 @@ fn copy_arrow_reader_metrics(
     }
 }

+/// Eagerly prefetches the next RowGroup from the underlying stream
+struct EagerRowGroupPrefetchStream<T> {
+    /// Outstanding prefetch state
+    state: EagerPrefetchState<T>,
+    /// Active reader, if any
+    parquet_record_batch_reader: Option<ParquetRecordBatchReader>,
+}
+
+/// A prefetched row group reader (`None` once the file is exhausted),
+/// together with the stream it was read from
+struct PrefetchResult<T> {
+    stream: ParquetRecordBatchStream<T>,
+    reader: Option<ParquetRecordBatchReader>,
+}
+
+enum EagerPrefetchState<T> {
+    /// Trying to open the next RowGroup in a new task
+    Prefetching(SpawnedTask<Result<PrefetchResult<T>>>),
+    /// The underlying stream has no more row groups
+    Done,
+}
+
+impl<T> EagerPrefetchState<T>
+where
+    T: AsyncFileReader + Unpin + Send + 'static,
+{
+    /// Begin fetching the next row group, if any, on a separate task
+    fn next_row_group(mut stream: ParquetRecordBatchStream<T>) -> Self {
+        let task = SpawnedTask::spawn(async move {
+            let reader = stream.next_row_group().await?;
+            let result = PrefetchResult { stream, reader };
+            Ok(result)
+        });
+        Self::Prefetching(task)
+    }
+}
+
+impl<T> EagerRowGroupPrefetchStream<T>
+where
+    T: AsyncFileReader + Unpin + Send + 'static,
+{
+    pub fn new(stream: ParquetRecordBatchStream<T>) -> Self {
+        Self {
+            state: EagerPrefetchState::next_row_group(stream),
+            parquet_record_batch_reader: None,
+        }
+    }
+}
+
+impl<T> Stream for EagerRowGroupPrefetchStream<T>
+where
+    T: AsyncFileReader + Unpin + Send + 'static,
+{
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            // If we have an active reader, try to read from it first
+            if let Some(mut reader) = self.parquet_record_batch_reader.take() {
+                match reader.next() {
+                    Some(result) => {
+                        // Return the batch
+                        self.parquet_record_batch_reader = Some(reader);
+                        let result = result.map_err(DataFusionError::from);
+                        return Poll::Ready(Some(result));
+                    }
+                    None => {
+                        // Reader is exhausted; fall through and poll the
+                        // prefetch task for the next row group
+                    }
+                }
+            }
+
+            use futures::Future;
+
+            match &mut self.state {
+                EagerPrefetchState::Prefetching(handle) => {
+                    // Check whether the prefetch task has completed
+                    let handle = pin!(handle);
+                    match ready!(handle.poll(cx)) {
+                        Ok(Ok(result)) => {
+                            let PrefetchResult { stream, reader } = result;
+                            // No reader means end of stream
+                            if reader.is_none() {
+                                self.state = EagerPrefetchState::Done;
+                            } else {
+                                // Immediately start fetching the next row group
+                                self.state = EagerPrefetchState::next_row_group(stream);
+                            }
+                            self.parquet_record_batch_reader = reader;
+                        }
+                        Ok(Err(err)) => {
+                            // Error during prefetch, return it to the caller
+                            return Poll::Ready(Some(Err(err)));
+                        }
+                        Err(e) => {
+                            return Poll::Ready(Some(exec_err!(
+                                "Eager prefetch task panicked: {e}"
+                            )));
+                        }
+                    }
+                }
+                EagerPrefetchState::Done => {
+                    // Stream is exhausted
+                    return Poll::Ready(None);
+                }
+            }
+        }
+    }
+}
+
 /// Wraps an inner RecordBatchStream and a [`FilePruner`]
 ///
 /// This can terminate the scan early when some dynamic filters is updated after
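
Design note on the wrapper above: while the consumer synchronously drains the `ParquetRecordBatchReader` for row group N, the `SpawnedTask` is already performing the I/O for row group N+1, so fetch and decode overlap instead of alternating. A hedged usage sketch follows; `EagerRowGroupPrefetchStream` is private to opener.rs, and `scan_local_file` plus the tokio file setup are illustrative assumptions, not part of this commit:

use futures::StreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

// Sketch only: assumes a tokio runtime; tokio::fs::File satisfies
// AsyncFileReader through parquet's blanket impl for AsyncRead + AsyncSeek.
async fn scan_local_file(file: tokio::fs::File) -> datafusion_common::Result<usize> {
    let stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    // Drop-in wrapper: the stream shape (Item = Result<RecordBatch>) is
    // unchanged, but row group N+1 is prefetched while N is decoded
    let mut stream = EagerRowGroupPrefetchStream::new(stream);
    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows();
    }
    Ok(rows)
}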
