Skip to content

Commit 6e618b3

Browse files
authored
Merge pull request #7 from alamb/alamb/test_memory_limit
Add option to control predicate cache, documentation, `ArrowReaderMetrics` and tests
2 parents c240a52 + 42d5520 commit 6e618b3

File tree

10 files changed

+660
-71
lines changed

10 files changed

+660
-71
lines changed

parquet/examples/read_with_rowgroup.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use arrow::util::pretty::print_batches;
1919
use bytes::{Buf, Bytes};
20+
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
2021
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
2122
use parquet::arrow::async_reader::AsyncFileReader;
2223
use parquet::arrow::{parquet_to_arrow_field_levels, ProjectionMask};
@@ -38,7 +39,11 @@ async fn main() -> Result<()> {
3839
let metadata = file.get_metadata(None).await?;
3940

4041
for rg in metadata.row_groups() {
41-
let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all());
42+
let mut rowgroup = InMemoryRowGroup::create(
43+
rg.clone(),
44+
ProjectionMask::all(),
45+
ArrowReaderMetrics::disabled(),
46+
);
4247
rowgroup.async_fetch_data(&mut file, None).await?;
4348
let reader = rowgroup.build_reader(1024, None)?;
4449

@@ -103,6 +108,7 @@ pub struct InMemoryRowGroup {
103108
pub metadata: RowGroupMetaData,
104109
mask: ProjectionMask,
105110
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
111+
metrics: ArrowReaderMetrics,
106112
}
107113

108114
impl RowGroups for InMemoryRowGroup {
@@ -132,13 +138,18 @@ impl RowGroups for InMemoryRowGroup {
132138
}
133139

134140
impl InMemoryRowGroup {
135-
pub fn create(metadata: RowGroupMetaData, mask: ProjectionMask) -> Self {
141+
pub fn create(
142+
metadata: RowGroupMetaData,
143+
mask: ProjectionMask,
144+
metrics: ArrowReaderMetrics,
145+
) -> Self {
136146
let column_chunks = metadata.columns().iter().map(|_| None).collect::<Vec<_>>();
137147

138148
Self {
139149
metadata,
140150
mask,
141151
column_chunks,
152+
metrics,
142153
}
143154
}
144155

@@ -153,7 +164,13 @@ impl InMemoryRowGroup {
153164
None,
154165
)?;
155166

156-
ParquetRecordBatchReader::try_new_with_row_groups(&levels, self, batch_size, selection)
167+
ParquetRecordBatchReader::try_new_with_row_groups(
168+
&levels,
169+
self,
170+
batch_size,
171+
selection,
172+
&self.metrics,
173+
)
157174
}
158175

159176
/// fetch data from a reader in sync mode

parquet/src/arrow/array_reader/builder.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,14 @@ use crate::arrow::array_reader::{
3030
FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
3131
PrimitiveArrayReader, RowGroups, StructArrayReader,
3232
};
33+
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
3334
use crate::arrow::schema::{ParquetField, ParquetFieldType};
3435
use crate::arrow::ProjectionMask;
3536
use crate::basic::Type as PhysicalType;
3637
use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
3738
use crate::errors::{ParquetError, Result};
3839
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
3940

40-
/// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
41-
pub struct ArrayReaderBuilder<'a> {
42-
/// Source of row group data
43-
row_groups: &'a dyn RowGroups,
44-
/// Optional cache options for the array reader
45-
cache_options: Option<&'a CacheOptions<'a>>,
46-
}
47-
4841
/// Builder for [`CacheOptions`]
4942
#[derive(Debug, Clone)]
5043
pub struct CacheOptionsBuilder<'a> {
@@ -90,11 +83,22 @@ pub struct CacheOptions<'a> {
9083
pub role: CacheRole,
9184
}
9285

86+
/// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
87+
pub struct ArrayReaderBuilder<'a> {
88+
/// Source of row group data
89+
row_groups: &'a dyn RowGroups,
90+
/// Optional cache options for the array reader
91+
cache_options: Option<&'a CacheOptions<'a>>,
92+
/// metrics
93+
metrics: &'a ArrowReaderMetrics,
94+
}
95+
9396
impl<'a> ArrayReaderBuilder<'a> {
94-
pub fn new(row_groups: &'a dyn RowGroups) -> Self {
97+
pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
9598
Self {
9699
row_groups,
97100
cache_options: None,
101+
metrics,
98102
}
99103
}
100104

@@ -143,6 +147,7 @@ impl<'a> ArrayReaderBuilder<'a> {
143147
Arc::clone(&cache_options.cache),
144148
col_idx,
145149
cache_options.role,
150+
self.metrics.clone(), // cheap clone
146151
))))
147152
} else {
148153
Ok(Some(reader))
@@ -453,7 +458,8 @@ mod tests {
453458
)
454459
.unwrap();
455460

456-
let array_reader = ArrayReaderBuilder::new(&file_reader)
461+
let metrics = ArrowReaderMetrics::disabled();
462+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
457463
.build_array_reader(fields.as_ref(), &mask)
458464
.unwrap();
459465

0 commit comments

Comments
 (0)