Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if I can pull in PrimitiveDistinctCountAccumulator to the deduplication as well, however it is specialized for types which don't need to hash through Hashable (aka non-float types) and I think there might be a performance hit if I try force them to use Hashable 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we definitely don't want to be hashing if we can avoid that

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::ScalarValue;
use datafusion_expr_common::accumulator::Accumulator;

use crate::utils::Hashable;
use crate::utils::GenericDistinctBuffer;

#[derive(Debug)]
pub struct PrimitiveDistinctCountAccumulator<T>
Expand Down Expand Up @@ -124,88 +124,42 @@ where
}

#[derive(Debug)]
pub struct FloatDistinctCountAccumulator<T>
where
T: ArrowPrimitiveType + Send,
{
values: HashSet<Hashable<T::Native>, RandomState>,
pub struct FloatDistinctCountAccumulator<T: ArrowPrimitiveType> {
values: GenericDistinctBuffer<T>,
}

impl<T> FloatDistinctCountAccumulator<T>
where
T: ArrowPrimitiveType + Send,
{
impl<T: ArrowPrimitiveType> FloatDistinctCountAccumulator<T> {
pub fn new() -> Self {
Self {
values: HashSet::default(),
values: GenericDistinctBuffer::new(T::DATA_TYPE),
}
}
}

impl<T> Default for FloatDistinctCountAccumulator<T>
where
T: ArrowPrimitiveType + Send,
{
impl<T: ArrowPrimitiveType> Default for FloatDistinctCountAccumulator<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> Accumulator for FloatDistinctCountAccumulator<T>
where
T: ArrowPrimitiveType + Send + Debug,
{
impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulator<T> {
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let arr = Arc::new(PrimitiveArray::<T>::from_iter_values(
self.values.iter().map(|v| v.0),
)) as ArrayRef;
Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()])
self.values.state()
}

fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}

let arr = as_primitive_array::<T>(&values[0])?;
arr.iter().for_each(|value| {
if let Some(value) = value {
self.values.insert(Hashable(value));
}
});

Ok(())
self.values.update_batch(values)
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}
assert_eq!(
states.len(),
1,
"count_distinct states must be single array"
);

let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
let list = as_primitive_array::<T>(&list)?;
self.values
.extend(list.values().iter().map(|v| Hashable(*v)));
};
Ok(())
})
self.values.merge_batch(states)
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
Ok(ScalarValue::Int64(Some(self.values.values.len() as i64)))
}

fn size(&self) -> usize {
let num_elements = self.values.len();
let fixed_size = size_of_val(self) + size_of_val(&self.values);

estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
size_of_val(self) + self.values.size()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,107 +17,67 @@

//! Defines the accumulator for `SUM DISTINCT` for primitive numeric types

use std::collections::HashSet;
use std::fmt::Debug;
use std::mem::{size_of, size_of_val};
use std::mem::size_of_val;

use ahash::RandomState;
use arrow::array::Array;
use arrow::array::ArrayRef;
use arrow::array::ArrowNativeTypeOp;
use arrow::array::ArrowPrimitiveType;
use arrow::array::AsArray;
use arrow::datatypes::ArrowNativeType;
use arrow::datatypes::DataType;

use datafusion_common::Result;
use datafusion_common::ScalarValue;
use datafusion_expr_common::accumulator::Accumulator;

use crate::utils::Hashable;
use crate::utils::GenericDistinctBuffer;

/// Accumulator for computing SUM(DISTINCT expr)
#[derive(Debug)]
pub struct DistinctSumAccumulator<T: ArrowPrimitiveType> {
values: HashSet<Hashable<T::Native>, RandomState>,
values: GenericDistinctBuffer<T>,
data_type: DataType,
}

impl<T: ArrowPrimitiveType> Debug for DistinctSumAccumulator<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "DistinctSumAccumulator({})", self.data_type)
}
}

impl<T: ArrowPrimitiveType> DistinctSumAccumulator<T> {
pub fn new(data_type: &DataType) -> Self {
Self {
values: HashSet::default(),
values: GenericDistinctBuffer::new(data_type.clone()),
data_type: data_type.clone(),
}
}

pub fn distinct_count(&self) -> usize {
self.values.len()
self.values.values.len()
}
}

impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
impl<T: ArrowPrimitiveType + Debug> Accumulator for DistinctSumAccumulator<T> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
let distinct_values = self
.values
.iter()
.map(|value| {
ScalarValue::new_primitive::<T>(Some(value.0), &self.data_type)
})
.collect::<Result<Vec<_>>>()?;

vec![ScalarValue::List(ScalarValue::new_list_nullable(
&distinct_values,
&self.data_type,
))]
};
Ok(state_out)
self.values.state()
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}

let array = values[0].as_primitive::<T>();
match array.nulls().filter(|x| x.null_count() > 0) {
Some(n) => {
for idx in n.valid_indices() {
self.values.insert(Hashable(array.value(idx)));
}
}
None => array.values().iter().for_each(|x| {
self.values.insert(Hashable(*x));
}),
}
Ok(())
self.values.update_batch(values)
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
for x in states[0].as_list::<i32>().iter().flatten() {
self.update_batch(&[x])?
}
Ok(())
self.values.merge_batch(states)
}

