|
18 | 18 | //! Interleave elements from multiple arrays |
19 | 19 |
|
20 | 20 | use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values}; |
21 | | -use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder, PrimitiveBuilder}; |
| 21 | +use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder}; |
22 | 22 | use arrow_array::cast::AsArray; |
23 | 23 | use arrow_array::types::*; |
24 | 24 | use arrow_array::*; |
25 | 25 | use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer}; |
26 | 26 | use arrow_data::transform::MutableArrayData; |
27 | 27 | use arrow_data::ByteView; |
28 | 28 | use arrow_schema::{ArrowError, DataType}; |
29 | | -use std::collections::HashMap; |
30 | 29 | use std::sync::Arc; |
31 | 30 |
|
32 | 31 | macro_rules! primitive_helper { |
@@ -238,32 +237,43 @@ fn interleave_views<T: ByteViewType>( |
238 | 237 | indices: &[(usize, usize)], |
239 | 238 | ) -> Result<ArrayRef, ArrowError> { |
240 | 239 | let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices); |
241 | | - let mut views_builder = BufferBuilder::new(indices.len()); |
242 | 240 | let mut buffers = Vec::new(); |
243 | 241 |
|
244 | | - // (input array_index, input buffer_index) -> output buffer_index |
245 | | - let mut buffer_lookup: HashMap<(usize, u32), u32> = HashMap::new(); |
246 | | - for (array_idx, value_idx) in indices { |
247 | | - let array = interleaved.arrays[*array_idx]; |
248 | | - let raw_view = array.views().get(*value_idx).unwrap(); |
249 | | - let view_len = *raw_view as u32; |
250 | | - if view_len <= 12 { |
251 | | - views_builder.append(*raw_view); |
252 | | - continue; |
253 | | - } |
254 | | - // value is big enough to be in a variadic buffer |
255 | | - let view = ByteView::from(*raw_view); |
256 | | - let new_buffer_idx: &mut u32 = buffer_lookup |
257 | | - .entry((*array_idx, view.buffer_index)) |
258 | | - .or_insert_with(|| { |
259 | | - buffers.push(array.data_buffers()[view.buffer_index as usize].clone()); |
260 | | - (buffers.len() - 1) as u32 |
261 | | - }); |
262 | | - views_builder.append(view.with_buffer_index(*new_buffer_idx).into()); |
| 242 | + // Contains the offsets of start buffer in `buffer_to_new_index` |
| 243 | + let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1); |
| 244 | + offsets.push(0); |
| 245 | + let mut total_buffers = 0; |
| 246 | + for a in interleaved.arrays.iter() { |
| 247 | + total_buffers += a.data_buffers().len(); |
| 248 | + offsets.push(total_buffers); |
263 | 249 | } |
264 | 250 |
|
| 251 | + // contains the mapping from old buffer index to new buffer index |
| 252 | + let mut buffer_to_new_index = vec![None; total_buffers]; |
| 253 | + |
| 254 | + let views: Vec<u128> = indices |
| 255 | + .iter() |
| 256 | + .map(|(array_idx, value_idx)| { |
| 257 | + let array = interleaved.arrays[*array_idx]; |
| 258 | + let view = array.views().get(*value_idx).unwrap(); |
| 259 | + let view_len = *view as u32; |
| 260 | + if view_len <= 12 { |
| 261 | + return *view; |
| 262 | + } |
| 263 | + // value is big enough to be in a variadic buffer |
| 264 | + let view = ByteView::from(*view); |
| 265 | + let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize; |
| 266 | + let new_buffer_idx: u32 = |
| 267 | + *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| { |
| 268 | + buffers.push(array.data_buffers()[view.buffer_index as usize].clone()); |
| 269 | + (buffers.len() - 1) as u32 |
| 270 | + }); |
| 271 | + view.with_buffer_index(new_buffer_idx).as_u128() |
| 272 | + }) |
| 273 | + .collect(); |
| 274 | + |
265 | 275 | let array = unsafe { |
266 | | - GenericByteViewArray::<T>::new_unchecked(views_builder.into(), buffers, interleaved.nulls) |
| 276 | + GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls) |
267 | 277 | }; |
268 | 278 | Ok(Arc::new(array)) |
269 | 279 | } |
|
0 commit comments