71 changes: 0 additions & 71 deletions parquet/src/arrow/async_reader/mod.rs
@@ -2181,77 +2181,6 @@ mod tests {
assert_eq!(requests.lock().unwrap().len(), 3);
}

#[tokio::test]
async fn test_cache_projection_excludes_nested_columns() {
use arrow_array::{ArrayRef, StringArray};

// Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }`
let a = StringArray::from_iter_values(["r1", "r2"]);
let b = StructArray::from(vec![
(
Arc::new(Field::new("aa", DataType::Utf8, true)),
Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
),
(
Arc::new(Field::new("bb", DataType::Utf8, true)),
Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
),
]);

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, true),
Field::new("b", b.data_type().clone(), true),
]));

let mut buf = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
let batch = RecordBatch::try_from_iter([
("a", Arc::new(a) as ArrayRef),
("b", Arc::new(b) as ArrayRef),
])
.unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

// Load Parquet metadata
let data: Bytes = buf.into();
let metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
let metadata = Arc::new(metadata);

// Build a RowFilter whose predicate projects a leaf under the nested root `b`
// Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
let parquet_schema = metadata.file_metadata().schema_descr();
let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);

let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
Ok(arrow_array::BooleanArray::from(vec![
true;
batch.num_rows()
]))
});
let filter = RowFilter::new(vec![Box::new(always_true)]);

// Construct a ReaderFactory and compute cache projection
let reader_factory = ReaderFactory {
metadata: Arc::clone(&metadata),
fields: None,
input: TestReader::new(data),
filter: Some(filter),
limit: None,
offset: None,
metrics: ArrowReaderMetrics::disabled(),
max_predicate_cache_size: 0,
};

// Provide an output projection that also selects the same nested leaf
let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);

// Expect None since nested columns should be excluded from cache projection
assert!(cache_projection.is_none());
}

#[tokio::test]
#[allow(deprecated)]
async fn empty_offset_index_doesnt_panic_in_read_row_group() {
85 changes: 84 additions & 1 deletion parquet/tests/arrow_reader/predicate_cache.rs
@@ -23,7 +23,8 @@ use arrow::compute::and;
use arrow::compute::kernels::cmp::{gt, lt};
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{RecordBatch, StringViewArray};
use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray};
use arrow_schema::{DataType, Field};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
@@ -80,6 +81,19 @@ async fn test_cache_disabled_with_filters() {
test.run_async(async_builder).await;
}

#[tokio::test]
async fn test_cache_projection_excludes_nested_columns() {
let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);

let sync_builder = test.sync_builder();
let sync_builder = test.add_nested_filter(sync_builder);
test.run_sync(sync_builder);

let async_builder = test.async_builder().await;
let async_builder = test.add_nested_filter(async_builder);
test.run_async(async_builder).await;
}

// -- Begin test infrastructure --

/// A test parquet file
@@ -104,6 +118,18 @@ impl ParquetPredicateCacheTest {
}
}

/// Create a new `TestParquetFile` with
/// 2 columns:
///
/// * string column `a`
/// * nested struct column `b { aa, bb }`
fn new_nested() -> Self {
Self {
bytes: NESTED_TEST_FILE_DATA.clone(),
expected_records_read_from_cache: 0,
}
}

/// Set the expected number of records read from the cache
fn with_expected_records_read_from_cache(
mut self,
@@ -154,6 +180,27 @@ impl ParquetPredicateCacheTest {
.with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
}

/// Add a filter on the nested leaf nodes
fn add_nested_filter<T>(&self, builder: ArrowReaderBuilder<T>) -> ArrowReaderBuilder<T> {
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

// Build a RowFilter whose predicate projects a leaf under the nested root `b`
// Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);

let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
Ok(arrow_array::BooleanArray::from(vec![
true;
batch.num_rows()
]))
});
let row_filter = RowFilter::new(vec![Box::new(always_true)]);

builder
.with_projection(nested_leaf_mask)
.with_row_filter(row_filter)
}

/// Build the reader from the specified builder, reading all batches from it,
    /// and asserts the expected number of records read from the cache
fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
@@ -239,6 +286,42 @@ static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
Bytes::from(output)
});

/// Build a ParquetFile with a
///
/// * string column `a`
/// * nested struct column `b { aa, bb }`
static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
const NUM_ROWS: usize = 100;
let a: StringArray = (0..NUM_ROWS).map(|i| Some(format!("r{i}"))).collect();

let aa: StringArray = (0..NUM_ROWS).map(|i| Some(format!("v{i}"))).collect();
let bb: StringArray = (0..NUM_ROWS).map(|i| Some(format!("w{i}"))).collect();
let b = StructArray::from(vec![
(
Arc::new(Field::new("aa", DataType::Utf8, true)),
Arc::new(aa) as ArrayRef,
),
(
Arc::new(Field::new("bb", DataType::Utf8, true)),
Arc::new(bb) as ArrayRef,
),
]);

let input_batch = RecordBatch::try_from_iter([
("a", Arc::new(a) as ArrayRef),
("b", Arc::new(b) as ArrayRef),
])
.unwrap();

let mut output = Vec::new();
let writer_options = None;
let mut writer =
ArrowWriter::try_new(&mut output, input_batch.schema(), writer_options).unwrap();
writer.write(&input_batch).unwrap();
writer.close().unwrap();
Bytes::from(output)
});

/// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
/// TODO put this in a common place
#[derive(Clone)]
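
For context, here is a minimal standalone sketch (not part of this diff) of the depth-first leaf numbering that `ProjectionMask::leaves` indexes into in the tests above. It uses only APIs that already appear in the change (`ArrowWriter`, `ParquetMetaDataReader`, `ProjectionMask`); the `main` wrapper and variable names are illustrative assumptions. With the nested schema `[a, b { aa, bb }]`, the leaves come out as 0 -> a, 1 -> b.aa, 2 -> b.bb, which is why index 1 selects `b.aa`.

use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Field};
use bytes::Bytes;
use parquet::arrow::{ArrowWriter, ProjectionMask};
use parquet::file::metadata::ParquetMetaDataReader;

fn main() {
    // Same shape as NESTED_TEST_FILE_DATA above: `a: Utf8`, `b: Struct<aa: Utf8, bb: Utf8>`
    let a = StringArray::from_iter_values(["r1", "r2"]);
    let aa = StringArray::from_iter_values(["v1", "v2"]);
    let bb = StringArray::from_iter_values(["w1", "w2"]);
    let b = StructArray::from(vec![
        (
            Arc::new(Field::new("aa", DataType::Utf8, true)),
            Arc::new(aa) as ArrayRef,
        ),
        (
            Arc::new(Field::new("bb", DataType::Utf8, true)),
            Arc::new(bb) as ArrayRef,
        ),
    ]);
    let batch = RecordBatch::try_from_iter([
        ("a", Arc::new(a) as ArrayRef),
        ("b", Arc::new(b) as ArrayRef),
    ])
    .unwrap();

    // Write the batch to an in-memory Parquet buffer
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Read back the metadata and print the leaf columns in schema order.
    // Leaves are numbered depth-first: 0 -> a, 1 -> b.aa, 2 -> b.bb
    let data: Bytes = buf.into();
    let metadata = ParquetMetaDataReader::new().parse_and_finish(&data).unwrap();
    let schema_descr = metadata.file_metadata().schema_descr();
    for (i, column) in schema_descr.columns().iter().enumerate() {
        println!("leaf {i}: {}", column.path());
    }

    // Selecting leaf index 1 therefore projects only the nested leaf `b.aa`,
    // mirroring what `add_nested_filter` does in the test above.
    let _nested_leaf_mask = ProjectionMask::leaves(schema_descr, vec![1]);
}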