Skip to content

Commit 1f06308

Browse files
Omega359 and alamb authored
Support timestamps and steps of less than a day for range/generate_series (#12400)
* Support timestamps and steps of less than a day for timestamps. * Updated docs for range and generate_series to add additional info wrt timestamp support. * Updates based on code review. * Cleanup error message Co-authored-by: Andrew Lamb <[email protected]> --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 5b6b404 commit 1f06308

File tree

3 files changed

+322
-30
lines changed

3 files changed

+322
-30
lines changed

datafusion/functions-nested/src/range.rs

Lines changed: 165 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,31 @@
1818
//! [`ScalarUDFImpl`] definitions for range and gen_series functions.
1919
2020
use crate::utils::make_scalar_function;
21-
use arrow::array::{Array, ArrayRef, Date32Builder, Int64Array, ListArray, ListBuilder};
21+
use arrow::array::{Array, ArrayRef, Int64Array, ListArray, ListBuilder};
2222
use arrow::datatypes::{DataType, Field};
23-
use arrow_array::types::{Date32Type, IntervalMonthDayNanoType};
24-
use arrow_array::NullArray;
23+
use arrow_array::builder::{Date32Builder, TimestampNanosecondBuilder};
24+
use arrow_array::temporal_conversions::as_datetime_with_timezone;
25+
use arrow_array::timezone::Tz;
26+
use arrow_array::types::{
27+
Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType as TSNT,
28+
};
29+
use arrow_array::{NullArray, TimestampNanosecondArray};
2530
use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer};
2631
use arrow_schema::DataType::*;
2732
use arrow_schema::IntervalUnit::MonthDayNano;
28-
use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array};
29-
use datafusion_common::{exec_err, not_impl_datafusion_err, Result};
33+
use arrow_schema::TimeUnit::Nanosecond;
34+
use datafusion_common::cast::{
35+
as_date32_array, as_int64_array, as_interval_mdn_array, as_timestamp_nanosecond_array,
36+
};
37+
use datafusion_common::{
38+
exec_datafusion_err, exec_err, internal_err, not_impl_datafusion_err, Result,
39+
};
3040
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
3141
use itertools::Itertools;
3242
use std::any::Any;
43+
use std::cmp::Ordering;
3344
use std::iter::from_fn;
45+
use std::str::FromStr;
3446
use std::sync::Arc;
3547