fn evaluate(&mut self) -> Result<ScalarValue> {
let mut acc = T::Native::usize_as(0);
for distinct_value in self.values.iter() {
acc = acc.add_wrapping(distinct_value.0)
if self.distinct_count() == 0 {
ScalarValue::new_primitive::<T>(None, &self.data_type)
} else {
let mut acc = T::Native::usize_as(0);
for distinct_value in self.values.values.iter() {
acc = acc.add_wrapping(distinct_value.0)
}
ScalarValue::new_primitive::<T>(Some(acc), &self.data_type)
}
let v = (!self.values.is_empty()).then_some(acc);
ScalarValue::new_primitive::<T>(v, &self.data_type)
}

fn size(&self) -> usize {
size_of_val(self) + self.values.capacity() * size_of::<T::Native>()
size_of_val(self) + self.values.size()
}
}
99 changes: 97 additions & 2 deletions datafusion/functions-aggregate-common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, ArrowNativeTypeOp};
use ahash::RandomState;
use arrow::array::{
Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArray,
};
use arrow::compute::SortOptions;
use arrow::datatypes::{
ArrowNativeType, DataType, DecimalType, Field, FieldRef, ToByteSlice,
};
use datafusion_common::{exec_err, internal_datafusion_err, Result};
use datafusion_common::cast::{as_list_array, as_primitive_array};
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::{
exec_err, internal_datafusion_err, HashSet, Result, ScalarValue,
};
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use std::sync::Arc;
Expand Down Expand Up @@ -167,3 +175,90 @@ impl<T: DecimalType> DecimalAverager<T> {
}
}
}

/// Generic way to collect distinct values for accumulators.
///
/// Distinct values are accumulated in a `HashSet` during `update_batch` and
/// `merge_batch`, and serialized into a single `List` scalar by `state`, so
/// that expensive per-value scalar conversions are deferred until the
/// intermediate state is actually requested.
pub struct GenericDistinctBuffer<T: ArrowPrimitiveType> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main implementation here; I toyed with the idea of making this implement Accumulator and have the different functions (like median and percentile_cont) provide their evaluate logic as a closure but it got a bit messy; so for now they delegate their state/update_batch/merge_batch to this inner struct, which allows them to grab the final set of distinct values for them to do their own evaluate

pub values: HashSet<Hashable<T::Native>, RandomState>,
data_type: DataType,
}

impl<T: ArrowPrimitiveType> std::fmt::Debug for GenericDistinctBuffer<T> {
    /// Renders as `GenericDistinctBuffer(<data_type>, values=<count>)`;
    /// the individual values are omitted since the set can be large.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let count = self.values.len();
        write!(f, "GenericDistinctBuffer({}, values={})", self.data_type, count)
    }
}

impl<T: ArrowPrimitiveType> GenericDistinctBuffer<T> {
pub fn new(data_type: DataType) -> Self {
Self {
values: HashSet::default(),
data_type,
}
}

/// Mirrors [`Accumulator::state`]: serializes the distinct values into a
/// single-row `List` scalar.
pub fn state(&self) -> Result<Vec<ScalarValue>> {
    let distinct = self.values.iter().map(|v| v.0);
    // We can't just rely on T::DATA_TYPE here: it would drop parameters
    // such as decimal precision/scale and timestamp timezones, which must
    // line up with Accumulator::state_fields.
    let array = PrimitiveArray::<T>::from_iter_values(distinct)
        .with_data_type(self.data_type.clone());
    let scalar = SingleRowListArrayBuilder::new(Arc::new(array)).build_list_scalar();
    Ok(vec![scalar])
}

/// Mirrors [`Accumulator::update_batch`]: inserts every non-null value of
/// the (single) input array into the distinct set.
pub fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
    if values.is_empty() {
        return Ok(());
    }

    debug_assert_eq!(
        values.len(),
        1,
        "GenericDistinctBuffer::update_batch expects only a single input array"
    );

    let arr = as_primitive_array::<T>(&values[0])?;
    if arr.null_count() > 0 {
        // Slow path: iterate with null checks, skipping null slots.
        self.values.extend(arr.iter().flatten().map(Hashable));
    } else {
        // Fast path: no nulls, so read the raw values buffer directly.
        self.values.extend(arr.values().iter().copied().map(Hashable));
    }

    Ok(())
}

/// Mirrors [`Accumulator::merge_batch`]: each partial state is a list
/// array; every non-null list element is folded back in through
/// `update_batch`.
pub fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
    if states.is_empty() {
        return Ok(());
    }

    as_list_array(&states[0])?
        .iter()
        .flatten()
        .try_for_each(|list| self.update_batch(&[list]))
}

/// Mirrors [`Accumulator::size`].
///
/// Returns the estimated in-memory footprint: the struct itself, the
/// hash-set header, plus an estimate for the stored elements.
pub fn size(&self) -> usize {
let num_elements = self.values.len();
// Fixed overhead that does not depend on the element count.
let fixed_size = size_of_val(self) + size_of_val(&self.values);
// NOTE(review): `unwrap` panics if the size estimate overflows `usize` —
// practically unreachable for realistic set sizes, but worth confirming
// against estimate_memory_size's contract.
estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
}
}
Loading