Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
408cee1
#17801 Improve nullability reporting of case expressions
pepijnve Sep 28, 2025
045fc9c
#17801 Clarify logical expression test cases
pepijnve Sep 29, 2025
de8b780
#17801 Attempt to clarify const evaluation logic
pepijnve Sep 30, 2025
bbd2949
#17801 Extend predicate const evaluation
pepijnve Sep 30, 2025
2075f4b
#17801 Correctly report nullability of implicit casts in predicates
pepijnve Oct 1, 2025
8c87937
#17801 Code formatting
pepijnve Oct 6, 2025
e155d41
Merge branch 'main' into issue_17801
alamb Oct 8, 2025
5cfe8b6
Merge branch 'main' into issue_17801
alamb Oct 8, 2025
ac4267c
Add comment explaining why the logical plan optimizer is triggered
pepijnve Oct 9, 2025
101db28
Simplify predicate eval code
pepijnve Oct 9, 2025
f4c8579
Code formatting
pepijnve Oct 9, 2025
81b6ec1
Add license header
pepijnve Oct 9, 2025
b6ebd13
Merge branch 'main' into issue_17801
alamb Oct 15, 2025
ebc2d38
Merge branch 'refs/heads/main' into issue_17801
pepijnve Nov 6, 2025
3131899
Try to align logical and physical implementations as much as possible
pepijnve Nov 6, 2025
3da92e5
Allow optimizations to change fields from nullable to not-nullable
pepijnve Nov 6, 2025
0a6b2e7
Correctly handle case-with-expression nullability analysis
pepijnve Nov 7, 2025
113e899
Add unit tests for predicate_eval
pepijnve Nov 7, 2025
9dee1e8
Another attempt to make the code easier to read
pepijnve Nov 7, 2025
4a22dfc
Rework predicate_eval to use set arithmetic
pepijnve Nov 8, 2025
a1bc263
Rename predicate_eval to predicate_bounds
pepijnve Nov 8, 2025
ac765e9
Add unit tests for NullableInterval::is_certainly_...
pepijnve Nov 8, 2025
51af749
Formatting
pepijnve Nov 8, 2025
4af84a7
Simplify logical and physical case branch filtering logic
pepijnve Nov 9, 2025
427fc30
Further simplification of `is_null`
pepijnve Nov 10, 2025
0223a54
Merge remote-tracking branch 'upstream/HEAD' into issue_17801
pepijnve Nov 10, 2025
c5914d6
Update bitflags version declaration to match arrow-schema
pepijnve Nov 10, 2025
4b879e4
Silence "needless pass by value" lint
pepijnve Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by;
use arrow::array::{builder::StringBuilder, RecordBatch};
use arrow::compute::SortOptions;
use arrow::datatypes::Schema;
use arrow_schema::Field;
use datafusion_catalog::ScanArgs;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::format::ExplainAnalyzeLevel;
Expand Down Expand Up @@ -2516,7 +2517,9 @@ impl<'a> OptimizationInvariantChecker<'a> {
previous_schema: Arc<Schema>,
) -> Result<()> {
// if the rule is not permitted to change the schema, confirm that it did not change.
if self.rule.schema_check() && plan.schema() != previous_schema {
if self.rule.schema_check()
&& !is_allowed_schema_change(previous_schema.as_ref(), plan.schema().as_ref())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change relaxes the schema check slightly. It now allows individual fields to change from nullable to not-nullable which is ok because it only allow a strict subset of the original schema. schema_check has documentation stating that you should disable the schema check entirely if you want to do this. Seemed better to not have to disable checking entirely.

{
internal_err!("PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected original schema: {:?}, got new schema: {:?}",
self.rule.name(),
previous_schema,
Expand All @@ -2532,6 +2535,33 @@ impl<'a> OptimizationInvariantChecker<'a> {
}
}

/// Checks if the change from `old` schema to `new` is allowed or not.
/// The current implementation only allows nullability of individual fields to change
/// from 'nullable' to 'not nullable'.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding rationale about why would be helpful here as it may not be immediately obvious

Suggested change
/// from 'nullable' to 'not nullable'.
/// from 'nullable' to 'not nullable' (that is that the physical expression knows
/// more about its null-ness than its logical counterpart)

fn is_allowed_schema_change(old: &Schema, new: &Schema) -> bool {
if new.metadata != old.metadata {
return false;
}

if new.fields.len() != old.fields.len() {
return false;
}

let new_fields = new.fields.iter().map(|f| f.as_ref());
let old_fields = old.fields.iter().map(|f| f.as_ref());
old_fields
.zip(new_fields)
.all(|(old, new)| is_allowed_field_change(old, new))
}

fn is_allowed_field_change(old_field: &Field, new_field: &Field) -> bool {
new_field.name() == old_field.name()
&& new_field.data_type() == old_field.data_type()
&& new_field.metadata() == old_field.metadata()
&& (new_field.is_nullable() == old_field.is_nullable()
|| !new_field.is_nullable())
}

impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
type Node = Arc<dyn ExecutionPlan>;

Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/tests/tpcds_planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1052,9 +1052,12 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> {
for sql in &sql {
let df = ctx.sql(sql).await?;
let (state, plan) = df.into_parts();
let plan = state.optimize(&plan)?;
if create_physical {
let _ = state.create_physical_plan(&plan).await?;
} else {
// Run the logical optimizer even if we are not creating the physical plan
// to ensure it will properly succeed
let _ = state.optimize(&plan)?;
}
}

Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ pub fn is_null(expr: Expr) -> Expr {
Expr::IsNull(Box::new(expr))
}

/// Create is not null expression
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a nice drive by cleanup

pub fn is_not_null(expr: Expr) -> Expr {
Expr::IsNotNull(Box::new(expr))
}

/// Create is true expression
pub fn is_true(expr: Expr) -> Expr {
Expr::IsTrue(Box::new(expr))
Expand Down
203 changes: 193 additions & 10 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use super::{Between, Expr, Like};
use super::{predicate_eval, Between, Expr, Like};
use crate::expr::{
AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
InSubquery, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
Expand Down Expand Up @@ -282,14 +282,66 @@ impl ExprSchemable for Expr {
Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
Expr::Literal(value, _) => Ok(value.is_null()),
Expr::Case(case) => {
// This expression is nullable if any of the input expressions are nullable
let then_nullable = case
.when_then_expr
.iter()
.map(|(_, t)| t.nullable(input_schema))
.collect::<Result<Vec<_>>>()?;
if then_nullable.contains(&true) {
Ok(true)
let nullable_then = if case.expr.is_some() {
// Case-with-expression is nullable if any of the 'then' expressions.
// Assume all 'then' expressions are reachable
case.when_then_expr
.iter()
.filter_map(|(_, t)| match t.nullable(input_schema) {
Ok(n) => {
if n {
Some(Ok(()))
} else {
None
}
}
Err(e) => Some(Err(e)),
})
.next()
} else {
// case-without-expression is nullable if any of the 'then' expressions is nullable
// and reachable when the 'then' expression evaluates to `null`.
case.when_then_expr
.iter()
.filter_map(|(w, t)| {
match t.nullable(input_schema) {
// Branches with a then expression that is not nullable can be skipped
Ok(false) => None,
// Pass on error determining nullability verbatim
Err(e) => Some(Err(e)),
// For branches with a nullable 'then' expression, try to determine
// using limited const evaluation if the branch will be taken when
// the 'then' expression evaluates to null.
Ok(true) => {
let is_null = |expr: &Expr /* Type */| {
if expr.eq(t) {
Some(true)
} else {
None
}
};

match predicate_eval::const_eval_predicate(
w,
is_null,
input_schema,
) {
// Const evaluation was inconclusive or determined the branch
// would be taken
None | Some(true) => Some(Ok(())),
// Const evaluation proves the branch will never be taken.
// The most common pattern for this is `WHEN x IS NOT NULL THEN x`.
Some(false) => None,
}
}
}
})
.next()
};

if let Some(nullable_then) = nullable_then {
// There is at least one reachable nullable then
nullable_then.map(|_| true)
} else if let Some(e) = &case.else_expr {
e.nullable(input_schema)
} else {
Expand Down Expand Up @@ -773,7 +825,7 @@ mod tests {
use std::collections::HashMap;

use super::*;
use crate::{col, lit, out_ref_col_with_metadata};
use crate::{and, col, lit, not, or, out_ref_col_with_metadata, when};

use datafusion_common::{internal_err, DFSchema, ScalarValue};

Expand Down Expand Up @@ -826,6 +878,137 @@ mod tests {
assert!(expr.nullable(&get_schema(false)).unwrap());
}

fn assert_nullability(expr: &Expr, schema: &dyn ExprSchema, expected: bool) {
assert_eq!(
expr.nullable(schema).unwrap(),
expected,
"Nullability of '{expr}' should be {expected}"
);
}

fn assert_not_nullable(expr: &Expr, schema: &dyn ExprSchema) {
assert_nullability(expr, schema, false);
}

fn assert_nullable(expr: &Expr, schema: &dyn ExprSchema) {
assert_nullability(expr, schema, true);
}

#[test]
fn test_case_expression_nullability() -> Result<()> {
let nullable_schema = MockExprSchema::new()
.with_data_type(DataType::Int32)
.with_nullable(true);

let not_nullable_schema = MockExprSchema::new()
.with_data_type(DataType::Int32)
.with_nullable(false);

// CASE WHEN x IS NOT NULL THEN x ELSE 0
let e = when(col("x").is_not_null(), col("x")).otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN NOT x IS NULL THEN x ELSE 0
let e = when(not(col("x").is_null()), col("x")).otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN X = 5 THEN x ELSE 0
let e = when(col("x").eq(lit(5)), col("x")).otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x IS NOT NULL AND x = 5 THEN x ELSE 0
let e = when(and(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
.otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x = 5 AND x IS NOT NULL THEN x ELSE 0
let e = when(and(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
.otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x IS NOT NULL OR x = 5 THEN x ELSE 0
let e = when(or(col("x").is_not_null(), col("x").eq(lit(5))), col("x"))
.otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x = 5 OR x IS NOT NULL THEN x ELSE 0
let e = when(or(col("x").eq(lit(5)), col("x").is_not_null()), col("x"))
.otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN (x = 5 AND x IS NOT NULL) OR (x = bar AND x IS NOT NULL) THEN x ELSE 0
let e = when(
or(
and(col("x").eq(lit(5)), col("x").is_not_null()),
and(col("x").eq(col("bar")), col("x").is_not_null()),
),
col("x"),
)
.otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x = 5 OR x IS NULL THEN x ELSE 0
let e = when(or(col("x").eq(lit(5)), col("x").is_null()), col("x"))
.otherwise(lit(0))?;
assert_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x IS TRUE THEN x ELSE 0
let e = when(col("x").is_true(), col("x")).otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x IS NOT TRUE THEN x ELSE 0
let e = when(col("x").is_not_true(), col("x")).otherwise(lit(0))?;
assert_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x IS FALSE THEN x ELSE 0
let e = when(col("x").is_false(), col("x")).otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x IS NOT FALSE THEN x ELSE 0
let e = when(col("x").is_not_false(), col("x")).otherwise(lit(0))?;
assert_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x IS UNKNOWN THEN x ELSE 0
let e = when(col("x").is_unknown(), col("x")).otherwise(lit(0))?;
assert_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x IS NOT UNKNOWN THEN x ELSE 0
let e = when(col("x").is_not_unknown(), col("x")).otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN x LIKE 'x' THEN x ELSE 0
let e = when(col("x").like(lit("x")), col("x")).otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN 0 THEN x ELSE 0
let e = when(lit(0), col("x")).otherwise(lit(0))?;
assert_not_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

// CASE WHEN 1 THEN x ELSE 0
let e = when(lit(1), col("x")).otherwise(lit(0))?;
assert_nullable(&e, &nullable_schema);
assert_not_nullable(&e, &not_nullable_schema);

Ok(())
}

#[test]
fn test_inlist_nullability() {
let get_schema = |nullable| {
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub mod async_udf;
pub mod statistics {
pub use datafusion_expr_common::statistics::*;
}
mod predicate_eval;
pub mod ptr_eq;
pub mod test;
pub mod tree_node;
Expand Down
Loading