Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 20 additions & 51 deletions arrow-array/src/builder/generic_bytes_view_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::any::Any;
use std::marker::PhantomData;
use std::sync::Arc;

use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
use arrow_data::ByteView;
use arrow_schema::ArrowError;
use hashbrown::hash_table::Entry;
Expand All @@ -28,7 +28,7 @@ use hashbrown::HashTable;
use crate::builder::ArrayBuilder;
use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{Array, ArrayRef, GenericByteViewArray};
use crate::{ArrayRef, GenericByteViewArray};

const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB
Expand Down Expand Up @@ -79,7 +79,7 @@ impl BlockSizeGrowthStrategy {
/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended
/// using [`GenericByteViewBuilder::try_append_view`]
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
views_buffer: Vec<u128>,
views_builder: BufferBuilder<u128>,
null_buffer_builder: NullBufferBuilder,
completed: Vec<Buffer>,
in_progress: Vec<u8>,
Expand All @@ -99,7 +99,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
/// Creates a new [`GenericByteViewBuilder`] with space for `capacity` string values.
pub fn with_capacity(capacity: usize) -> Self {
Self {
views_buffer: Vec::with_capacity(capacity),
views_builder: BufferBuilder::new(capacity),
null_buffer_builder: NullBufferBuilder::new(capacity),
completed: vec![],
in_progress: vec![],
Expand Down Expand Up @@ -148,7 +148,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
pub fn with_deduplicate_strings(self) -> Self {
Self {
string_tracker: Some((
HashTable::with_capacity(self.views_buffer.capacity()),
HashTable::with_capacity(self.views_builder.capacity()),
Default::default(),
)),
..self
Expand Down Expand Up @@ -201,43 +201,10 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
let b = b.get_unchecked(start..end);

let view = make_view(b, block, offset);
self.views_buffer.push(view);
self.views_builder.append(view);
self.null_buffer_builder.append_non_null();
}

/// Appends an array to the builder.
/// This will flush any in-progress block and append the data buffers
/// and add the (adapted) views.
pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
self.flush_in_progress();
// keep original views if this array is the first to be added or if there are no data buffers (all inline views)
let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();

self.completed.extend(array.data_buffers().iter().cloned());

if keep_views {
self.views_buffer.extend_from_slice(array.views());
} else {
let starting_buffer = self.completed.len() as u32;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one bug is that this starting buffer value should be calculated before extending self.completed a few lines above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm - this seems off as we already pushed to self.completed @alamb (I only have time later today to address)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make some new tests / etc to try and cover this case


self.views_buffer.extend(array.views().iter().map(|v| {
let mut byte_view = ByteView::from(*v);
if byte_view.length > 12 {
// Small views (<=12 bytes) are inlined, so only need to update large views
byte_view.buffer_index += starting_buffer;
};

byte_view.as_u128()
}));
}

if let Some(null_buffer) = array.nulls() {
self.null_buffer_builder.append_buffer(null_buffer);
} else {
self.null_buffer_builder.append_n_non_nulls(array.len());
}
}

/// Try to append a view of the given `block`, `offset` and `length`
///
/// See [`Self::append_block`]
Expand Down Expand Up @@ -288,7 +255,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
/// Useful if we want to know what value has been inserted to the builder
/// The index has to be smaller than `self.len()`, otherwise it will panic
pub fn get_value(&self, index: usize) -> &[u8] {
let view = self.views_buffer.as_slice().get(index).unwrap();
let view = self.views_builder.as_slice().get(index).unwrap();
let len = *view as u32;
if len <= 12 {
// # Safety
Expand Down Expand Up @@ -320,7 +287,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
let mut view_buffer = [0; 16];
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
view_buffer[4..4 + v.len()].copy_from_slice(v);
self.views_buffer.push(u128::from_le_bytes(view_buffer));
self.views_builder.append(u128::from_le_bytes(view_buffer));
self.null_buffer_builder.append_non_null();
return;
}
Expand All @@ -344,15 +311,16 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
Entry::Occupied(occupied) => {
// If the string already exists, we will directly use the view
let idx = occupied.get();
self.views_buffer.push(self.views_buffer[*idx]);
self.views_builder
.append(self.views_builder.as_slice()[*idx]);
self.null_buffer_builder.append_non_null();
self.string_tracker = Some((ht, hasher));
return;
}
Entry::Vacant(vacant) => {
// Otherwise, insert the (string hash -> view index) mapping;
// the index is the current length of views_builder, as we are inserting a new view
vacant.insert(self.views_buffer.len());
vacant.insert(self.views_builder.len());
}
}
self.string_tracker = Some((ht, hasher));
Expand All @@ -373,7 +341,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
buffer_index: self.completed.len() as u32,
offset,
};
self.views_buffer.push(view.into());
self.views_builder.append(view.into());
self.null_buffer_builder.append_non_null();
}

Expand All @@ -390,20 +358,21 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
#[inline]
pub fn append_null(&mut self) {
self.null_buffer_builder.append_null();
self.views_buffer.push(0);
self.views_builder.append(0);
}

/// Builds the [`GenericByteViewArray`] and reset this builder
pub fn finish(&mut self) -> GenericByteViewArray<T> {
self.flush_in_progress();
let completed = std::mem::take(&mut self.completed);
let len = self.views_builder.len();
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
let nulls = self.null_buffer_builder.finish();
if let Some((ref mut ht, _)) = self.string_tracker.as_mut() {
ht.clear();
}
let views = std::mem::take(&mut self.views_buffer);
// SAFETY: valid by construction
unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
}

/// Builds the [`GenericByteViewArray`] without resetting the builder
Expand All @@ -412,8 +381,8 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
if !self.in_progress.is_empty() {
completed.push(Buffer::from_slice_ref(&self.in_progress));
}
let len = self.views_buffer.len();
let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
let len = self.views_builder.len();
let views = Buffer::from_slice_ref(self.views_builder.as_slice());
let views = ScalarBuffer::new(views, 0, len);
let nulls = self.null_buffer_builder.finish_cloned();
// SAFETY: valid by construction
Expand All @@ -427,7 +396,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {

/// Return the allocated size of this builder in bytes, useful for memory accounting.
pub fn allocated_size(&self) -> usize {
let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
let views = self.views_builder.capacity() * std::mem::size_of::<u128>();
let null = self.null_buffer_builder.allocated_size();
let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
let in_progress = self.in_progress.capacity();
Expand All @@ -449,7 +418,7 @@ impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}ViewBuilder", T::PREFIX)?;
f.debug_struct("")
.field("views_buffer", &self.views_buffer)
.field("views_builder", &self.views_builder)
.field("in_progress", &self.in_progress)
.field("completed", &self.completed)
.field("null_buffer_builder", &self.null_buffer_builder)
Expand Down
107 changes: 34 additions & 73 deletions arrow-select/src/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::concat::concat_batches;
use arrow_array::StringViewArray;
use arrow_array::{cast::AsArray, Array, ArrayRef, RecordBatch};
use arrow_data::ByteView;
use arrow_array::{
builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions,
};
use arrow_schema::{ArrowError, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
Expand Down Expand Up @@ -164,7 +164,7 @@ impl BatchCoalescer {
return Ok(());
}

let mut batch = gc_string_view_batch(batch);
let mut batch = gc_string_view_batch(&batch);

// If pushing this batch would exceed the target batch size,
// finish the current batch and start a new one
Expand Down Expand Up @@ -242,19 +242,15 @@ impl BatchCoalescer {
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
/// `StringViewArray` may only refer to a small portion of the buffer,
/// significantly increasing memory usage.
fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
let (schema, columns, num_rows) = batch.into_parts();
let new_columns: Vec<ArrayRef> = columns
.into_iter()
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
let new_columns: Vec<ArrayRef> = batch
.columns()
.iter()
.map(|c| {
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
let Some(s) = c.as_string_view_opt() else {
return c;
return Arc::clone(c);
};
if s.data_buffers().is_empty() {
// If there are no data buffers, we can just return the array as is
return c;
}
let ideal_buffer_size: usize = s
.views()
.iter()
Expand All @@ -268,73 +264,42 @@ fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
})
.sum();
let actual_buffer_size = s.get_buffer_memory_size();
let buffers = s.data_buffers();

// Re-creating the array copies data and can be time consuming.
// We only do it if the array is sparse
if actual_buffer_size > (ideal_buffer_size * 2) {
if ideal_buffer_size == 0 {
// If the ideal buffer size is 0, all views are inlined
// so just reuse the views
return Arc::new(unsafe {
StringViewArray::new_unchecked(
s.views().clone(),
vec![],
s.nulls().cloned(),
)
});
}
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerates later `concat_batches` calls.
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
let mut buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size);

let views: Vec<u128> = s
.views()
.as_ref()
.iter()
.cloned()
.map(|v| {
let mut b: ByteView = ByteView::from(v);

if b.length > 12 {
let offset = buffer.len() as u32;
buffer.extend_from_slice(
buffers[b.buffer_index as usize]
.get(b.offset as usize..b.offset as usize + b.length as usize)
.expect("Invalid buffer slice"),
);
b.offset = offset;
b.buffer_index = 0; // Set buffer index to 0, as we only have one buffer
}

b.into()
})
.collect();

let buffers = if buffer.is_empty() {
vec![]
} else {
vec![buffer.into()]
};

let gc_string = unsafe {
StringViewArray::new_unchecked(views.into(), buffers, s.nulls().cloned())
};
let mut builder = StringViewBuilder::with_capacity(s.len());
if ideal_buffer_size > 0 {
builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
}

for v in s.iter() {
builder.append_option(v);
}

let gc_string = builder.finish();

debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0

Arc::new(gc_string)
} else {
c
Arc::clone(c)
}
})
.collect();
unsafe { RecordBatch::new_unchecked(schema, new_columns, num_rows) }
let mut options = RecordBatchOptions::new();
options = options.with_row_count(Some(batch.num_rows()));
RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
.expect("Failed to re-create the gc'ed record batch")
}

