Commit 185af9f

Clean up predicate_cache tests
1 parent 1512116 commit 185af9f

File tree

1 file changed: +63 −59 lines changed


parquet/tests/arrow_reader/predicate_cache.rs

Lines changed: 63 additions & 59 deletions
@@ -52,8 +52,7 @@ async fn test_default_read() {
 #[tokio::test]
 async fn test_async_cache_with_filters() {
     let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(49);
-    let async_builder = test.async_builder().await;
-    let async_builder = test.add_project_ab_and_filter_b(async_builder);
+    let async_builder = test.async_builder().await.add_project_ab_and_filter_b();
     test.run_async(async_builder).await;
 }

@@ -63,34 +62,36 @@ async fn test_sync_cache_with_filters() {
         // The sync reader does not use the cache. See https://github.com/apache/arrow-rs/issues/8000
         .with_expected_records_read_from_cache(0);

-    let sync_builder = test.sync_builder();
-    let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+    let sync_builder = test.sync_builder().add_project_ab_and_filter_b();
     test.run_sync(sync_builder);
 }

 #[tokio::test]
 async fn test_cache_disabled_with_filters() {
     // expect no records to be read from cache, because the cache is disabled
     let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
-    let sync_builder = test.sync_builder().with_max_predicate_cache_size(0);
-    let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+    let sync_builder = test
+        .sync_builder()
+        .with_max_predicate_cache_size(0)
+        .add_project_ab_and_filter_b();
     test.run_sync(sync_builder);

-    let async_builder = test.async_builder().await.with_max_predicate_cache_size(0);
-    let async_builder = test.add_project_ab_and_filter_b(async_builder);
+    let async_builder = test
+        .async_builder()
+        .await
+        .with_max_predicate_cache_size(0)
+        .add_project_ab_and_filter_b();
     test.run_async(async_builder).await;
 }

 #[tokio::test]
 async fn test_cache_projection_excludes_nested_columns() {
     let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);

-    let sync_builder = test.sync_builder();
-    let sync_builder = test.add_nested_filter(sync_builder);
+    let sync_builder = test.sync_builder().add_nested_filter();
     test.run_sync(sync_builder);

-    let async_builder = test.async_builder().await;
-    let async_builder = test.add_nested_filter(async_builder);
+    let async_builder = test.async_builder().await.add_nested_filter();
     test.run_async(async_builder).await;
 }

@@ -154,53 +155,6 @@ impl ParquetPredicateCacheTest {
             .unwrap()
     }

-    /// Return a [`ParquetRecordBatchReaderBuilder`] for reading the file with
-    ///
-    /// 1. a projection selecting the "a" and "b" column
-    /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
-    fn add_project_ab_and_filter_b<T>(
-        &self,
-        builder: ArrowReaderBuilder<T>,
-    ) -> ArrowReaderBuilder<T> {
-        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
-
-        // "b" > 575 and "b" < 625
-        let row_filter = ArrowPredicateFn::new(
-            ProjectionMask::columns(&schema_descr, ["b"]),
-            |batch: RecordBatch| {
-                let scalar_575 = Int64Array::new_scalar(575);
-                let scalar_625 = Int64Array::new_scalar(625);
-                let column = batch.column(0).as_primitive::<Int64Type>();
-                and(&gt(column, &scalar_575)?, &lt(column, &scalar_625)?)
-            },
-        );
-
-        builder
-            .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
-            .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
-    }
-
-    /// Add a filter on the nested leaf nodes
-    fn add_nested_filter<T>(&self, builder: ArrowReaderBuilder<T>) -> ArrowReaderBuilder<T> {
-        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
-
-        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
-        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
-        let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
-
-        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
-            Ok(arrow_array::BooleanArray::from(vec![
-                true;
-                batch.num_rows()
-            ]))
-        });
-        let row_filter = RowFilter::new(vec![Box::new(always_true)]);
-
-        builder
-            .with_projection(nested_leaf_mask)
-            .with_row_filter(row_filter)
-    }
-
     /// Build the reader from the specified builder, reading all batches from it,
     /// and asserts the
     fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
@@ -322,6 +276,56 @@ static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
     Bytes::from(output)
 });

+trait ArrowReaderBuilderExt {
+    /// Applies the following:
+    /// 1. a projection selecting the "a" and "b" column
+    /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
+    fn add_project_ab_and_filter_b(self) -> Self;
+
+    /// Adds a row filter that projects the nested leaf column "b.aa" and
+    /// returns true for all rows.
+    fn add_nested_filter(self) -> Self;
+}
+
+impl<T> ArrowReaderBuilderExt for ArrowReaderBuilder<T> {
+    fn add_project_ab_and_filter_b(self) -> Self {
+        let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
+
+        // "b" > 575 and "b" < 625
+        let row_filter = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["b"]),
+            |batch: RecordBatch| {
+                let scalar_575 = Int64Array::new_scalar(575);
+                let scalar_625 = Int64Array::new_scalar(625);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                and(&gt(column, &scalar_575)?, &lt(column, &scalar_625)?)
+            },
+        );
+
+        self.with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
+    }
+
+    fn add_nested_filter(self) -> Self {
+        let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
+
+        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
+        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
+        let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
+
+        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
+            Ok(arrow_array::BooleanArray::from(vec![
+                true;
+                batch.num_rows()
+            ]))
+        });
+        let row_filter = RowFilter::new(vec![Box::new(always_true)]);
+
+        self.with_projection(nested_leaf_mask)
+            .with_row_filter(row_filter)
+    }
+}
+
 /// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
 /// TODO put this in a common place
 #[derive(Clone)]
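
The refactor replaces two `&self` helper methods on the test fixture with an extension trait implemented for `ArrowReaderBuilder<T>`, so each call site collapses from two statements into a single method chain, and one `impl` covers both the sync and async builders (both are instantiations of `ArrowReaderBuilder<T>` in arrow-rs). Below is a minimal, dependency-free sketch of the same extension-trait pattern; the `Builder` and `BuilderExt` names are hypothetical stand-ins for the parquet types, not part of any crate.

struct Builder {
    projection: Vec<String>,
}

impl Builder {
    fn new() -> Self {
        Self { projection: Vec::new() }
    }

    // Builder methods take `self` by value and return `Self` so they chain.
    fn with_projection(mut self, cols: &[&str]) -> Self {
        self.projection = cols.iter().map(|c| c.to_string()).collect();
        self
    }
}

// The extension trait mirrors the builder's own signature shape
// (`self -> Self`), so a helper defined outside the builder type
// still slots into the middle of a method chain.
trait BuilderExt {
    fn add_project_ab(self) -> Self;
}

impl BuilderExt for Builder {
    fn add_project_ab(self) -> Self {
        self.with_projection(&["a", "b"])
    }
}

fn main() {
    // Before the commit: let b = Builder::new(); let b = helper(b);
    // After: one fluent chain, as in the updated tests.
    let builder = Builder::new().add_project_ab();
    assert_eq!(builder.projection, ["a", "b"]);
}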

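The `add_nested_filter` comment notes that leaf indices are depth-first, so index 1 selects `b.aa`. The small standalone sketch below makes that ordering concrete; it uses the parquet crate's schema parser, and the schema string is a hypothetical one shaped like the test's `[a, b.aa, b.bb]`, not the actual test file's schema.

use std::sync::Arc;

use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() {
    // Schema shaped like the nested test file: primitive `a`, then group
    // `b` containing leaves `aa` and `bb`.
    let message = "
        message schema {
            required int64 a;
            required group b {
                required int64 aa;
                required int64 bb;
            }
        }
    ";
    let schema = Arc::new(parse_message_type(message).unwrap());
    let descr = SchemaDescriptor::new(schema);

    // Leaves come back in depth-first order:
    //   leaf 0 = a, leaf 1 = b.aa, leaf 2 = b.bb
    // which is why ProjectionMask::leaves(&descr, vec![1]) picks b.aa.
    for (i, column) in descr.columns().iter().enumerate() {
        println!("leaf {i}: {}", column.path());
    }
}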