
Commit 06f7f07

refactor test_cache_projection_excludes_nested_columns to use high level APIs (#8754)
# Which issue does this PR close?

- Related to #8677
- Part of #8159

# Rationale for this change

I am reworking how the parquet decoder's state machine works in #8159.

One of the unit tests, `test_cache_projection_excludes_nested_columns`, uses non-public APIs that I am changing. Rather than rewrite it against other non-public APIs, I think it is better to express this test in terms of public APIs.

# What changes are included in this PR?

1. Refactor `test_cache_projection_excludes_nested_columns` to use high level APIs.

# Are these changes tested?

They are run in CI.

I also verified that this test covers the intended functionality by disabling the nested-column exclusion:

```diff
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -724,7 +724,9 @@ where
             cache_projection.union(predicate.projection());
         }
         cache_projection.intersect(projection);
-        self.exclude_nested_columns_from_cache(&cache_projection)
+        // TEMP don't exclude nested columns
+        //self.exclude_nested_columns_from_cache(&cache_projection)
+        Some(cache_projection)
     }

     /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
```

And then running the test:

```shell
cargo test --all-features --test arrow_reader
```

The test fails, as expected:

```
---- predicate_cache::test_cache_projection_excludes_nested_columns stdout ----

thread 'predicate_cache::test_cache_projection_excludes_nested_columns' panicked at parquet/tests/arrow_reader/predicate_cache.rs:244:9:
assertion `left == right` failed: Expected 0 records read from cache, but got 100
  left: 100
 right: 0
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

failures:
    predicate_cache::test_cache_projection_excludes_nested_columns

test result: FAILED. 88 passed; 1 failed; 1 ignored; 0 measured; 0 filtered out; finished in 0.20s
```

# Are there any user-facing changes?

No, this PR only changes tests.
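As context for the refactor, the rewritten test drives the reader entirely through the public `ParquetRecordBatchReaderBuilder` / `RowFilter` API instead of the internal `ReaderFactory`. Below is a minimal sketch of that pattern; the function name `read_with_nested_filter` and the `file_bytes` parameter are illustrative only (not part of this change), and error handling is simplified:

```rust
use arrow_array::{BooleanArray, RecordBatch};
use bytes::Bytes;
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
};
use parquet::arrow::ProjectionMask;

/// Sketch: read `file_bytes` through the public reader builder, filtering on
/// the nested leaf `b.aa` (depth-first leaf index 1).
fn read_with_nested_filter(file_bytes: Bytes) -> parquet::errors::Result<Vec<RecordBatch>> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(file_bytes)?;
    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

    // Predicate and output projection both target only the nested leaf b.aa
    let mask = ProjectionMask::leaves(&schema_descr, vec![1]);
    let always_true = ArrowPredicateFn::new(mask.clone(), |batch: RecordBatch| {
        Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    });

    let reader = builder
        .with_projection(mask)
        .with_row_filter(RowFilter::new(vec![Box::new(always_true)]))
        .build()?;

    // Collect all batches; arrow errors are converted into ParquetError
    let batches = reader.collect::<Result<Vec<_>, _>>()?;
    Ok(batches)
}
```

The actual test (shown in the diff below) wraps this pattern in the `ParquetPredicateCacheTest` harness so the same nested filter is exercised against both the sync and async readers.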
1 parent 7e54bb2 commit 06f7f07

2 files changed: +84 -72 lines changed

parquet/src/arrow/async_reader/mod.rs

Lines changed: 0 additions & 71 deletions
@@ -2181,77 +2181,6 @@ mod tests {
         assert_eq!(requests.lock().unwrap().len(), 3);
     }

-    #[tokio::test]
-    async fn test_cache_projection_excludes_nested_columns() {
-        use arrow_array::{ArrayRef, StringArray};
-
-        // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }`
-        let a = StringArray::from_iter_values(["r1", "r2"]);
-        let b = StructArray::from(vec![
-            (
-                Arc::new(Field::new("aa", DataType::Utf8, true)),
-                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
-            ),
-            (
-                Arc::new(Field::new("bb", DataType::Utf8, true)),
-                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
-            ),
-        ]);
-
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Utf8, true),
-            Field::new("b", b.data_type().clone(), true),
-        ]));
-
-        let mut buf = Vec::new();
-        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
-        let batch = RecordBatch::try_from_iter([
-            ("a", Arc::new(a) as ArrayRef),
-            ("b", Arc::new(b) as ArrayRef),
-        ])
-        .unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        // Load Parquet metadata
-        let data: Bytes = buf.into();
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
-        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
-        let parquet_schema = metadata.file_metadata().schema_descr();
-        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);
-
-        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
-            Ok(arrow_array::BooleanArray::from(vec![
-                true;
-                batch.num_rows()
-            ]))
-        });
-        let filter = RowFilter::new(vec![Box::new(always_true)]);
-
-        // Construct a ReaderFactory and compute cache projection
-        let reader_factory = ReaderFactory {
-            metadata: Arc::clone(&metadata),
-            fields: None,
-            input: TestReader::new(data),
-            filter: Some(filter),
-            limit: None,
-            offset: None,
-            metrics: ArrowReaderMetrics::disabled(),
-            max_predicate_cache_size: 0,
-        };
-
-        // Provide an output projection that also selects the same nested leaf
-        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);
-
-        // Expect None since nested columns should be excluded from cache projection
-        assert!(cache_projection.is_none());
-    }
-
     #[tokio::test]
     #[allow(deprecated)]
     async fn empty_offset_index_doesnt_panic_in_read_row_group() {

parquet/tests/arrow_reader/predicate_cache.rs

Lines changed: 84 additions & 1 deletion
@@ -23,7 +23,8 @@ use arrow::compute::and;
 use arrow::compute::kernels::cmp::{gt, lt};
 use arrow_array::cast::AsArray;
 use arrow_array::types::Int64Type;
-use arrow_array::{RecordBatch, StringViewArray};
+use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray};
+use arrow_schema::{DataType, Field};
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt};
@@ -80,6 +81,19 @@ async fn test_cache_disabled_with_filters() {
     test.run_async(async_builder).await;
 }

+#[tokio::test]
+async fn test_cache_projection_excludes_nested_columns() {
+    let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);
+
+    let sync_builder = test.sync_builder();
+    let sync_builder = test.add_nested_filter(sync_builder);
+    test.run_sync(sync_builder);
+
+    let async_builder = test.async_builder().await;
+    let async_builder = test.add_nested_filter(async_builder);
+    test.run_async(async_builder).await;
+}
+
 // -- Begin test infrastructure --

 /// A test parquet file
@@ -104,6 +118,18 @@ impl ParquetPredicateCacheTest {
         }
     }

+    /// Create a new `TestParquetFile` with
+    /// 2 columns:
+    ///
+    /// * string column `a`
+    /// * nested struct column `b { aa, bb }`
+    fn new_nested() -> Self {
+        Self {
+            bytes: NESTED_TEST_FILE_DATA.clone(),
+            expected_records_read_from_cache: 0,
+        }
+    }
+
     /// Set the expected number of records read from the cache
     fn with_expected_records_read_from_cache(
         mut self,
@@ -154,6 +180,27 @@ impl ParquetPredicateCacheTest {
             .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
     }

+    /// Add a filter on the nested leaf nodes
+    fn add_nested_filter<T>(&self, builder: ArrowReaderBuilder<T>) -> ArrowReaderBuilder<T> {
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
+        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
+        let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
+
+        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
+            Ok(arrow_array::BooleanArray::from(vec![
+                true;
+                batch.num_rows()
+            ]))
+        });
+        let row_filter = RowFilter::new(vec![Box::new(always_true)]);
+
+        builder
+            .with_projection(nested_leaf_mask)
+            .with_row_filter(row_filter)
+    }
+
     /// Build the reader from the specified builder, reading all batches from it,
     /// and asserts the
     fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
@@ -239,6 +286,42 @@ static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
     Bytes::from(output)
 });

+/// Build a ParquetFile with a
+///
+/// * string column `a`
+/// * nested struct column `b { aa, bb }`
+static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+    const NUM_ROWS: usize = 100;
+    let a: StringArray = (0..NUM_ROWS).map(|i| Some(format!("r{i}"))).collect();
+
+    let aa: StringArray = (0..NUM_ROWS).map(|i| Some(format!("v{i}"))).collect();
+    let bb: StringArray = (0..NUM_ROWS).map(|i| Some(format!("w{i}"))).collect();
+    let b = StructArray::from(vec![
+        (
+            Arc::new(Field::new("aa", DataType::Utf8, true)),
+            Arc::new(aa) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("bb", DataType::Utf8, true)),
+            Arc::new(bb) as ArrayRef,
+        ),
+    ]);
+
+    let input_batch = RecordBatch::try_from_iter([
+        ("a", Arc::new(a) as ArrayRef),
+        ("b", Arc::new(b) as ArrayRef),
+    ])
+    .unwrap();
+
+    let mut output = Vec::new();
+    let writer_options = None;
+    let mut writer =
+        ArrowWriter::try_new(&mut output, input_batch.schema(), writer_options).unwrap();
+    writer.write(&input_batch).unwrap();
+    writer.close().unwrap();
+    Bytes::from(output)
+});
+
 /// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
 /// TODO put this in a common place
 #[derive(Clone)]
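A note on the "leaf indices are depth-first" comment in `add_nested_filter`: with the test schema `[a, b { aa, bb }]`, the Parquet leaves are numbered `0 = a`, `1 = b.aa`, `2 = b.bb`, so leaf index 1 selects only `b.aa`, while a root-based mask over `b` spans two leaves. The sketch below is illustrative only; the helper names are not from this PR:

```rust
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

/// Selects only the nested leaf `b.aa` (depth-first leaf index 1).
fn mask_for_b_aa(schema_descr: &SchemaDescriptor) -> ProjectionMask {
    ProjectionMask::leaves(schema_descr, vec![1])
}

/// Selects the whole struct root `b` (root index 1), covering both leaves
/// `b.aa` and `b.bb`. Roots that span multiple leaves like this are what the
/// cache projection excludes, which is why the test expects 0 records read
/// from the cache.
fn mask_for_b_root(schema_descr: &SchemaDescriptor) -> ProjectionMask {
    ProjectionMask::roots(schema_descr, vec![1])
}
```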
