Merged
48 changes: 37 additions & 11 deletions datafusion/functions-nested/src/flatten.rs
@@ -20,6 +20,7 @@
use crate::utils::make_scalar_function;
use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::ArrowNativeType;
use arrow::datatypes::{
DataType,
DataType::{FixedSizeList, LargeList, List, Null},
@@ -95,7 +96,10 @@ impl ScalarUDFImpl for Flatten {
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
let data_type = match &arg_types[0] {
List(field) => match field.data_type() {
List(field) | FixedSizeList(field, _) => List(Arc::clone(field)),
List(field) | FixedSizeList(field, _) => {
List(Arc::clone(field))
}
LargeList(field) => LargeList(Arc::clone(field)),
_ => arg_types[0].clone(),
},
LargeList(field) => match field.data_type() {
@@ -154,7 +158,16 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(Arc::new(flattened_array) as ArrayRef)
}
LargeList(_) => {
exec_err!("flatten does not support type '{:?}'", array.data_type())?
let (inner_field, inner_offsets, inner_values, _) =
as_large_list_array(&values)?.clone().into_parts();
let offsets = keep_offsets_i64(inner_offsets, offsets);
let flattened_array = GenericListArray::<i64>::new(
inner_field,
offsets,
inner_values,
nulls,
);
Ok(Arc::new(flattened_array) as ArrayRef)
}
_ => Ok(Arc::clone(array) as ArrayRef),
}
@@ -179,7 +192,7 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(Arc::new(flattened_array) as ArrayRef)
}
LargeList(_) => {
let (inner_field, inner_offsets, inner_values, nulls) =
let (inner_field, inner_offsets, inner_values, _) = // _ instead of nulls?
Contributor Author:
Here flattened_array was generated using the inner nulls instead of the outer ones. I figured it should follow the same format and use nulls from the outer array.

Contributor:
This is interesting; so this might be a bug in current behaviour? Do you think you can mock up a test where it fails on main without this fix?
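A minimal sketch of a query that might exercise this difference (purely illustrative, not part of the PR; it assumes arrow_cast accepts a nested 'LargeList(LargeList(Int64))' type string, and whether it actually misbehaves on main is unverified):

select flatten(arrow_cast(make_array([1, 2], null, [3]), 'LargeList(LargeList(Int64))'));

Here the outer argument has a single non-null row while the inner values array contains a null inner list, so building the result with the inner null buffer would pair a 3-element null mask with a 1-row flattened array.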

as_large_list_array(&values)?.clone().into_parts();
let offsets = get_offsets_for_flatten::<i64>(inner_offsets, offsets);
let flattened_array = GenericListArray::<i64>::new(
@@ -203,11 +216,11 @@ pub fn flatten_inner(args: &[ArrayRef]) -> Result<ArrayRef> {

// Create new offsets that are equivalent to `flatten` the array.
fn get_offsets_for_flatten<O: OffsetSizeTrait>(
offsets: OffsetBuffer<O>,
indexes: OffsetBuffer<O>,
inner_offsets: OffsetBuffer<O>,
outer_offsets: OffsetBuffer<O>,
Contributor Author:
A little naming change because the previous one was confusing.

) -> OffsetBuffer<O> {
let buffer = offsets.into_inner();
let offsets: Vec<O> = indexes
let buffer = inner_offsets.into_inner();
let offsets: Vec<O> = outer_offsets
.iter()
.map(|i| buffer[i.to_usize().unwrap()])
.collect();
@@ -216,17 +229,30 @@ fn get_offsets_for_flatten<O: OffsetSizeTrait>(

// Create new large offsets that are equivalent to `flatten` the array.
fn get_large_offsets_for_flatten<O: OffsetSizeTrait, P: OffsetSizeTrait>(
offsets: OffsetBuffer<O>,
indexes: OffsetBuffer<P>,
inner_offsets: OffsetBuffer<O>,
outer_offsets: OffsetBuffer<P>,
) -> OffsetBuffer<i64> {
let buffer = offsets.into_inner();
let offsets: Vec<i64> = indexes
let buffer = inner_offsets.into_inner();
let offsets: Vec<i64> = outer_offsets
.iter()
.map(|i| buffer[i.to_usize().unwrap()].to_i64().unwrap())
.collect();
OffsetBuffer::new(offsets.into())
}
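
For illustration only (not part of the diff): a standalone sketch of the offset composition these helpers perform, using hypothetical offset values. For each outer offset i, the flattened offset is simply inner_offsets[i].

use arrow::buffer::OffsetBuffer;

fn main() {
    // Outer list: 2 rows, covering inner lists [0..2) and [2..3)
    let outer_offsets = OffsetBuffer::new(vec![0i32, 2, 3].into());
    // Inner lists of lengths 2, 3 and 2 over the flat values
    let inner_offsets = OffsetBuffer::new(vec![0i32, 2, 5, 7].into());

    // Same composition as get_offsets_for_flatten: index the inner offsets
    // by the outer offsets to obtain offsets over the flat values.
    let buffer = inner_offsets.into_inner();
    let flattened: Vec<i32> = outer_offsets
        .iter()
        .map(|i| buffer[*i as usize])
        .collect();

    // Row 0 of the flattened array spans values 0..5, row 1 spans 5..7.
    assert_eq!(flattened, vec![0, 5, 7]);
}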

// Create new offsets that are equivalent to `flatten` the array when the
// outer offsets are i32 and the inner offsets are already i64
fn keep_offsets_i64(
Contributor Author:
This function takes the outer i32 offsets and the inner i64 offsets, and keeps the result as i64.

Contributor:
I think this function should be renamed (and docstring adjusted) now? keep_offsets_i64 is a bit confusing to read for me

inner_offsets: OffsetBuffer<i64>,
outer_offsets: OffsetBuffer<i32>,
) -> OffsetBuffer<i64> {
let buffer = inner_offsets.into_inner();
let offsets: Vec<i64> = outer_offsets
.iter()
.map(|i| buffer[i.to_usize().unwrap()])
.collect();
OffsetBuffer::new(offsets.into())
}
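
One possible shape for the rename and docstring the reviewer suggests above (the name is an assumption, not something agreed on in this PR; it reuses the imports already present in flatten.rs):

/// Create new i64 offsets equivalent to `flatten` when the outer offsets
/// are i32 and the inner offsets are already i64 (e.g. List(LargeList(_))).
fn get_i64_offsets_for_flatten(
    inner_offsets: OffsetBuffer<i64>,
    outer_offsets: OffsetBuffer<i32>,
) -> OffsetBuffer<i64> {
    let buffer = inner_offsets.into_inner();
    let offsets: Vec<i64> = outer_offsets
        .iter()
        .map(|i| buffer[i.to_usize().unwrap()])
        .collect();
    OffsetBuffer::new(offsets.into())
}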

fn cast_fsl_to_list(array: ArrayRef) -> Result<ArrayRef> {
match array.data_type() {
FixedSizeList(field, _) => {
6 changes: 6 additions & 0 deletions datafusion/sqllogictest/test_files/array.slt
@@ -7737,6 +7737,12 @@ select flatten(arrow_cast(make_array(1, 2, 1, 3, 2), 'FixedSizeList(5, Int64)'))
----
[1, 2, 1, 3, 2] [1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]]

query ??
select flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5)), 'FixedSizeList(4, LargeList(Int64))')),
flatten(arrow_cast(make_array([[1.1], [2.2]], [[3.3], [4.4]]), 'List(LargeList(FixedSizeList(1, Float64)))'));
----
[1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]]
Contributor:
I think we should also add a check for the output type via arrow_typeof() to ensure we are indeed getting a large list.
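
A sketch of what such a check could look like (illustrative only; it assumes the flattened result is indeed a LargeList as intended by this change, and the exact output would need to be confirmed by actually running the test):

query B
select arrow_typeof(flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5)), 'FixedSizeList(4, LargeList(Int64))'))) LIKE 'LargeList%';
----
true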


# flatten with column values
query ????
select flatten(column1),