Skip to content
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ea83c37
separate implementation of oeq properties
mustafasrepo Aug 10, 2023
0d9f208
Merge branch 'apache_main' into refactor/oeq_properties
mustafasrepo Aug 17, 2023
4ad5ec5
Simplifications
mustafasrepo Aug 17, 2023
016558b
Move utils to methods
mustafasrepo Aug 17, 2023
8007b1b
Remove unnecesary code
mustafasrepo Aug 17, 2023
5d896a8
Address todo
mustafasrepo Aug 17, 2023
b8def0a
Buggy is_aggressive mod eklenecek
mustafasrepo Aug 17, 2023
8850f33
start implementing aggressive mode
mustafasrepo Aug 17, 2023
0d32ca5
all tests pass
mustafasrepo Sep 6, 2023
aac0a0c
minor changes
mustafasrepo Sep 6, 2023
f0dbd85
All tests pass
mustafasrepo Sep 6, 2023
7112a25
Minor changes
mustafasrepo Sep 6, 2023
ec41194
All tests pass
mustafasrepo Sep 7, 2023
b16ad15
minor changes
mustafasrepo Sep 7, 2023
717631e
all tests pass
mustafasrepo Sep 7, 2023
b93cc5d
Simplifications
mustafasrepo Sep 7, 2023
b832b2d
minor changes
mustafasrepo Sep 7, 2023
5a92633
Merge branch 'apache_main' into refactor/oeq_properties
mustafasrepo Sep 7, 2023
858576b
Resolve linter error
mustafasrepo Sep 7, 2023
09aa6c8
Minor changes
mustafasrepo Sep 7, 2023
7212e56
minor changes
mustafasrepo Sep 8, 2023
49ea333
Update plan
mustafasrepo Sep 8, 2023
eb81b43
Merge branch 'apache_main' into refactor/oeq_properties
mustafasrepo Sep 8, 2023
18c4bab
Simplifications, update comments
mustafasrepo Sep 8, 2023
fe322b4
Update comments, Use existing stats to find constants
mustafasrepo Sep 8, 2023
0cb1ee2
Merge branch 'apache_main' into refactor/oeq_properties
mustafasrepo Sep 12, 2023
cff6f2f
Simplifications
mustafasrepo Sep 12, 2023
6cb4d5a
Unknown input stats are handled
berkaysynnada Sep 12, 2023
ef994fb
Address reviews
mustafasrepo Sep 13, 2023
0290184
Merge branch 'apache_main' into refactor/oeq_properties
mustafasrepo Sep 13, 2023
4fe9c0d
Simplifications
mustafasrepo Sep 15, 2023
1c1de0d
Simplifications
mustafasrepo Sep 15, 2023
23e30ae
Merge branch 'apache_main' into refactor/oeq_properties
mustafasrepo Sep 18, 2023
abe0f31
Address reviews
mustafasrepo Sep 18, 2023
2eaf755
Fix subdirectories
mustafasrepo Sep 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

use std::fmt::Display;

use arrow::datatypes::DataType;

use crate::ScalarValue;

/// Statistics for a relation
Expand Down Expand Up @@ -70,3 +72,24 @@ pub struct ColumnStatistics {
/// Number of distinct values
pub distinct_count: Option<usize>,
}

impl ColumnStatistics {
pub fn is_singleton(&self) -> bool {
match (&self.min_value, &self.max_value) {
// Min and max values are the same and not infinity.
(Some(min), Some(max)) => !min.is_null() && !max.is_null() && (min == max),
(_, _) => false,
}
}

/// Returns the [`ColumnStatistics`] corresponding to the given datatype by assigning infinite bounds.
pub fn new_with_unbounded_column(dt: &DataType) -> ColumnStatistics {
let null = ScalarValue::try_from(dt.clone()).ok();
ColumnStatistics {
null_count: None,
max_value: null.clone(),
min_value: null,
distinct_count: None,
}
}
}
24 changes: 4 additions & 20 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ use datafusion_physical_expr::utils::{
map_columns_before_projection, ordering_satisfy_requirement_concrete,
};
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, PhysicalExpr,
PhysicalSortRequirement,
expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement,
};

use datafusion_common::internal_err;
Expand Down Expand Up @@ -804,36 +803,21 @@ fn try_reorder(
} else if !equivalence_properties.classes().is_empty() {
normalized_expected = expected
.iter()
.map(|e| {
normalize_expr_with_equivalence_properties(
e.clone(),
equivalence_properties.classes(),
)
})
.map(|e| equivalence_properties.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(normalized_expected.len(), expected.len());

normalized_left_keys = join_keys
.left_keys
.iter()
.map(|e| {
normalize_expr_with_equivalence_properties(
e.clone(),
equivalence_properties.classes(),
)
})
.map(|e| equivalence_properties.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(join_keys.left_keys.len(), normalized_left_keys.len());

normalized_right_keys = join_keys
.right_keys
.iter()
.map(|e| {
normalize_expr_with_equivalence_properties(
e.clone(),
equivalence_properties.classes(),
)
})
.map(|e| equivalence_properties.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(join_keys.right_keys.len(), normalized_right_keys.len());

Expand Down
26 changes: 23 additions & 3 deletions datafusion/core/src/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::{
analyze, split_conjunction, AnalysisContext, ExprBoundaries,
OrderingEquivalenceProperties, PhysicalExpr,
};

use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::utils::collect_columns;
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down Expand Up @@ -153,7 +154,19 @@ impl ExecutionPlan for FilterExec {
}

fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
self.input.ordering_equivalence_properties()
let stats = self.statistics();
// Add the columns that have only one value (singleton) after filtering to constants.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there a a difference between OrderingEquivalenceProperties and EquivalenceProperties -- it seems like using statistics based equivalence as well as predicate based equivalence would be relevant to both

In other words, if the filter has a predicate like column = 5 shouldn't column be added to list of constants even if the column had more than one value in the statistics?

Copy link
Contributor Author

@mustafasrepo mustafasrepo Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words, if the filter has a predicate like column = 5 shouldn't column be added to list of constants even if the column had more than one value in the statistics?

In this case, statistics should be able to determine column will have single value(5) onwards. Hence I presumed, using statistics is sufficient. However, if there are cases, where it is obvious from the predicate value is constant, but statistics fail to resolve it. We can change this implementation I think

if let Some(col_stats) = stats.column_statistics {
let constants = collect_columns(self.predicate())
.into_iter()
.filter(|column| col_stats[column.index()].is_singleton())
.map(|column| Arc::new(column) as Arc<dyn PhysicalExpr>)
.collect::<Vec<_>>();
let filter_oeq = self.input.ordering_equivalence_properties();
filter_oeq.with_constants(constants)
} else {
self.input.ordering_equivalence_properties()
}
}

fn with_new_children(
Expand Down Expand Up @@ -197,7 +210,14 @@ impl ExecutionPlan for FilterExec {
let input_stats = self.input.statistics();
let input_column_stats = match input_stats.column_statistics {
Some(stats) => stats,
None => return Statistics::default(),
None => self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like it is more general than just for FilterExec -- shouldn't all ExecutionPlan's return statistics that have unbounded columns in the absence of other information? Maybe we should change Statistics::default() to do this 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. We can remove Statistics::default() implementation, and propagate unbounded columns (in the absence of information) from the source. I will try it.

.schema()
.fields
.iter()
.map(|field| {
ColumnStatistics::new_with_unbounded_column(field.data_type())
})
.collect::<Vec<_>>(),
};

let starter_ctx =
Expand Down
Loading