Skip to content
20 changes: 17 additions & 3 deletions arrow-data/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,10 @@ impl<'a> MutableArrayData<'a> {
array_capacity = *capacity;
new_buffers(data_type, *capacity)
}
(DataType::List(_) | DataType::LargeList(_), Capacities::List(capacity, _)) => {
(
DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _),
Capacities::List(capacity, _),
) => {
array_capacity = *capacity;
new_buffers(data_type, *capacity)
}
Expand Down Expand Up @@ -501,12 +504,23 @@ impl<'a> MutableArrayData<'a> {
MutableArrayData::new(value_child, use_nulls, array_capacity),
]
}
DataType::FixedSizeList(_, _) => {
DataType::FixedSizeList(_, size) => {
let children = arrays
.iter()
.map(|array| &array.child_data()[0])
.collect::<Vec<_>>();
vec![MutableArrayData::new(children, use_nulls, array_capacity)]
let capacities =
if let Capacities::List(capacity, ref child_capacities) = capacities {
child_capacities
.clone()
.map(|c| *c)
.unwrap_or(Capacities::Array(capacity * *size as usize))
} else {
Capacities::Array(array_capacity * *size as usize)
};
vec![MutableArrayData::with_capacities(
children, use_nulls, capacities,
)]
}
DataType::Union(fields, _) => (0..fields.len())
.map(|i| {
Expand Down
77 changes: 65 additions & 12 deletions arrow-select/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
Capacities::Binary(item_capacity, Some(bytes_capacity))
}

fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
if let DataType::FixedSizeList(f, _) = data_type {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be returning Capacities::Array() if the children are primitive / fixed size? I ask because I see it handled above and it seems like it would avoid some allocations and iteration over children.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's fair: we can avoid at least one iteration and a Box::new() if we only recursively call get_capacity() when we know that it will have some special handling other than Capacities::Array(arrays.iter().map(|a| a.len()).sum()).

b30566a

let item_capacity = arrays.iter().map(|a| a.len()).sum();
let values: Vec<&dyn arrow_array::Array> = arrays
.iter()
.map(|a| a.as_fixed_size_list().values().as_ref())
.collect();
Capacities::List(
item_capacity,
Some(Box::new(get_capacity(&values, f.data_type()))),
)
} else {
unreachable!("illegal data type for fixed size list")
}
}

fn concat_dictionaries<K: ArrowDictionaryKeyType>(
arrays: &[&dyn Array],
) -> Result<ArrayRef, ArrowError> {
Expand Down Expand Up @@ -107,6 +123,17 @@ macro_rules! dict_helper {
};
}

/// Selects the [`Capacities`] value describing `arrays`, all of which are
/// expected to have the given `data_type`.
///
/// * `Utf8`, `LargeUtf8`, `Binary` and `LargeBinary` delegate to
///   `binary_capacity` with the matching byte-array type.
/// * `FixedSizeList` delegates to `fixed_size_list_capacity`.
/// * Every other type yields `Capacities::Array` holding the summed length
///   of all input arrays.
fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
    match data_type {
        DataType::FixedSizeList(_, _) => fixed_size_list_capacity(arrays, data_type),
        DataType::Binary => binary_capacity::<BinaryType>(arrays),
        DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
        DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
        DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
        _ => {
            // No type-specific hint: the total element count is enough.
            let total_len: usize = arrays.iter().map(|array| array.len()).sum();
            Capacities::Array(total_len)
        }
    }
}

/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
if arrays.is_empty() {
Expand All @@ -124,20 +151,15 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
"It is not possible to concatenate arrays of different data types.".to_string(),
));
}

let capacity = match d {
DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
DataType::Binary => binary_capacity::<BinaryType>(arrays),
DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
DataType::Dictionary(k, _) => downcast_integer! {
if let DataType::Dictionary(k, _) = d {
downcast_integer! {
k.as_ref() => (dict_helper, arrays),
_ => unreachable!("illegal dictionary key type {k}")
},
_ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
};

concat_fallback(arrays, capacity)
};
} else {
let capacity = get_capacity(arrays, d);
concat_fallback(arrays, capacity)
}
}

/// Concatenates arrays using MutableArrayData
Expand Down Expand Up @@ -373,6 +395,37 @@ mod tests {
assert_eq!(array_result.as_ref(), &array_expected as &dyn Array);
}

/// Concatenating three fixed-size list arrays of `Int64` values (list width 2,
/// with both null lists and null items) must equal the array built directly
/// from the chained input rows.
#[test]
fn test_concat_primitive_fixed_size_list_arrays() {
    // Width shared by every fixed-size list in this test.
    const LIST_SIZE: i32 = 2;

    let first_rows = vec![
        Some(vec![Some(-1), None]),
        None,
        Some(vec![Some(10), Some(20)]),
    ];
    let second_rows = vec![
        None,
        Some(vec![Some(100), None]),
        Some(vec![Some(102), Some(103)]),
    ];
    let third_rows = vec![Some(vec![Some(1000), Some(1001)])];

    let first_array =
        FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(first_rows.clone(), LIST_SIZE);
    let second_array =
        FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(second_rows.clone(), LIST_SIZE);
    let third_array =
        FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(third_rows.clone(), LIST_SIZE);

    let inputs: [&dyn Array; 3] = [&first_array, &second_array, &third_array];
    let concatenated = concat(&inputs).unwrap();

    // The expected array is built in one pass from all rows, in input order.
    let all_rows = first_rows.into_iter().chain(second_rows).chain(third_rows);
    let expected_array =
        FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(all_rows, LIST_SIZE);

    assert_eq!(concatenated.as_ref(), &expected_array as &dyn Array);
}

