Merged
48 commits
6d66621
Adds new crate for window functions
jcsherin Aug 8, 2024
d49e4b8
Moves `row_number` to window functions crate
jcsherin Aug 8, 2024
4886ebf
Fixes build errors
jcsherin Aug 8, 2024
a8692e0
Regenerates protobuf
jcsherin Aug 8, 2024
2dcc41f
Makes `row_number` no-op temporarily
jcsherin Aug 8, 2024
772bd6d
Minor: fixes formatting
jcsherin Aug 8, 2024
3286d4b
Implements `WindowUDF` for `row_number`
jcsherin Aug 9, 2024
fa03665
Minor: fixes formatting
jcsherin Aug 9, 2024
ea5dc67
Adds singleton instance of UDWF: `row_number`
jcsherin Aug 9, 2024
b8798db
Adds partition evaluator
jcsherin Aug 9, 2024
706b36d
Registers default window functions
jcsherin Aug 9, 2024
ec0c4ed
Implements `evaluate_all`
jcsherin Aug 10, 2024
712010d
Fixes: allow non-uppercase globals
jcsherin Aug 10, 2024
5a28098
Minor: prefix underscore for unused variable
jcsherin Aug 10, 2024
2950e7f
Minor: fixes formatting
jcsherin Aug 10, 2024
0785a54
Uses `row_number_udwf`
jcsherin Aug 10, 2024
3a9b8d6
Fixes: unparser test for `row_number`
jcsherin Aug 10, 2024
bb9ec52
Uses row number to represent functional dependency
jcsherin Aug 13, 2024
8e5588e
Minor: fixes formatting
jcsherin Aug 13, 2024
152c447
Removes `row_number` from case-insensitive name test
jcsherin Aug 13, 2024
b901840
Deletes wrapper for `row_number` window expression
jcsherin Aug 13, 2024
6a5f4fe
Merge branch 'main' into window-udf-row-number
jcsherin Aug 13, 2024
666ea21
Fixes: lowercase name in error statement
jcsherin Aug 13, 2024
29563b4
Fixes: `row_number` fields are not nullable
jcsherin Aug 13, 2024
1920dd1
Fixes: lowercase name in explain output
jcsherin Aug 13, 2024
7950bb1
Updates Cargo.lock
jcsherin Aug 14, 2024
5b4ce46
Fixes: lowercase name in explain output
jcsherin Aug 14, 2024
4782eee
Adds support for result ordering
jcsherin Aug 14, 2024
1c9fa23
Merge branch 'main' into window-udf-row-number
jcsherin Aug 14, 2024
c3c923d
Minor: add newline between methods
jcsherin Aug 14, 2024
c7e1836
Fixes: re-export crate name in doc comments
jcsherin Aug 14, 2024
2c3725f
Adds doc comment for `WindowUDFImpl::nullable`
jcsherin Aug 14, 2024
49659d1
Minor: renames variable
jcsherin Aug 14, 2024
8f33840
Minor: update doc comments
jcsherin Aug 14, 2024
3dd236b
Deletes code
jcsherin Aug 14, 2024
8400cf9
Minor: update doc comments
jcsherin Aug 14, 2024
f1c9d15
Minor: adds period
jcsherin Aug 14, 2024
3e2da97
Adds doc comment for `row_number` window UDF
jcsherin Aug 15, 2024
a3ef37b
Adds fluent API for creating `row_number` expression
jcsherin Aug 15, 2024
bf516d9
Minor: removes unnecessary path prefix
jcsherin Aug 15, 2024
ac902ef
Adds roundtrip logical plan test case
jcsherin Aug 15, 2024
33a39b7
Merge branch 'main' into window-udf-row-number
jcsherin Aug 16, 2024
c6d39d0
Updates unit tests for `row_number`
jcsherin Aug 16, 2024
e3f2196
Deletes code
jcsherin Aug 16, 2024
2162649
Merge branch 'main' into window-udf-row-number
jcsherin Aug 16, 2024
157bc5f
Minor: copy edit doc comments
jcsherin Aug 16, 2024
d2fed86
Minor: deletes comment
jcsherin Aug 16, 2024
f54c07c
Minor: copy edits udwf doc comments
jcsherin Aug 16, 2024
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -29,6 +29,7 @@ members = [
"datafusion/functions-aggregate",
"datafusion/functions-aggregate-common",
"datafusion/functions-nested",
"datafusion/functions-window",
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/physical-expr-common",
@@ -102,6 +103,7 @@ datafusion-functions = { path = "datafusion/functions", version = "41.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "41.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "41.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "41.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "41.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "41.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "41.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "41.0.0", default-features = false }
11 changes: 11 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -106,6 +106,7 @@ datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true, optional = true }
datafusion-functions-window = { workspace = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
1 change: 1 addition & 0 deletions datafusion/core/src/execution/session_state.rs
@@ -1044,6 +1044,7 @@ impl SessionStateBuilder {
self.scalar_functions = Some(SessionStateDefaults::default_scalar_functions());
self.aggregate_functions =
Some(SessionStateDefaults::default_aggregate_functions());
self.window_functions = Some(SessionStateDefaults::default_window_functions());
self
}
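
The builder change above fills `window_functions` from `SessionStateDefaults::default_window_functions()`, which forwards to `functions_window::all_default_window_functions()`. As a rough illustration of how a default list and a shared singleton instance (see the commit "Adds singleton instance of UDWF: `row_number`") might fit together, here is a minimal std-only sketch. `ToyWindowUdf`, `ToyRegistry`, and all function bodies are hypothetical stand-ins, not DataFusion's actual types or implementation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for a window UDF; only the name matters here.
#[derive(Debug)]
pub struct ToyWindowUdf {
    name: &'static str,
}

impl ToyWindowUdf {
    pub fn name(&self) -> &str {
        self.name
    }
}

/// Returns a shared instance that is created on first use.
/// (Assumed singleton pattern; the real function body is not shown in the diff.)
pub fn row_number_udwf() -> Arc<ToyWindowUdf> {
    static INSTANCE: OnceLock<Arc<ToyWindowUdf>> = OnceLock::new();
    Arc::clone(INSTANCE.get_or_init(|| Arc::new(ToyWindowUdf { name: "row_number" })))
}

/// Mirrors the shape of `all_default_window_functions()` from the diff.
pub fn all_default_window_functions() -> Vec<Arc<ToyWindowUdf>> {
    vec![row_number_udwf()]
}

/// Hypothetical registry keyed by function name.
#[derive(Default)]
pub struct ToyRegistry {
    window_functions: HashMap<String, Arc<ToyWindowUdf>>,
}

impl ToyRegistry {
    /// Registers every default window function under its own name.
    pub fn with_default_features() -> Self {
        let mut registry = Self::default();
        for udwf in all_default_window_functions() {
            registry
                .window_functions
                .insert(udwf.name().to_string(), udwf);
        }
        registry
    }

    /// Looks up a window function by its exact (lowercase) name.
    pub fn udwf(&self, name: &str) -> Option<Arc<ToyWindowUdf>> {
        self.window_functions.get(name).cloned()
    }
}
```

In this toy version the lookup is an exact string match, so only the lowercase name resolves; how the real session state normalizes names is outside what this diff shows.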

9 changes: 7 additions & 2 deletions datafusion/core/src/execution/session_state_defaults.rs
@@ -29,12 +29,12 @@ use crate::datasource::provider::DefaultTableFactory;
use crate::execution::context::SessionState;
#[cfg(feature = "nested_expressions")]
use crate::functions_nested;
-use crate::{functions, functions_aggregate};
+use crate::{functions, functions_aggregate, functions_window};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::planner::ExprPlanner;
-use datafusion_expr::{AggregateUDF, ScalarUDF};
+use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;
@@ -112,6 +112,11 @@ impl SessionStateDefaults {
functions_aggregate::all_default_aggregate_functions()
}

+/// returns the list of default [`WindowUDF`]s
+pub fn default_window_functions() -> Vec<Arc<WindowUDF>> {
+functions_window::all_default_window_functions()
+}
+
/// returns the list of default [`FileFormatFactory']'s
pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
@@ -603,6 +603,11 @@ pub mod functions_aggregate {
pub use datafusion_functions_aggregate::*;
}

/// re-export of [`datafusion_functions_window`] crate
pub mod functions_window {
pub use datafusion_functions_window::*;
}

#[cfg(test)]
pub mod test;
pub mod test_util;
13 changes: 5 additions & 8 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -44,6 +44,7 @@ use datafusion_physical_expr::expressions::{cast, col, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

+use datafusion::functions_window::row_number::row_number_udwf;
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
@@ -180,12 +181,10 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// ROWS BETWEEN UNBOUNDED PRECEDING AND <end_bound> PRECEDING/FOLLOWING
// )
(
-// Window function
-WindowFunctionDefinition::BuiltInWindowFunction(
-BuiltInWindowFunction::RowNumber,
-),
+// user-defined window function
+WindowFunctionDefinition::WindowUDF(row_number_udwf()),
// its name
-"ROW_NUMBER",
+"row_number",
// no argument
vec![],
// Expected causality, for None cases causality will be determined from window frame boundaries
@@ -377,9 +376,7 @@ fn get_random_function(
window_fn_map.insert(
"row_number",
(
-WindowFunctionDefinition::BuiltInWindowFunction(
-BuiltInWindowFunction::RowNumber,
-),
+WindowFunctionDefinition::WindowUDF(row_number_udwf()),
vec![],
),
);
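
In the fuzz test the definition moves from the `BuiltInWindowFunction` variant to `WindowFunctionDefinition::WindowUDF(row_number_udwf())`, and the expected name changes from `"ROW_NUMBER"` to `"row_number"`. The sketch below illustrates that naming difference with a toy two-variant enum. `ToyBuiltIn`, `ToyUdwf`, `ToyDefinition`, and `toy_row_number` are hypothetical names for illustration only; the uppercase built-in names follow the `name()` match shown elsewhere in this diff.

```rust
use std::sync::Arc;

/// Toy subset of built-in window functions, named in uppercase as in the diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToyBuiltIn {
    Rank,
    DenseRank,
}

impl ToyBuiltIn {
    pub fn name(&self) -> &'static str {
        match self {
            ToyBuiltIn::Rank => "RANK",
            ToyBuiltIn::DenseRank => "DENSE_RANK",
        }
    }
}

/// Hypothetical user-defined window function that carries its own name.
#[derive(Debug)]
pub struct ToyUdwf {
    name: String,
}

/// Toy counterpart of a definition that is either built in or user defined.
#[derive(Debug, Clone)]
pub enum ToyDefinition {
    BuiltIn(ToyBuiltIn),
    WindowUdf(Arc<ToyUdwf>),
}

impl ToyDefinition {
    /// Built-ins report a fixed uppercase name; UDFs report whatever they define.
    pub fn name(&self) -> &str {
        match self {
            ToyDefinition::BuiltIn(f) => f.name(),
            ToyDefinition::WindowUdf(udwf) => udwf.name.as_str(),
        }
    }
}

/// After the migration, `row_number` is a UDF with a lowercase name.
pub fn toy_row_number() -> ToyDefinition {
    ToyDefinition::WindowUdf(Arc::new(ToyUdwf {
        name: "row_number".to_string(),
    }))
}
```

This is why several commits in the list adjust expected strings in error messages and explain output: any test that compared against the uppercase built-in name needs the lowercase UDF name instead.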
10 changes: 2 additions & 8 deletions datafusion/expr/src/built_in_window_function.rs
@@ -40,8 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
-/// number of the current row within its partition, counting from 1
-RowNumber,
/// rank of the current row with gaps; same as row_number of its first peer
Rank,
/// rank of the current row without gaps; this function counts peer groups
@@ -74,7 +72,6 @@ impl BuiltInWindowFunction {
pub fn name(&self) -> &str {
use BuiltInWindowFunction::*;
match self {
-RowNumber => "ROW_NUMBER",
Rank => "RANK",
DenseRank => "DENSE_RANK",
PercentRank => "PERCENT_RANK",
@@ -93,7 +90,6 @@ impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
Ok(match name.to_uppercase().as_str() {
-"ROW_NUMBER" => BuiltInWindowFunction::RowNumber,
"RANK" => BuiltInWindowFunction::Rank,
"DENSE_RANK" => BuiltInWindowFunction::DenseRank,
"PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
@@ -131,8 +127,7 @@ impl BuiltInWindowFunction {
})?;

match self {
-BuiltInWindowFunction::RowNumber
-| BuiltInWindowFunction::Rank
+BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
@@ -150,8 +145,7 @@ impl BuiltInWindowFunction {
pub fn signature(&self) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.
match self {
-BuiltInWindowFunction::RowNumber
-| BuiltInWindowFunction::Rank
+BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::PercentRank
| BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
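
The removed `RowNumber` variant was documented as the "number of the current row within its partition, counting from 1", with a `UInt64` return type. A minimal sketch of that semantics, using plain `Vec<u64>` instead of Arrow arrays, could look like the following. The function names are hypothetical and this is not the partition evaluator added by the PR, only an illustration of the numbering rule.

```rust
/// Row numbers for a single partition of `num_rows` rows: 1, 2, ..., num_rows.
pub fn row_numbers(num_rows: usize) -> Vec<u64> {
    (1..=(num_rows as u64)).collect()
}

/// Row numbers for several consecutive partitions.
/// Numbering restarts at 1 at the start of each partition.
pub fn row_numbers_per_partition(partition_sizes: &[usize]) -> Vec<u64> {
    partition_sizes
        .iter()
        .flat_map(|&size| row_numbers(size))
        .collect()
}
```

Because the result depends only on the partition length, an evaluator can produce all values for a partition in one call without inspecting any input column.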
1 change: 0 additions & 1 deletion datafusion/expr/src/expr.rs
@@ -2896,7 +2896,6 @@ mod test {
#[test]
fn test_window_function_case_insensitive() -> Result<()> {
let names = vec![
-"row_number",
"rank",
"dense_rank",
"percent_rank",
14 changes: 5 additions & 9 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -36,9 +36,9 @@ use crate::utils::{
split_conjunction,
};
use crate::{
-build_join_schema, expr_vec_fmt, BinaryExpr, BuiltInWindowFunction,
-CreateMemoryTable, CreateView, Expr, ExprSchemable, LogicalPlanBuilder, Operator,
-TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
+build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
+ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown,
+TableSource, WindowFunctionDefinition,
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -2214,18 +2214,14 @@ impl Window {
.enumerate()
.filter_map(|(idx, expr)| {
if let Expr::WindowFunction(WindowFunction {
-// Function is ROW_NUMBER
-fun:
-WindowFunctionDefinition::BuiltInWindowFunction(
-BuiltInWindowFunction::RowNumber,
-),
+fun: WindowFunctionDefinition::WindowUDF(udwf),
partition_by,
..
}) = expr
{
// When there is no PARTITION BY, row number will be unique
// across the entire table.
-if partition_by.is_empty() {
+if udwf.name() == "row_number" && partition_by.is_empty() {
return Some(idx + input_len);
}
}
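
The hunk above keeps the existing rule (a `row_number` column with no `PARTITION BY` is unique across the whole table, so it can act as a key) but now recognizes the function by UDF name rather than by enum variant. The following std-only sketch isolates that index computation. `ToyWindowExpr` and `unique_row_number_columns` are hypothetical names, and the struct is a heavy simplification of the real expression type.

```rust
/// Hypothetical, simplified window expression: a function name plus the
/// number of PARTITION BY expressions attached to it.
pub struct ToyWindowExpr {
    pub function_name: String,
    pub partition_by_len: usize,
}

/// Returns the output-schema indices of window columns whose values are
/// unique per row. Window columns are appended after the `input_len`
/// columns of the input, hence the `idx + input_len` offset.
pub fn unique_row_number_columns(
    window_exprs: &[ToyWindowExpr],
    input_len: usize,
) -> Vec<usize> {
    window_exprs
        .iter()
        .enumerate()
        .filter_map(|(idx, expr)| {
            // Without PARTITION BY, row numbers never repeat across the table.
            if expr.function_name == "row_number" && expr.partition_by_len == 0 {
                Some(idx + input_len)
            } else {
                None
            }
        })
        .collect()
}
```

One consequence of matching on the name string, at least in this toy version, is that a window function registered under a different name would not be recognized even if it behaved identically.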
34 changes: 34 additions & 0 deletions datafusion/expr/src/udwf.rs
@@ -17,6 +17,7 @@

//! [`WindowUDF`]: User Defined Window Functions

use arrow::compute::SortOptions;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{
any::Any,
@@ -176,6 +177,21 @@ impl WindowUDF {
pub fn partition_evaluator_factory(&self) -> Result<Box<dyn PartitionEvaluator>> {
self.inner.partition_evaluator()
}

/// Returns if column values are nullable for this window function.
///
/// See [`WindowUDFImpl::nullable`] for more details.
pub fn nullable(&self) -> bool {
self.inner.nullable()
}

/// Returns custom result ordering introduced by this window function
/// which is used to update ordering equivalences.
///
/// See [`WindowUDFImpl::sort_options`] for more details.
pub fn sort_options(&self) -> Option<SortOptions> {
self.inner.sort_options()
}
}

impl<F> From<F> for WindowUDF
@@ -319,6 +335,24 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
self.signature().hash(hasher);
hasher.finish()
}

/// Allows customizing nullable of column for this window UDF.
///
/// By default, the final result of evaluating the window UDF is
/// allowed to have null values. But if that is not the case then
/// it can be customized in the window UDF implementation.
fn nullable(&self) -> bool {
true
}

/// Allows the window UDF to define a custom result ordering.
///
/// By default, a window UDF doesn't introduce an ordering.
/// But when specified by a window UDF this is used to update
/// ordering equivalences.
fn sort_options(&self) -> Option<SortOptions> {
None
}
}

/// WindowUDF that adds an alias to the underlying function. It is better to
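
The two trait methods added above, `nullable` and `sort_options`, both ship with defaults (`true` and `None`), so existing implementations keep compiling and only functions with stronger guarantees need to override them. Here is a minimal sketch of that pattern with a trimmed-down trait. `ToyWindowUdfImpl`, `ToyRowNumber`, and `ToyPassthrough` are hypothetical, the local `SortOptions` struct is a stand-in for `arrow::compute::SortOptions`, and the specific ordering returned for `ToyRowNumber` is an assumption (the diff shown here does not include the `row_number` implementation).

```rust
/// Local stand-in for `arrow::compute::SortOptions`, so the sketch has no
/// external dependencies.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SortOptions {
    pub descending: bool,
    pub nulls_first: bool,
}

/// Hypothetical, trimmed-down version of the trait from the diff.
pub trait ToyWindowUdfImpl {
    fn name(&self) -> &str;

    /// Default from the diff: the result column may contain nulls.
    fn nullable(&self) -> bool {
        true
    }

    /// Default from the diff: the function introduces no result ordering.
    fn sort_options(&self) -> Option<SortOptions> {
        None
    }
}

/// Overrides both defaults: row numbers are never null and always increase.
pub struct ToyRowNumber;

impl ToyWindowUdfImpl for ToyRowNumber {
    fn name(&self) -> &str {
        "row_number"
    }

    fn nullable(&self) -> bool {
        false
    }

    // Assumed values: ascending order, nulls last.
    fn sort_options(&self) -> Option<SortOptions> {
        Some(SortOptions {
            descending: false,
            nulls_first: false,
        })
    }
}

/// Relies entirely on the defaults.
pub struct ToyPassthrough;

impl ToyWindowUdfImpl for ToyPassthrough {
    fn name(&self) -> &str {
        "passthrough"
    }
}
```

Defaulting to the weakest guarantee (nullable, unordered) is the conservative choice: a planner that trusts an implementation's override can optimize, while one that sees the default never assumes more than is safe.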
8 changes: 0 additions & 8 deletions datafusion/expr/src/window_function.rs
@@ -19,14 +19,6 @@ use datafusion_common::ScalarValue;

use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};

-/// Create an expression to represent the `row_number` window function
-pub fn row_number() -> Expr {
-Expr::WindowFunction(WindowFunction::new(
-BuiltInWindowFunction::RowNumber,
-vec![],
-))
-}
-
/// Create an expression to represent the `rank` window function
pub fn rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Rank, vec![]))
47 changes: 47 additions & 0 deletions datafusion/functions-window/Cargo.toml
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-functions-window"
description = "Window function packages for the DataFusion query engine"
keywords = ["datafusion", "logical", "plan", "expressions"]

Review comment (Contributor, Author) on the `keywords` line: Any suggestions on what keywords to place here?

readme = "README.md"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
rust-version = { workspace = true }

[lints]
workspace = true

[lib]
name = "datafusion_functions_window"
path = "src/lib.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
log = { workspace = true }

[dev-dependencies]
arrow = { workspace = true }
26 changes: 26 additions & 0 deletions datafusion/functions-window/README.md
@@ -0,0 +1,26 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DataFusion Window Function Library

[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.

This crate contains user-defined window functions.

[df]: https://crates.io/crates/datafusion