diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index e8b6588dc091..4d6a2fec51af 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -38,7 +38,7 @@ use datafusion_common::utils::SingleRowListArrayBuilder; use datafusion_common::ScalarValue; use datafusion_expr_common::accumulator::Accumulator; -use crate::utils::Hashable; +use crate::utils::GenericDistinctBuffer; #[derive(Debug)] pub struct PrimitiveDistinctCountAccumulator @@ -124,88 +124,42 @@ where } #[derive(Debug)] -pub struct FloatDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send, -{ - values: HashSet, RandomState>, +pub struct FloatDistinctCountAccumulator { + values: GenericDistinctBuffer, } -impl FloatDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send, -{ +impl FloatDistinctCountAccumulator { pub fn new() -> Self { Self { - values: HashSet::default(), + values: GenericDistinctBuffer::new(T::DATA_TYPE), } } } -impl Default for FloatDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send, -{ +impl Default for FloatDistinctCountAccumulator { fn default() -> Self { Self::new() } } -impl Accumulator for FloatDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send + Debug, -{ +impl Accumulator for FloatDistinctCountAccumulator { fn state(&mut self) -> datafusion_common::Result> { - let arr = Arc::new(PrimitiveArray::::from_iter_values( - self.values.iter().map(|v| v.0), - )) as ArrayRef; - Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()]) + self.values.state() } fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { - if values.is_empty() { - return Ok(()); - } - - let arr = as_primitive_array::(&values[0])?; - arr.iter().for_each(|value| { - if let Some(value) = value { - 
self.values.insert(Hashable(value)); - } - }); - - Ok(()) + self.values.update_batch(values) } fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { - if states.is_empty() { - return Ok(()); - } - assert_eq!( - states.len(), - 1, - "count_distinct states must be single array" - ); - - let arr = as_list_array(&states[0])?; - arr.iter().try_for_each(|maybe_list| { - if let Some(list) = maybe_list { - let list = as_primitive_array::(&list)?; - self.values - .extend(list.values().iter().map(|v| Hashable(*v))); - }; - Ok(()) - }) + self.values.merge_batch(states) } fn evaluate(&mut self) -> datafusion_common::Result { - Ok(ScalarValue::Int64(Some(self.values.len() as i64))) + Ok(ScalarValue::Int64(Some(self.values.values.len() as i64))) } fn size(&self) -> usize { - let num_elements = self.values.len(); - let fixed_size = size_of_val(self) + size_of_val(&self.values); - - estimate_memory_size::(num_elements, fixed_size).unwrap() + size_of_val(self) + self.values.size() } } diff --git a/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs b/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs index 3021783a2a79..e5a23597c44a 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs @@ -17,16 +17,12 @@ //! 
Defines the accumulator for `SUM DISTINCT` for primitive numeric types -use std::collections::HashSet; use std::fmt::Debug; -use std::mem::{size_of, size_of_val}; +use std::mem::size_of_val; -use ahash::RandomState; -use arrow::array::Array; use arrow::array::ArrayRef; use arrow::array::ArrowNativeTypeOp; use arrow::array::ArrowPrimitiveType; -use arrow::array::AsArray; use arrow::datatypes::ArrowNativeType; use arrow::datatypes::DataType; @@ -34,90 +30,54 @@ use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr_common::accumulator::Accumulator; -use crate::utils::Hashable; +use crate::utils::GenericDistinctBuffer; /// Accumulator for computing SUM(DISTINCT expr) +#[derive(Debug)] pub struct DistinctSumAccumulator { - values: HashSet, RandomState>, + values: GenericDistinctBuffer, data_type: DataType, } -impl Debug for DistinctSumAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "DistinctSumAccumulator({})", self.data_type) - } -} - impl DistinctSumAccumulator { pub fn new(data_type: &DataType) -> Self { Self { - values: HashSet::default(), + values: GenericDistinctBuffer::new(data_type.clone()), data_type: data_type.clone(), } } pub fn distinct_count(&self) -> usize { - self.values.len() + self.values.values.len() } } -impl Accumulator for DistinctSumAccumulator { +impl Accumulator for DistinctSumAccumulator { fn state(&mut self) -> Result> { - // 1. Stores aggregate state in `ScalarValue::List` - // 2. 
Constructs `ScalarValue::List` state from distinct numeric stored in hash set - let state_out = { - let distinct_values = self - .values - .iter() - .map(|value| { - ScalarValue::new_primitive::(Some(value.0), &self.data_type) - }) - .collect::>>()?; - - vec![ScalarValue::List(ScalarValue::new_list_nullable( - &distinct_values, - &self.data_type, - ))] - }; - Ok(state_out) + self.values.state() } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - let array = values[0].as_primitive::(); - match array.nulls().filter(|x| x.null_count() > 0) { - Some(n) => { - for idx in n.valid_indices() { - self.values.insert(Hashable(array.value(idx))); - } - } - None => array.values().iter().for_each(|x| { - self.values.insert(Hashable(*x)); - }), - } - Ok(()) + self.values.update_batch(values) } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - for x in states[0].as_list::().iter().flatten() { - self.update_batch(&[x])? - } - Ok(()) + self.values.merge_batch(states) } fn evaluate(&mut self) -> Result { - let mut acc = T::Native::usize_as(0); - for distinct_value in self.values.iter() { - acc = acc.add_wrapping(distinct_value.0) + if self.distinct_count() == 0 { + ScalarValue::new_primitive::(None, &self.data_type) + } else { + let mut acc = T::Native::usize_as(0); + for distinct_value in self.values.values.iter() { + acc = acc.add_wrapping(distinct_value.0) + } + ScalarValue::new_primitive::(Some(acc), &self.data_type) } - let v = (!self.values.is_empty()).then_some(acc); - ScalarValue::new_primitive::(v, &self.data_type) } fn size(&self) -> usize { - size_of_val(self) + self.values.capacity() * size_of::() + size_of_val(self) + self.values.size() } } diff --git a/datafusion/functions-aggregate-common/src/utils.rs b/datafusion/functions-aggregate-common/src/utils.rs index 7ce5f09373f5..9fff5e392dc8 100644 --- a/datafusion/functions-aggregate-common/src/utils.rs +++ 
b/datafusion/functions-aggregate-common/src/utils.rs @@ -15,12 +15,20 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{ArrayRef, ArrowNativeTypeOp}; +use ahash::RandomState; +use arrow::array::{ + Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArray, +}; use arrow::compute::SortOptions; use arrow::datatypes::{ ArrowNativeType, DataType, DecimalType, Field, FieldRef, ToByteSlice, }; -use datafusion_common::{exec_err, internal_datafusion_err, Result}; +use datafusion_common::cast::{as_list_array, as_primitive_array}; +use datafusion_common::utils::memory::estimate_memory_size; +use datafusion_common::utils::SingleRowListArrayBuilder; +use datafusion_common::{ + exec_err, internal_datafusion_err, HashSet, Result, ScalarValue, +}; use datafusion_expr_common::accumulator::Accumulator; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use std::sync::Arc; @@ -167,3 +175,90 @@ impl DecimalAverager { } } } + +/// Generic way to collect distinct primitive values for accumulators. +/// +/// Distinct values are accumulated into a `HashSet` during `update_batch`, +/// avoiding per-value `ScalarValue` allocations. `state` serializes them as a +/// single-row `List` scalar, and `merge_batch` re-ingests such lists by +/// feeding each list element back through `update_batch`. +pub struct GenericDistinctBuffer { + pub values: HashSet, RandomState>, + data_type: DataType, +} + +impl std::fmt::Debug for GenericDistinctBuffer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "GenericDistinctBuffer({}, values={})", + self.data_type, + self.values.len() + ) + } +} + +impl GenericDistinctBuffer { + pub fn new(data_type: DataType) -> Self { + Self { + values: HashSet::default(), + data_type, + } + } + + /// Mirrors [`Accumulator::state`]. 
+ pub fn state(&self) -> Result> { + let arr = Arc::new( + PrimitiveArray::::from_iter_values(self.values.iter().map(|v| v.0)) + // Ideally we'd just use T::DATA_TYPE but this misses things like + // decimal scale/precision and timestamp timezones, which need to + // match up with Accumulator::state_fields + .with_data_type(self.data_type.clone()), + ); + Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()]) + } + + /// Mirrors [`Accumulator::update_batch`]. + pub fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + debug_assert_eq!( + values.len(), + 1, + "DistinctValuesBuffer::update_batch expects only a single input array" + ); + + let arr = as_primitive_array::(&values[0])?; + if arr.null_count() > 0 { + self.values.extend(arr.iter().flatten().map(Hashable)); + } else { + self.values + .extend(arr.values().iter().cloned().map(Hashable)); + } + + Ok(()) + } + + /// Mirrors [`Accumulator::merge_batch`]. + pub fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + if states.is_empty() { + return Ok(()); + } + + let array = as_list_array(&states[0])?; + for list in array.iter().flatten() { + self.update_batch(&[list])?; + } + + Ok(()) + } + + /// Mirrors [`Accumulator::size`]. 
+ pub fn size(&self) -> usize { + let num_elements = self.values.len(); + let fixed_size = size_of_val(self) + size_of_val(&self.values); + estimate_memory_size::(num_elements, fixed_size).unwrap() + } +} diff --git a/datafusion/functions-aggregate/src/median.rs b/datafusion/functions-aggregate/src/median.rs index a65759594eac..b2bb8e294f39 100644 --- a/datafusion/functions-aggregate/src/median.rs +++ b/datafusion/functions-aggregate/src/median.rs @@ -40,7 +40,7 @@ use arrow::datatypes::{ }; use datafusion_common::{ - internal_datafusion_err, internal_err, DataFusionError, HashSet, Result, ScalarValue, + internal_datafusion_err, internal_err, DataFusionError, Result, ScalarValue, }; use datafusion_expr::function::StateFieldsArgs; use datafusion_expr::{ @@ -50,7 +50,7 @@ use datafusion_expr::{ use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; -use datafusion_functions_aggregate_common::utils::Hashable; +use datafusion_functions_aggregate_common::utils::GenericDistinctBuffer; use datafusion_macros::user_doc; make_udaf_expr_and_func!( @@ -151,7 +151,7 @@ impl AggregateUDFImpl for Median { if acc_args.is_distinct { Ok(Box::new(DistinctMedianAccumulator::<$t> { data_type: $dt.clone(), - distinct_values: HashSet::new(), + distinct_values: GenericDistinctBuffer::new($dt), })) } else { Ok(Box::new(MedianAccumulator::<$t> { @@ -506,65 +506,27 @@ impl GroupsAccumulator for MedianGroupsAccumulator { + distinct_values: GenericDistinctBuffer, data_type: DataType, - distinct_values: HashSet>, -} - -impl Debug for DistinctMedianAccumulator { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "DistinctMedianAccumulator({})", self.data_type) - } } -impl Accumulator for DistinctMedianAccumulator { +impl Accumulator for DistinctMedianAccumulator { fn state(&mut self) 
-> Result> { - let all_values = self - .distinct_values - .iter() - .map(|x| ScalarValue::new_primitive::(Some(x.0), &self.data_type)) - .collect::>>()?; - - let arr = ScalarValue::new_list_nullable(&all_values, &self.data_type); - Ok(vec![ScalarValue::List(arr)]) + self.distinct_values.state() } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - let array = values[0].as_primitive::(); - match array.nulls().filter(|x| x.null_count() > 0) { - Some(n) => { - for idx in n.valid_indices() { - self.distinct_values.insert(Hashable(array.value(idx))); - } - } - None => array.values().iter().for_each(|x| { - self.distinct_values.insert(Hashable(*x)); - }), - } - Ok(()) + self.distinct_values.update_batch(values) } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = states[0].as_list::(); - for v in array.iter().flatten() { - self.update_batch(&[v])? - } - Ok(()) + self.distinct_values.merge_batch(states) } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.distinct_values) + let d = std::mem::take(&mut self.distinct_values.values) .into_iter() .map(|v| v.0) .collect::>(); @@ -573,7 +535,7 @@ impl Accumulator for DistinctMedianAccumulator { } fn size(&self) -> usize { - size_of_val(self) + self.distinct_values.capacity() * size_of::() + size_of_val(self) + self.distinct_values.size() } } diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index 8e9e9a3144d4..d311135828d3 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -34,8 +34,7 @@ use arrow::{ use arrow::array::ArrowNativeTypeOp; use datafusion_common::{ - internal_datafusion_err, internal_err, plan_err, DataFusionError, HashSet, Result, - ScalarValue, + internal_datafusion_err, internal_err, plan_err, DataFusionError, Result, ScalarValue, }; use 
datafusion_expr::expr::{AggregateFunction, Sort}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -48,7 +47,7 @@ use datafusion_expr::{ use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; -use datafusion_functions_aggregate_common::utils::Hashable; +use datafusion_functions_aggregate_common::utils::GenericDistinctBuffer; use datafusion_macros::user_doc; use crate::utils::validate_percentile_expr; @@ -171,7 +170,7 @@ impl PercentileCont { if args.is_distinct { Ok(Box::new(DistinctPercentileContAccumulator::<$t> { data_type: $dt.clone(), - distinct_values: HashSet::new(), + distinct_values: GenericDistinctBuffer::new($dt), percentile, })) } else { @@ -656,77 +655,28 @@ impl GroupsAccumulator } } -/// The distinct percentile_cont accumulator accumulates the raw input values -/// using a HashSet to eliminate duplicates. -/// -/// The intermediate state is represented as a List of scalar values updated by -/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values -/// in the final evaluation step so that we avoid expensive conversions and -/// allocations during `update_batch`. 
+#[derive(Debug)] struct DistinctPercentileContAccumulator { + distinct_values: GenericDistinctBuffer, data_type: DataType, - distinct_values: HashSet>, percentile: f64, } -impl Debug for DistinctPercentileContAccumulator { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "DistinctPercentileContAccumulator({}, percentile={})", - self.data_type, self.percentile - ) - } -} - -impl Accumulator for DistinctPercentileContAccumulator { +impl Accumulator for DistinctPercentileContAccumulator { fn state(&mut self) -> Result> { - let all_values = self - .distinct_values - .iter() - .map(|x| ScalarValue::new_primitive::(Some(x.0), &self.data_type)) - .collect::>>()?; - - let arr = ScalarValue::new_list_nullable(&all_values, &self.data_type); - Ok(vec![ScalarValue::List(arr)]) + self.distinct_values.state() } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - // Cast to target type if needed (e.g., integer to Float64) - let values = if values[0].data_type() != &self.data_type { - arrow::compute::cast(&values[0], &self.data_type)? - } else { - Arc::clone(&values[0]) - }; - - let array = values.as_primitive::(); - match array.nulls().filter(|x| x.null_count() > 0) { - Some(n) => { - for idx in n.valid_indices() { - self.distinct_values.insert(Hashable(array.value(idx))); - } - } - None => array.values().iter().for_each(|x| { - self.distinct_values.insert(Hashable(*x)); - }), - } - Ok(()) + self.distinct_values.update_batch(values) } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = states[0].as_list::(); - for v in array.iter().flatten() { - self.update_batch(&[v])? 
- } - Ok(()) + self.distinct_values.merge_batch(states) } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.distinct_values) + let d = std::mem::take(&mut self.distinct_values.values) .into_iter() .map(|v| v.0) .collect::>(); @@ -735,7 +685,7 @@ impl Accumulator for DistinctPercentileContAccumulator { } fn size(&self) -> usize { - size_of_val(self) + self.distinct_values.capacity() * size_of::() + size_of_val(self) + self.distinct_values.size() } }