#[test]
fn test_concat_struct_arrays() {
let field = Arc::new(Field::new("field", DataType::Int64, true));
Expand Down
20 changes: 20 additions & 0 deletions arrow/benches/concatenate_kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#[macro_use]
extern crate criterion;
use std::sync::Arc;

use criterion::Criterion;

extern crate arrow;
Expand Down Expand Up @@ -82,6 +84,24 @@ fn add_benchmark(c: &mut Criterion) {
c.bench_function("concat str nulls 1024", |b| {
b.iter(|| bench_concat(&v1, &v2))
});

let v1 = FixedSizeListArray::try_new(
Arc::new(Field::new("item", DataType::Int32, true)),
1024,
Arc::new(create_primitive_array::<Int32Type>(1024 * 1024, 0.0)),
None,
)
.unwrap();
let v2 = FixedSizeListArray::try_new(
Arc::new(Field::new("item", DataType::Int32, true)),
1024,
Arc::new(create_primitive_array::<Int32Type>(1024 * 1024, 0.0)),
None,
)
.unwrap();
c.bench_function("concat fixed size lists", |b| {
b.iter(|| bench_concat(&v1, &v2))
});
}

criterion_group!(benches, add_benchmark);
Expand Down
92 changes: 41 additions & 51 deletions arrow/tests/array_transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use arrow::array::{
Array, ArrayRef, BooleanArray, Decimal128Array, DictionaryArray, FixedSizeBinaryArray,
Int16Array, Int32Array, Int64Array, Int64Builder, ListArray, ListBuilder, MapBuilder,
NullArray, StringArray, StringBuilder, StringDictionaryBuilder, StructArray, UInt8Array,
UnionArray,
FixedSizeListBuilder, Int16Array, Int32Array, Int64Array, Int64Builder, ListArray, ListBuilder,
MapBuilder, NullArray, StringArray, StringBuilder, StringDictionaryBuilder, StructArray,
UInt16Array, UInt16Builder, UInt8Array, UnionArray,
};
use arrow::datatypes::Int16Type;
use arrow_array::StringViewArray;
Expand Down Expand Up @@ -1074,43 +1074,42 @@ fn test_mixed_types() {
MutableArrayData::new(vec![&a, &b], false, 4);
}

/*
// this is an old test used on a meanwhile removed dead code
// that is still useful when `MutableArrayData` supports fixed-size lists.
#[test]
fn test_fixed_size_list_append() -> Result<()> {
let int_builder = UInt16Builder::new(64);
fn test_fixed_size_list_append() {
let int_builder = UInt16Builder::with_capacity(64);
let mut builder = FixedSizeListBuilder::<UInt16Builder>::new(int_builder, 2);
builder.values().append_slice(&[1, 2])?;
builder.append(true)?;
builder.values().append_slice(&[3, 4])?;
builder.append(false)?;
builder.values().append_slice(&[5, 6])?;
builder.append(true)?;

let a_builder = UInt16Builder::new(64);
builder.values().append_slice(&[1, 2]);
builder.append(true);
builder.values().append_slice(&[3, 4]);
builder.append(false);
builder.values().append_slice(&[5, 6]);
builder.append(true);
let a = builder.finish().into_data();

let a_builder = UInt16Builder::with_capacity(64);
let mut a_builder = FixedSizeListBuilder::<UInt16Builder>::new(a_builder, 2);
a_builder.values().append_slice(&[7, 8])?;
a_builder.append(true)?;
a_builder.values().append_slice(&[9, 10])?;
a_builder.append(true)?;
a_builder.values().append_slice(&[11, 12])?;
a_builder.append(false)?;
a_builder.values().append_slice(&[13, 14])?;
a_builder.append(true)?;
a_builder.values().append_null()?;
a_builder.values().append_null()?;
a_builder.append(true)?;
let a = a_builder.finish();
a_builder.values().append_slice(&[7, 8]);
a_builder.append(true);
a_builder.values().append_slice(&[9, 10]);
a_builder.append(true);
a_builder.values().append_slice(&[11, 12]);
a_builder.append(false);
a_builder.values().append_slice(&[13, 14]);
a_builder.append(true);
a_builder.values().append_null();
a_builder.values().append_null();
a_builder.append(true);
let b = a_builder.finish().into_data();

let mut mutable = MutableArrayData::new(vec![&a, &b], false, 10);
mutable.extend(0, 0, a.len());
mutable.extend(1, 0, b.len());

// append array
builder.append_data(&[
a.data(),
a.slice(1, 3).data(),
a.slice(2, 1).data(),
a.slice(5, 0).data(),
])?;
let finished = builder.finish();
mutable.extend(1, 1, 4);
mutable.extend(1, 2, 3);

let finished = mutable.freeze();

let expected_int_array = UInt16Array::from(vec![
Some(1),
Expand Down Expand Up @@ -1141,23 +1140,14 @@ fn test_fixed_size_list_append() -> Result<()> {
Some(11),
Some(12),
]);
let expected_list_data = ArrayData::new(
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::UInt16, true)),
2,
),
let expected_fixed_size_list_data = ArrayData::try_new(
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt16, true)), 2),
12,
None,
None,
Some(Buffer::from(&[0b11011101, 0b101])),
0,
vec![],
vec![expected_int_array.data()],
);
let expected_list =
FixedSizeListArray::from(Arc::new(expected_list_data) as ArrayData);
assert_eq!(&expected_list.values(), &finished.values());
assert_eq!(expected_list.len(), finished.len());

Ok(())
vec![expected_int_array.to_data()],
)
.unwrap();
assert_eq!(finished, expected_fixed_size_list_data);
}
*/