3648
make_udf_expr_and_func!(
@@ -78,7 +90,7 @@ impl ScalarUDFImpl for Range {
7890
UInt16 => Ok(Int64),
7991
UInt32 => Ok(Int64),
8092
UInt64 => Ok(Int64),
81-
Timestamp(_, _) => Ok(Date32),
93+
Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())),
8294
Date32 => Ok(Date32),
8395
Date64 => Ok(Date32),
8496
Utf8 => Ok(Date32),
@@ -109,8 +121,11 @@ impl ScalarUDFImpl for Range {
109121
match args[0].data_type() {
110122
Int64 => make_scalar_function(|args| gen_range_inner(args, false))(args),
111123
Date32 => make_scalar_function(|args| gen_range_date(args, false))(args),
112-
_ => {
113-
exec_err!("unsupported type for range")
124+
Timestamp(_, _) => {
125+
make_scalar_function(|args| gen_range_timestamp(args, false))(args)
126+
}
127+
dt => {
128+
exec_err!("unsupported type for RANGE. Expected Int64, Date32 or Timestamp, got: {dt}")
114129
}
115130
}
116131
}
@@ -152,8 +167,8 @@ impl ScalarUDFImpl for GenSeries {
152167
&self.signature
153168
}
154169

155-
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
156-
_arg_types
170+
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
171+
arg_types
157172
.iter()
158173
.map(|arg_type| match arg_type {
159174
Null => Ok(Null),
@@ -165,7 +180,7 @@ impl ScalarUDFImpl for GenSeries {
165180
UInt16 => Ok(Int64),
166181
UInt32 => Ok(Int64),
167182
UInt64 => Ok(Int64),
168-
Timestamp(_, _) => Ok(Date32),
183+
Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())),
169184
Date32 => Ok(Date32),
170185
Date64 => Ok(Date32),
171186
Utf8 => Ok(Date32),
@@ -196,9 +211,12 @@ impl ScalarUDFImpl for GenSeries {
196211
match args[0].data_type() {
197212
Int64 => make_scalar_function(|args| gen_range_inner(args, true))(args),
198213
Date32 => make_scalar_function(|args| gen_range_date(args, true))(args),
214+
Timestamp(_, _) => {
215+
make_scalar_function(|args| gen_range_timestamp(args, true))(args)
216+
}
199217
dt => {
200218
exec_err!(
201-
"unsupported type for gen_series. Expected Int64 or Date32, got: {}",
219+
"unsupported type for GENERATE_SERIES. Expected Int64, Date32 or Timestamp, got: {}",
202220
dt
203221
)
204222
}
@@ -334,7 +352,7 @@ fn gen_range_iter(
334352
}
335353
}
336354

337-
fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> {
355+
fn gen_range_date(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> {
338356
if args.len() != 3 {
339357
return exec_err!("arguments length does not match");
340358
}
@@ -372,7 +390,7 @@ fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> {
372390
}
373391

374392
let neg = months < 0 || days < 0;
375-
if !include_upper {
393+
if !include_upper_bound {
376394
stop = Date32Type::subtract_month_day_nano(stop, step);
377395
}
378396
let mut new_date = start;
@@ -394,3 +412,136 @@ fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> {
394412

395413
Ok(arr)
396414
}
415+
416+
fn gen_range_timestamp(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> {
417+
if args.len() != 3 {
418+
return exec_err!(
419+
"Arguments length must be 3 for {}",
420+
if include_upper_bound {
421+
"GENERATE_SERIES"
422+
} else {
423+
"RANGE"
424+
}
425+
);
426+
}
427+
428+
// coerce_types fn should coerce all types to Timestamp(Nanosecond, tz)
429+
let (start_arr, start_tz_opt) = cast_timestamp_arg(&args[0], include_upper_bound)?;
430+
let (stop_arr, stop_tz_opt) = cast_timestamp_arg(&args[1], include_upper_bound)?;
431+
let step_arr = as_interval_mdn_array(&args[2])?;
432+
let start_tz = parse_tz(start_tz_opt)?;
433+
let stop_tz = parse_tz(stop_tz_opt)?;
434+
435+
// values are timestamps
436+
let values_builder = start_tz_opt
437+
.clone()
438+
.map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| {
439+
TimestampNanosecondBuilder::new().with_timezone(start_tz_str)
440+
});
441+
let mut list_builder = ListBuilder::new(values_builder);
442+
443+
for idx in 0..start_arr.len() {
444+
if start_arr.is_null(idx) || stop_arr.is_null(idx) || step_arr.is_null(idx) {
445+
list_builder.append_null();
446+
continue;
447+
}
448+
449+
let start = start_arr.value(idx);
450+
let stop = stop_arr.value(idx);
451+
let step = step_arr.value(idx);
452+
453+
let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step);
454+
if months == 0 && days == 0 && ns == 0 {
455+
return exec_err!(
456+
"Interval argument to {} must not be 0",
457+
if include_upper_bound {
458+
"GENERATE_SERIES"
459+
} else {
460+
"RANGE"
461+
}
462+
);
463+
}
464+
465+
let neg = TSNT::add_month_day_nano(start, step, start_tz)
466+
.ok_or(exec_datafusion_err!(
467+
"Cannot generate timestamp range where start + step overflows"
468+
))?
469+
.cmp(&start)
470+
== Ordering::Less;
471+
472+
let stop_dt = as_datetime_with_timezone::<TSNT>(stop, stop_tz).ok_or(
473+
exec_datafusion_err!(
474+
"Cannot generate timestamp for stop: {}: {:?}",
475+
stop,
476+
stop_tz
477+
),
478+
)?;
479+
480+
let mut current = start;
481+
let mut current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz).ok_or(
482+
exec_datafusion_err!(
483+
"Cannot generate timestamp for start: {}: {:?}",
484+
current,
485+
start_tz
486+
),
487+
)?;
488+
489+
let values = from_fn(|| {
490+
if (include_upper_bound
491+
&& ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt)))
492+
|| (!include_upper_bound
493+
&& ((neg && current_dt <= stop_dt)
494+
|| (!neg && current_dt >= stop_dt)))
495+
{
496+
return None;
497+
}
498+
499+
let prev_current = current;
500+
501+
if let Some(ts) = TSNT::add_month_day_nano(current, step, start_tz) {
502+
current = ts;
503+
current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz)?;
504+
505+
Some(Some(prev_current))
506+
} else {
507+
// we failed to parse the timestamp here so terminate the series
508+
None
509+
}
510+
});
511+
512+
list_builder.append_value(values);
513+
}
514+
515+
let arr = Arc::new(list_builder.finish());
516+
517+
Ok(arr)
518+
}
519+
520+
fn cast_timestamp_arg(
521+
arg: &ArrayRef,
522+
include_upper: bool,
523+
) -> Result<(&TimestampNanosecondArray, &Option<Arc<str>>)> {
524+
match arg.data_type() {
525+
Timestamp(Nanosecond, tz_opt) => {
526+
Ok((as_timestamp_nanosecond_array(arg)?, tz_opt))
527+
}
528+
_ => {
529+
internal_err!(
530+
"Unexpected argument type for {} : {}",
531+
if include_upper {
532+
"GENERATE_SERIES"
533+
} else {
534+
"RANGE"
535+
},
536+
arg.data_type()
537+
)
538+
}
539+
}
540+
}
541+
542+
fn parse_tz(tz: &Option<Arc<str>>) -> Result<Tz> {
543+
let tz = tz.as_ref().map_or_else(|| "+00", |s| s);
544+
545+
Tz::from_str(tz)
546+
.map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op))
547+
}

0 commit comments

Comments
 (0)