diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 9b81e8e56905..c5badea7f32c 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -2181,77 +2181,6 @@ mod tests {
         assert_eq!(requests.lock().unwrap().len(), 3);
     }
 
-    #[tokio::test]
-    async fn test_cache_projection_excludes_nested_columns() {
-        use arrow_array::{ArrayRef, StringArray};
-
-        // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }`
-        let a = StringArray::from_iter_values(["r1", "r2"]);
-        let b = StructArray::from(vec![
-            (
-                Arc::new(Field::new("aa", DataType::Utf8, true)),
-                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
-            ),
-            (
-                Arc::new(Field::new("bb", DataType::Utf8, true)),
-                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
-            ),
-        ]);
-
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Utf8, true),
-            Field::new("b", b.data_type().clone(), true),
-        ]));
-
-        let mut buf = Vec::new();
-        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
-        let batch = RecordBatch::try_from_iter([
-            ("a", Arc::new(a) as ArrayRef),
-            ("b", Arc::new(b) as ArrayRef),
-        ])
-        .unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        // Load Parquet metadata
-        let data: Bytes = buf.into();
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
-        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
-        let parquet_schema = metadata.file_metadata().schema_descr();
-        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);
-
-        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
-            Ok(arrow_array::BooleanArray::from(vec![
-                true;
-                batch.num_rows()
-            ]))
-        });
-        let filter = RowFilter::new(vec![Box::new(always_true)]);
-
-        // Construct a ReaderFactory and compute cache projection
-        let reader_factory = ReaderFactory {
-            metadata: Arc::clone(&metadata),
-            fields: None,
-            input: TestReader::new(data),
-            filter: Some(filter),
-            limit: None,
-            offset: None,
-            metrics: ArrowReaderMetrics::disabled(),
-            max_predicate_cache_size: 0,
-        };
-
-        // Provide an output projection that also selects the same nested leaf
-        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);
-
-        // Expect None since nested columns should be excluded from cache projection
-        assert!(cache_projection.is_none());
-    }
-
     #[tokio::test]
     #[allow(deprecated)]
     async fn empty_offset_index_doesnt_panic_in_read_row_group() {
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs
index ebcec2a44a5d..b2ad36b42113 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -23,7 +23,8 @@ use arrow::compute::and;
 use arrow::compute::kernels::cmp::{gt, lt};
 use arrow_array::cast::AsArray;
 use arrow_array::types::Int64Type;
-use arrow_array::{RecordBatch, StringViewArray};
+use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray};
+use arrow_schema::{DataType, Field};
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt};
@@ -80,6 +81,19 @@ async fn test_cache_disabled_with_filters() {
     test.run_async(async_builder).await;
 }
 
+#[tokio::test]
+async fn test_cache_projection_excludes_nested_columns() {
+    let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);
+
+    let sync_builder = test.sync_builder();
+    let sync_builder = test.add_nested_filter(sync_builder);
+    test.run_sync(sync_builder);
+
+    let async_builder = test.async_builder().await;
+    let async_builder = test.add_nested_filter(async_builder);
+    test.run_async(async_builder).await;
+}
+
 // -- Begin test infrastructure --
 
 /// A test parquet file
@@ -104,6 +118,18 @@ impl ParquetPredicateCacheTest {
         }
     }
 
+    /// Create a new `TestParquetFile` with
+    /// 2 columns:
+    ///
+    /// * string column `a`
+    /// * nested struct column `b { aa, bb }`
+    fn new_nested() -> Self {
+        Self {
+            bytes: NESTED_TEST_FILE_DATA.clone(),
+            expected_records_read_from_cache: 0,
+        }
+    }
+
     /// Set the expected number of records read from the cache
     fn with_expected_records_read_from_cache(
         mut self,
@@ -154,6 +180,27 @@ impl ParquetPredicateCacheTest {
             .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
     }
 
+    /// Add a filter on the nested leaf nodes
+    fn add_nested_filter<T>(&self, builder: ArrowReaderBuilder<T>) -> ArrowReaderBuilder<T> {
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
+        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
+        let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
+
+        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
+            Ok(arrow_array::BooleanArray::from(vec![
+                true;
+                batch.num_rows()
+            ]))
+        });
+        let row_filter = RowFilter::new(vec![Box::new(always_true)]);
+
+        builder
+            .with_projection(nested_leaf_mask)
+            .with_row_filter(row_filter)
+    }
+
     /// Build the reader from the specified builder, reading all batches from it,
     /// and asserts the
     fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
@@ -239,6 +286,42 @@ static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
     Bytes::from(output)
 });
 
+/// Build a ParquetFile with a
+///
+/// * string column `a`
+/// * nested struct column `b { aa, bb }`
+static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+    const NUM_ROWS: usize = 100;
+    let a: StringArray = (0..NUM_ROWS).map(|i| Some(format!("r{i}"))).collect();
+
+    let aa: StringArray = (0..NUM_ROWS).map(|i| Some(format!("v{i}"))).collect();
+    let bb: StringArray = (0..NUM_ROWS).map(|i| Some(format!("w{i}"))).collect();
+    let b = StructArray::from(vec![
+        (
+            Arc::new(Field::new("aa", DataType::Utf8, true)),
+            Arc::new(aa) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("bb", DataType::Utf8, true)),
+            Arc::new(bb) as ArrayRef,
+        ),
+    ]);
+
+    let input_batch = RecordBatch::try_from_iter([
+        ("a", Arc::new(a) as ArrayRef),
+        ("b", Arc::new(b) as ArrayRef),
+    ])
+    .unwrap();
+
+    let mut output = Vec::new();
+    let writer_options = None;
+    let mut writer =
+        ArrowWriter::try_new(&mut output, input_batch.schema(), writer_options).unwrap();
+    writer.write(&input_batch).unwrap();
+    writer.close().unwrap();
+    Bytes::from(output)
+});
+
 /// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
 /// TODO put this in a common place
 #[derive(Clone)]