Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 54 additions & 48 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::utils::{

use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
Expand Down Expand Up @@ -85,7 +85,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

// Handle named windows before processing the projection expression
check_conflicting_windows(&select.named_window)?;
match_window_definitions(&mut select.projection, &select.named_window)?;
self.match_window_definitions(&mut select.projection, &select.named_window)?;
// Process the SELECT expressions
let select_exprs = self.prepare_select_exprs(
&base_plan,
Expand Down Expand Up @@ -868,42 +868,34 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

Ok((plan, select_exprs_post_aggr, having_expr_post_aggr))
}
}

// If there are any multiple-defined windows, we raise an error.
fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()> {
for (i, window_def_i) in window_defs.iter().enumerate() {
for window_def_j in window_defs.iter().skip(i + 1) {
if window_def_i.0 == window_def_j.0 {
return plan_err!(
"The window {} is defined multiple times!",
window_def_i.0
);
}
}
}
Ok(())
}

// If the projection is done over a named window, that window
// name must be defined. Otherwise, it gives an error.
fn match_window_definitions(
projection: &mut [SelectItem],
named_windows: &[NamedWindowDefinition],
) -> Result<()> {
for proj in projection.iter_mut() {
if let SelectItem::ExprWithAlias { expr, alias: _ }
| SelectItem::UnnamedExpr(expr) = proj
{
let mut err = None;
visit_expressions_mut(expr, |expr| {
if let SQLExpr::Function(f) = expr {
if let Some(WindowType::NamedWindow(_)) = &f.over {
for NamedWindowDefinition(window_ident, window_expr) in
named_windows
{
if let Some(WindowType::NamedWindow(ident)) = &f.over {
if ident.eq(window_ident) {
// If the projection is done over a named window, that window
// name must be defined. Otherwise, it gives an error.
fn match_window_definitions(
&self,
projection: &mut [SelectItem],
named_windows: &[NamedWindowDefinition],
) -> Result<()> {
let named_windows: Vec<(&NamedWindowDefinition, String)> = named_windows
.iter()
.map(|w| (w, self.ident_normalizer.normalize(w.0.clone())))
.collect();
for proj in projection.iter_mut() {
if let SelectItem::ExprWithAlias { expr, alias: _ }
| SelectItem::UnnamedExpr(expr) = proj
{
let mut err = None;
visit_expressions_mut(expr, |expr| {
if let SQLExpr::Function(f) = expr {
if let Some(WindowType::NamedWindow(ident)) = &f.over {
let normalized_ident =
self.ident_normalizer.normalize(ident.clone());
for (
NamedWindowDefinition(_, window_expr),
normalized_window_ident,
) in named_windows.iter()
{
if normalized_ident.eq(normalized_window_ident) {
f.over = Some(match window_expr {
NamedWindowExpr::NamedWindow(ident) => {
WindowType::NamedWindow(ident.clone())
Expand All @@ -914,20 +906,34 @@ fn match_window_definitions(
})
}
}
}
// All named windows must be defined with a WindowSpec.
if let Some(WindowType::NamedWindow(ident)) = &f.over {
err = Some(DataFusionError::Plan(format!(
"The window {ident} is not defined!"
)));
return ControlFlow::Break(());
// All named windows must be defined with a WindowSpec.
if let Some(WindowType::NamedWindow(ident)) = &f.over {
err =
Some(plan_err!("The window {ident} is not defined!"));
return ControlFlow::Break(());
}
}
}
ControlFlow::Continue(())
});
if let Some(err) = err {
return err;
}
ControlFlow::Continue(())
});
if let Some(err) = err {
return Err(err);
}
}
Ok(())
}
}

// If there are any multiple-defined windows, we raise an error.
fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()> {
for (i, window_def_i) in window_defs.iter().enumerate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this method for identifying duplicate named windows? if so probably we can simplify this a little bit like

let mut seen = HashSet::new();
for (name, _) in window_defs {
    if !seen.insert(name) {
        return plan_err!("The window {} is defined multiple times!", name);
    }
}

Copy link
Contributor Author

@chenkovsky chenkovsky Apr 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't modified this method, It was there before I changed this file. I think your suggestion make sense, but maybe we can change it in another PR.

for window_def_j in window_defs.iter().skip(i + 1) {
if window_def_i.0 == window_def_j.0 {
return plan_err!(
"The window {} is defined multiple times!",
window_def_i.0
);
}
}
}
Expand Down
32 changes: 32 additions & 0 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5610,3 +5610,35 @@ DROP TABLE aggregate_test_100_utf8view;

statement ok
DROP TABLE aggregate_test_100

# window definitions with aliases
query II rowsort
SELECT
t1.v1,
SUM(t1.v1) OVER W + 1
FROM
generate_series(1, 5) AS t1(v1)
WINDOW
w AS (ORDER BY t1.v1);
----
1 2
2 4
3 7
4 11
5 16

# window definitions with aliases
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it the same test as above? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the difference is lower case & upper case.

query II rowsort
SELECT
t1.v1,
SUM(t1.v1) OVER w + 1
FROM
generate_series(1, 5) AS t1(v1)
WINDOW
W AS (ORDER BY t1.v1);
----
1 2
2 4
3 7
4 11
5 16