From fad31ec0f5e17f0cf1c3d6a9a6f58618714c3d17 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 6 Mar 2025 00:27:54 +0800 Subject: [PATCH 01/10] fix: nested window function --- datafusion/core/tests/sql/sql_api.rs | 18 ++++++++ datafusion/sql/src/select.rs | 68 +++++++++++++++++++--------- 2 files changed, 64 insertions(+), 22 deletions(-) diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index 034d6fa23d9c..ac77fa9312e7 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -19,6 +19,24 @@ use datafusion::prelude::*; use tempfile::TempDir; +#[tokio::test] +async fn test_window_function() { + let ctx = SessionContext::new(); + let df = ctx + .sql( + r#"SELECT + t1.v1, + SUM(t1.v1) OVER w + 1 + FROM + generate_series(1, 10000) AS t1(v1) + WINDOW + w AS (ORDER BY t1.v1 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);"#, + ) + .await; + println!("{:?}", df); + assert!(df.is_ok()); +} + #[tokio::test] async fn unsupported_ddl_returns_error() { // Verify SessionContext::with_sql_options errors appropriately diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index e21def4c3941..b594fd6a35f1 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::HashSet; +use std::ops::ControlFlow; use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; @@ -27,7 +28,7 @@ use crate::utils::{ use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{not_impl_err, plan_err, Result}; +use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ @@ -44,8 +45,8 @@ use datafusion_expr::{ use indexmap::IndexMap; use sqlparser::ast::{ - Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, - WildcardAdditionalOptions, WindowType, + Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, VisitMut, + VisitorMut, WildcardAdditionalOptions, WindowType, }; use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins}; @@ -849,21 +850,19 @@ fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<() Ok(()) } -// If the projection is done over a named window, that window -// name must be defined. Otherwise, it gives an error. -fn match_window_definitions( - projection: &mut [SelectItem], - named_windows: &[NamedWindowDefinition], -) -> Result<()> { - for proj in projection.iter_mut() { - if let SelectItem::ExprWithAlias { - expr: SQLExpr::Function(f), - alias: _, - } - | SelectItem::UnnamedExpr(SQLExpr::Function(f)) = proj - { - for NamedWindowDefinition(window_ident, window_expr) in named_windows.iter() { - if let Some(WindowType::NamedWindow(ident)) = &f.over { +// Visit the expression to find all window functions +struct WindowFunctionVisitor<'a> { + named_windows: &'a [NamedWindowDefinition], +} + +impl VisitorMut for WindowFunctionVisitor<'_> { + fn pre_visit_expr(&mut self, expr: &mut SQLExpr) -> ControlFlow { + if let SQLExpr::Function(f) = expr { + if let Some(WindowType::NamedWindow(ident)) = &f.over { + let ident = ident.clone(); + for NamedWindowDefinition(window_ident, window_expr) in + self.named_windows.iter() + { if ident.eq(window_ident) { f.over = Some(match window_expr { NamedWindowExpr::NamedWindow(ident) => { @@ -875,11 +874,36 @@ fn match_window_definitions( }) } } + // All named windows must be defined with a WindowSpec. + if let Some(WindowType::NamedWindow(ident)) = &f.over { + return ControlFlow::Break(DataFusionError::Plan(format!( + "The window {ident} is not defined!" + ))); + } } - // All named windows must be defined with a WindowSpec. - if let Some(WindowType::NamedWindow(ident)) = &f.over { - return plan_err!("The window {ident} is not defined!"); - } + } + ControlFlow::Continue(()) + } + + type Break = DataFusionError; +} + +// If the projection is done over a named window, that window +// name must be defined. Otherwise, it gives an error. +fn match_window_definitions( + projection: &mut [SelectItem], + named_windows: &[NamedWindowDefinition], +) -> Result<()> { + for proj in projection.iter_mut() { + if let SelectItem::ExprWithAlias { expr, alias: _ } + | SelectItem::UnnamedExpr(expr) = proj + { + let mut visitor = WindowFunctionVisitor { named_windows }; + + match VisitMut::visit(expr, &mut visitor) { + ControlFlow::Continue(_) => (), + ControlFlow::Break(err) => return Err(err), + }; } } Ok(()) From 2d826b97d22f177077cac25c3bd05ac818a65412 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 6 Mar 2025 08:58:46 +0800 Subject: [PATCH 02/10] prevent stackoverflow --- datafusion/sql/src/select.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index b594fd6a35f1..c67d6776c75b 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -894,6 +894,9 @@ fn match_window_definitions( projection: &mut [SelectItem], named_windows: &[NamedWindowDefinition], ) -> Result<()> { + if named_windows.is_empty() { + return Ok(()); + } for proj in projection.iter_mut() { if let SelectItem::ExprWithAlias { expr, alias: _ } | SelectItem::UnnamedExpr(expr) = proj From f64d9d78803be2a1efccbeca4b3985f89152ec7c Mon Sep 17 00:00:00 2001 From: Chen Chongchen Date: Thu, 6 Mar 2025 11:20:29 +0800 Subject: [PATCH 03/10] Update select.rs --- datafusion/sql/src/select.rs | 78 +++++++++++++++--------------------- 1 file changed, 32 insertions(+), 46 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index c67d6776c75b..ca4aa5b9e8ed 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -45,8 +45,8 @@ use datafusion_expr::{ use indexmap::IndexMap; use sqlparser::ast::{ - Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, VisitMut, - VisitorMut, WildcardAdditionalOptions, WindowType, + visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, + OrderByExpr, WildcardAdditionalOptions, WindowType, }; use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins}; @@ -850,60 +850,46 @@ fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<() Ok(()) } -// Visit the expression to find all window functions -struct WindowFunctionVisitor<'a> { - named_windows: &'a [NamedWindowDefinition], -} - -impl VisitorMut for WindowFunctionVisitor<'_> { - fn pre_visit_expr(&mut self, expr: &mut SQLExpr) -> ControlFlow { - if let SQLExpr::Function(f) = expr { - if let Some(WindowType::NamedWindow(ident)) = &f.over { - let ident = ident.clone(); - for NamedWindowDefinition(window_ident, window_expr) in - self.named_windows.iter() - { - if ident.eq(window_ident) { - f.over = Some(match window_expr { - NamedWindowExpr::NamedWindow(ident) => { - WindowType::NamedWindow(ident.clone()) - } - NamedWindowExpr::WindowSpec(spec) => { - WindowType::WindowSpec(spec.clone()) - } - }) - } - } - // All named windows must be defined with a WindowSpec. - if let Some(WindowType::NamedWindow(ident)) = &f.over { - return ControlFlow::Break(DataFusionError::Plan(format!( - "The window {ident} is not defined!" - ))); - } - } - } - ControlFlow::Continue(()) - } - - type Break = DataFusionError; -} - // If the projection is done over a named window, that window // name must be defined. Otherwise, it gives an error. fn match_window_definitions( projection: &mut [SelectItem], named_windows: &[NamedWindowDefinition], ) -> Result<()> { - if named_windows.is_empty() { - return Ok(()); - } for proj in projection.iter_mut() { if let SelectItem::ExprWithAlias { expr, alias: _ } | SelectItem::UnnamedExpr(expr) = proj { - let mut visitor = WindowFunctionVisitor { named_windows }; - - match VisitMut::visit(expr, &mut visitor) { + let result = visit_expressions_mut(expr, |expr| { + if let SQLExpr::Function(f) = expr { + if let Some(WindowType::NamedWindow(_)) = &f.over { + for NamedWindowDefinition(window_ident, window_expr) in + named_windows + { + if let Some(WindowType::NamedWindow(ident)) = &f.over { + if ident.eq(window_ident) { + f.over = Some(match window_expr { + NamedWindowExpr::NamedWindow(ident) => { + WindowType::NamedWindow(ident.clone()) + } + NamedWindowExpr::WindowSpec(spec) => { + WindowType::WindowSpec(spec.clone()) + } + }) + } + } + } + // All named windows must be defined with a WindowSpec. + if let Some(WindowType::NamedWindow(ident)) = &f.over { + return ControlFlow::Break(DataFusionError::Plan(format!( + "The window {ident} is not defined!" + ))); + } + } + } + ControlFlow::Continue(()) + }); + match result { ControlFlow::Continue(_) => (), ControlFlow::Break(err) => return Err(err), }; From 71684b01fb67e9bb456529ffac99ac27a293a7ff Mon Sep 17 00:00:00 2001 From: Chen Chongchen Date: Thu, 6 Mar 2025 11:22:21 +0800 Subject: [PATCH 04/10] Update sqllogictests.rs --- datafusion/sqllogictest/bin/sqllogictests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index bbb88819efe0..af10bbb7e73e 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -53,6 +53,7 @@ const SQLITE_PREFIX: &str = "sqlite"; pub fn main() -> Result<()> { tokio::runtime::Builder::new_multi_thread() + .thread_stack_size(4 * 1024 * 1024) .enable_all() .build()? .block_on(run_tests()) From c58bdf49f645847888c0547a5db62750f1c4925a Mon Sep 17 00:00:00 2001 From: Chen Chongchen Date: Thu, 6 Mar 2025 11:22:51 +0800 Subject: [PATCH 05/10] Update sql_api.rs --- datafusion/core/tests/sql/sql_api.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index ac77fa9312e7..ec086bcc50c7 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -33,7 +33,6 @@ async fn test_window_function() { w AS (ORDER BY t1.v1 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);"#, ) .await; - println!("{:?}", df); assert!(df.is_ok()); } From 3a8e7c7be34d85af974cc868bc0b4dc67171fa99 Mon Sep 17 00:00:00 2001 From: Chen Chongchen Date: Wed, 12 Mar 2025 12:36:19 +0800 Subject: [PATCH 06/10] Update select.rs --- datafusion/sql/src/select.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index ca4aa5b9e8ed..504ab328f844 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -860,7 +860,8 @@ fn match_window_definitions( if let SelectItem::ExprWithAlias { expr, alias: _ } | SelectItem::UnnamedExpr(expr) = proj { - let result = visit_expressions_mut(expr, |expr| { + let mut err = None; + visit_expressions_mut(expr, |expr| { if let SQLExpr::Function(f) = expr { if let Some(WindowType::NamedWindow(_)) = &f.over { for NamedWindowDefinition(window_ident, window_expr) in @@ -881,18 +882,18 @@ fn match_window_definitions( } // All named windows must be defined with a WindowSpec. if let Some(WindowType::NamedWindow(ident)) = &f.over { - return ControlFlow::Break(DataFusionError::Plan(format!( + err = Some(DataFusionError::Plan(format!( "The window {ident} is not defined!" ))); + return ControlFlow::Break(()); } } } ControlFlow::Continue(()) }); - match result { - ControlFlow::Continue(_) => (), - ControlFlow::Break(err) => return Err(err), - }; + if let Some(err) = err { + return Err(err); + } } } Ok(()) From a12e8ca7d72091d25464cbef26645b958760c940 Mon Sep 17 00:00:00 2001 From: Chen Chongchen Date: Wed, 12 Mar 2025 12:37:42 +0800 Subject: [PATCH 07/10] Update sqllogictests.rs --- datafusion/sqllogictest/bin/sqllogictests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index af10bbb7e73e..bbb88819efe0 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -53,7 +53,6 @@ const SQLITE_PREFIX: &str = "sqlite"; pub fn main() -> Result<()> { tokio::runtime::Builder::new_multi_thread() - .thread_stack_size(4 * 1024 * 1024) .enable_all() .build()? .block_on(run_tests()) From ffa912492654389b8cf9de2cdced0e0a0d63c84c Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Fri, 4 Apr 2025 22:34:11 +0800 Subject: [PATCH 08/10] update slt --- datafusion/sqllogictest/test_files/window.slt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 1a9acc0f531a..126115e215f6 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -5536,3 +5536,18 @@ physical_plan 01)ProjectionExec: expr=[max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as max_c5] 02)--WindowAggExec: wdw=[max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], file_type=csv, has_header=true + +query II +SELECT + t1.v1, + SUM(t1.v1) OVER w + 1 +FROM + generate_series(1, 5) AS t1(v1) +WINDOW + w AS (ORDER BY t1.v1); +---- +1 2 +2 4 +3 7 +4 11 +5 16 From b80f488d059998f7b7e86d7d093aac8ec119896d Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Fri, 4 Apr 2025 22:52:58 +0800 Subject: [PATCH 09/10] update slt --- datafusion/sqllogictest/test_files/window.slt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 1e87e6ebb2b5..c5c094cad3da 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -5537,8 +5537,7 @@ physical_plan 02)--WindowAggExec: wdw=[max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], file_type=csv, has_header=true -<<<<<<< HEAD -query II +query II rowsort SELECT t1.v1, SUM(t1.v1) OVER w + 1 @@ -5552,7 +5551,7 @@ WINDOW 3 7 4 11 5 16 -======= + # Testing Utf8View with window statement ok CREATE TABLE aggregate_test_100_utf8view AS SELECT @@ -5611,4 +5610,3 @@ DROP TABLE aggregate_test_100_utf8view; statement ok DROP TABLE aggregate_test_100 ->>>>>>> main From 18733fd3f50e835f45891a78daa11b0398cfded3 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Fri, 4 Apr 2025 22:55:56 +0800 Subject: [PATCH 10/10] clippy --- datafusion/sql/src/select.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index f823330cb607..85dba0f43081 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -46,8 +46,8 @@ use datafusion_expr::{ use indexmap::IndexMap; use sqlparser::ast::{ - visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderBy, - SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType, + visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, + OrderBy, SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType, }; use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};