Skip to content

Commit e10d3e2

Browse files
authored
Rewrite bloom filters to use contains API (#8442)
1 parent ec8fd44 commit e10d3e2

File tree

2 files changed

+91
-155
lines changed

2 files changed

+91
-155
lines changed

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,7 @@ impl FileOpener for ParquetOpener {
522522
if enable_bloom_filter && !row_groups.is_empty() {
523523
if let Some(predicate) = predicate {
524524
row_groups = row_groups::prune_row_groups_by_bloom_filters(
525+
&file_schema,
525526
&mut builder,
526527
&row_groups,
527528
file_metadata.row_groups(),

datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs

Lines changed: 90 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,21 @@
1818
use arrow::{array::ArrayRef, datatypes::Schema};
1919
use arrow_array::BooleanArray;
2020
use arrow_schema::FieldRef;
21-
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
22-
use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
21+
use datafusion_common::{Column, ScalarValue};
2322
use parquet::file::metadata::ColumnChunkMetaData;
2423
use parquet::schema::types::SchemaDescriptor;
2524
use parquet::{
2625
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
2726
bloom_filter::Sbbf,
2827
file::metadata::RowGroupMetaData,
2928
};
30-
use std::{
31-
collections::{HashMap, HashSet},
32-
sync::Arc,
33-
};
29+
use std::collections::{HashMap, HashSet};
3430

3531
use crate::datasource::listing::FileRange;
3632
use crate::datasource::physical_plan::parquet::statistics::{
3733
max_statistics, min_statistics, parquet_column,
3834
};
39-
use crate::logical_expr::Operator;
40-
use crate::physical_expr::expressions as phys_expr;
4135
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
42-
use crate::physical_plan::PhysicalExpr;
4336

4437
use super::ParquetFileMetrics;
4538

@@ -118,188 +111,129 @@ pub(crate) fn prune_row_groups_by_statistics(
118111
pub(crate) async fn prune_row_groups_by_bloom_filters<
119112
T: AsyncFileReader + Send + 'static,
120113
>(
114+
arrow_schema: &Schema,
121115
builder: &mut ParquetRecordBatchStreamBuilder<T>,
122116
row_groups: &[usize],
123117
groups: &[RowGroupMetaData],
124118
predicate: &PruningPredicate,
125119
metrics: &ParquetFileMetrics,
126120
) -> Vec<usize> {
127-
let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr())
128-
{
129-
Ok(predicates) => predicates,
130-
Err(_) => {
131-
return row_groups.to_vec();
132-
}
133-
};
134121
let mut filtered = Vec::with_capacity(groups.len());
135122
for idx in row_groups {
136-
let rg_metadata = &groups[*idx];
137-
// get all columns bloom filter
138-
let mut column_sbbf =
139-
HashMap::with_capacity(bf_predicates.required_columns.len());
140-
for column_name in bf_predicates.required_columns.iter() {
141-
let column_idx = match rg_metadata
142-
.columns()
143-
.iter()
144-
.enumerate()
145-
.find(|(_, column)| column.column_path().string().eq(column_name))
146-
{
147-
Some((column_idx, _)) => column_idx,
148-
None => continue,
123+
// get all columns in the predicate that we could use a bloom filter with
124+
let literal_columns = predicate.literal_columns();
125+
let mut column_sbbf = HashMap::with_capacity(literal_columns.len());
126+
127+
for column_name in literal_columns {
128+
let Some((column_idx, _field)) =
129+
parquet_column(builder.parquet_schema(), arrow_schema, &column_name)
130+
else {
131+
continue;
149132
};
133+
150134
let bf = match builder
151135
.get_row_group_column_bloom_filter(*idx, column_idx)
152136
.await
153137
{
154-
Ok(bf) => match bf {
155-
Some(bf) => bf,
156-
None => {
157-
continue;
158-
}
159-
},
138+
Ok(Some(bf)) => bf,
139+
Ok(None) => continue, // no bloom filter for this column
160140
Err(e) => {
161-
log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}");
141+
log::debug!("Ignoring error reading bloom filter: {e}");
162142
metrics.predicate_evaluation_errors.add(1);
163143
continue;
164144
}
165145
};
166-
column_sbbf.insert(column_name.to_owned(), bf);
146+
column_sbbf.insert(column_name.to_string(), bf);
167147
}
168-
if bf_predicates.prune(&column_sbbf) {
148+
149+
let stats = BloomFilterStatistics { column_sbbf };
150+
151+
// Can this group be pruned?
152+
let prune_group = match predicate.prune(&stats) {
153+
Ok(values) => !values[0],
154+
Err(e) => {
155+
log::debug!("Error evaluating row group predicate on bloom filter: {e}");
156+
metrics.predicate_evaluation_errors.add(1);
157+
false
158+
}
159+
};
160+
161+
if prune_group {
169162
metrics.row_groups_pruned.add(1);
170-
continue;
163+
} else {
164+
filtered.push(*idx);
171165
}
172-
filtered.push(*idx);
173166
}
174167
filtered
175168
}
176169

177-
struct BloomFilterPruningPredicate {
178-
/// Actual pruning predicate
179-
predicate_expr: Option<phys_expr::BinaryExpr>,
180-
/// The statistics required to evaluate this predicate
181-
required_columns: Vec<String>,
170+
/// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF)
171+
struct BloomFilterStatistics {
172+
/// Maps column name to the parquet bloom filter
173+
column_sbbf: HashMap<String, Sbbf>,
182174
}
183175

184-
impl BloomFilterPruningPredicate {
185-
fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
186-
let binary_expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
187-
match binary_expr {
188-
Some(binary_expr) => {
189-
let columns = Self::get_predicate_columns(expr);
190-
Ok(Self {
191-
predicate_expr: Some(binary_expr.clone()),
192-
required_columns: columns.into_iter().collect(),
193-
})
194-
}
195-
None => Err(DataFusionError::Execution(
196-
"BloomFilterPruningPredicate only support binary expr".to_string(),
197-
)),
198-
}
176+
impl PruningStatistics for BloomFilterStatistics {
177+
fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
178+
None
199179
}
200180

201-
fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
202-
Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
181+
fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
182+
None
203183
}
204184

205-
/// Return true if the `expr` can be proved not `true`
206-
/// based on the bloom filter.
207-
///
208-
/// We only checked `BinaryExpr` but it also support `InList`,
209-
/// Because of the `optimizer` will convert `InList` to `BinaryExpr`.
210-
fn prune_expr_with_bloom_filter(
211-
expr: Option<&phys_expr::BinaryExpr>,
212-
column_sbbf: &HashMap<String, Sbbf>,
213-
) -> bool {
214-
let Some(expr) = expr else {
215-
// unsupported predicate
216-
return false;
217-
};
218-
match expr.op() {
219-
Operator::And | Operator::Or => {
220-
let left = Self::prune_expr_with_bloom_filter(
221-
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
222-
column_sbbf,
223-
);
224-
let right = Self::prune_expr_with_bloom_filter(
225-
expr.right()
226-
.as_any()
227-
.downcast_ref::<phys_expr::BinaryExpr>(),
228-
column_sbbf,
229-
);
230-
match expr.op() {
231-
Operator::And => left || right,
232-
Operator::Or => left && right,
233-
_ => false,
234-
}
235-
}
236-
Operator::Eq => {
237-
if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) {
238-
if let Some(sbbf) = column_sbbf.get(col.name()) {
239-
match val {
240-
ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
241-
ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
242-
ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
243-
ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
244-
ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
245-
ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
246-
ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
247-
ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
248-
_ => false,
249-
}
250-
} else {
251-
false
252-
}
253-
} else {
254-
false
255-
}
256-
}
257-
_ => false,
258-
}
185+
fn num_containers(&self) -> usize {
186+
1
259187
}
260188

261-
fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> {
262-
let mut columns = HashSet::new();
263-
expr.apply(&mut |expr| {
264-
if let Some(binary_expr) =
265-
expr.as_any().downcast_ref::<phys_expr::BinaryExpr>()
266-
{
267-
if let Some((column, _)) =
268-
Self::check_expr_is_col_equal_const(binary_expr)
269-
{
270-
columns.insert(column.name().to_string());
271-
}
272-
}
273-
Ok(VisitRecursion::Continue)
274-
})
275-
// no way to fail as only Ok(VisitRecursion::Continue) is returned
276-
.unwrap();
277-
278-
columns
189+
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
190+
None
279191
}
280192

281-
fn check_expr_is_col_equal_const(
282-
exr: &phys_expr::BinaryExpr,
283-
) -> Option<(phys_expr::Column, ScalarValue)> {
284-
if Operator::Eq.ne(exr.op()) {
285-
return None;
286-
}
193+
/// Use bloom filters to determine if we are sure this column can not
194+
/// possibly contain `values`
195+
///
196+
/// The `contained` API returns false if the bloom filter knows that *ALL*
197+
/// of the values in a column are not present.
198+
fn contained(
199+
&self,
200+
column: &Column,
201+
values: &HashSet<ScalarValue>,
202+
) -> Option<BooleanArray> {
203+
let sbbf = self.column_sbbf.get(column.name.as_str())?;
287204

288-
let left_any = exr.left().as_any();
289-
let right_any = exr.right().as_any();
290-
if let (Some(col), Some(liter)) = (
291-
left_any.downcast_ref::<phys_expr::Column>(),
292-
right_any.downcast_ref::<phys_expr::Literal>(),
293-
) {
294-
return Some((col.clone(), liter.value().clone()));
295-
}
296-
if let (Some(liter), Some(col)) = (
297-
left_any.downcast_ref::<phys_expr::Literal>(),
298-
right_any.downcast_ref::<phys_expr::Column>(),
299-
) {
300-
return Some((col.clone(), liter.value().clone()));
301-
}
302-
None
205+
// Bloom filters are probabilistic data structures that can return false
206+
// positives (i.e. it might return true even if the value is not
207+
// present); however, the bloom filter will return `false` if the value is
208+
// definitely not present.
209+
210+
let known_not_present = values
211+
.iter()
212+
.map(|value| match value {
213+
ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
214+
ScalarValue::Boolean(Some(v)) => sbbf.check(v),
215+
ScalarValue::Float64(Some(v)) => sbbf.check(v),
216+
ScalarValue::Float32(Some(v)) => sbbf.check(v),
217+
ScalarValue::Int64(Some(v)) => sbbf.check(v),
218+
ScalarValue::Int32(Some(v)) => sbbf.check(v),
219+
ScalarValue::Int16(Some(v)) => sbbf.check(v),
220+
ScalarValue::Int8(Some(v)) => sbbf.check(v),
221+
_ => true,
222+
})
223+
// The row group doesn't contain any of the values if
224+
// all the checks are false
225+
.all(|v| !v);
226+
227+
let contains = if known_not_present {
228+
Some(false)
229+
} else {
230+
// Given the bloom filter is probabilistic, we can't be sure that
231+
// the row group actually contains the values. Return `None` to
232+
// indicate this uncertainty
233+
None
234+
};
235+
236+
Some(BooleanArray::from(vec![contains]))
303237
}
304238
}
305239

@@ -1367,6 +1301,7 @@ mod tests {
13671301

13681302
let metadata = builder.metadata().clone();
13691303
let pruned_row_group = prune_row_groups_by_bloom_filters(
1304+
pruning_predicate.schema(),
13701305
&mut builder,
13711306
row_groups,
13721307
metadata.row_groups(),

0 commit comments

Comments
 (0)