#[cfg(test)]
mod tests {
use super::*;
use arrow_array::builder::{ArrayBuilder, StringViewBuilder};
use arrow_array::{RecordBatchOptions, StringViewArray, UInt32Array};
use arrow_array::builder::ArrayBuilder;
use arrow_array::{StringViewArray, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use std::ops::Range;

Expand Down Expand Up @@ -553,11 +518,9 @@ mod tests {
fn test_gc_string_view_test_batch_empty() {
let schema = Schema::empty();
let batch = RecordBatch::new_empty(schema.into());
let cols = batch.num_columns();
let num_rows = batch.num_rows();
let output_batch = gc_string_view_batch(batch);
assert_eq!(cols, output_batch.num_columns());
assert_eq!(num_rows, output_batch.num_rows());
let output_batch = gc_string_view_batch(&batch);
assert_eq!(batch.num_columns(), output_batch.num_columns());
assert_eq!(batch.num_rows(), output_batch.num_rows());
}

#[test]
Expand Down Expand Up @@ -605,11 +568,9 @@ mod tests {
/// and ensures the number of rows are the same
fn do_gc(array: StringViewArray) -> StringViewArray {
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
let rows = batch.num_rows();
let schema = batch.schema();
let gc_batch = gc_string_view_batch(batch);
assert_eq!(rows, gc_batch.num_rows());
assert_eq!(schema, gc_batch.schema());
let gc_batch = gc_string_view_batch(&batch);
assert_eq!(batch.num_rows(), gc_batch.num_rows());
assert_eq!(batch.schema(), gc_batch.schema());
gc_batch
.column(0)
.as_any()
Expand Down
15 changes: 1 addition & 14 deletions arrow-select/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
//! ```

use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
use arrow_array::builder::{
BooleanBuilder, GenericByteBuilder, GenericByteViewBuilder, PrimitiveBuilder,
};
use arrow_array::builder::{BooleanBuilder, GenericByteBuilder, PrimitiveBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::*;
Expand Down Expand Up @@ -86,15 +84,6 @@ fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capa
}
}

fn concat_byte_view<B: ByteViewType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
let mut builder =
GenericByteViewBuilder::<B>::with_capacity(arrays.iter().map(|a| a.len()).sum());
for &array in arrays.iter() {
builder.append_array(array.as_byte_view());
}
Ok(Arc::new(builder.finish()))
}

fn concat_dictionaries<K: ArrowDictionaryKeyType>(
arrays: &[&dyn Array],
) -> Result<ArrayRef, ArrowError> {
Expand Down Expand Up @@ -436,8 +425,6 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
_ => unreachable!("Unsupported run end index type: {r:?}"),
}
}
DataType::Utf8View => concat_byte_view::<StringViewType>(arrays),
DataType::BinaryView => concat_byte_view::<BinaryViewType>(arrays),
_ => {
let capacity = get_capacity(arrays, d);
concat_fallback(arrays, capacity)
Expand Down
Loading