Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,11 +635,7 @@ impl Stream for GroupedHashAggregateStream {
(
if self.input_done {
ExecutionState::Done
} else if self
.skip_aggregation_probe
.as_ref()
.is_some_and(|probe| probe.should_skip())
{
} else if self.should_skip_aggregation() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main rationale is to pull this check into a function.

ExecutionState::SkippingAggregation
} else {
ExecutionState::ReadingInput
Expand Down Expand Up @@ -955,12 +951,13 @@ impl GroupedHashAggregateStream {
Ok(())
}

// Updates skip aggregation probe state.
// In case stream has any spills, the probe is forcefully set to
// forbid aggregation skipping, and locked, since spilling resets
// total number of unique groups.
//
// Note: currently spilling is not supported for Partial aggregation
/// Updates skip aggregation probe state.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive by conversion to docstring style comments

///
/// In case stream has any spills, the probe is forcefully set to
/// forbid aggregation skipping, and locked, since spilling resets
/// total number of unique groups.
///
/// Note: currently spilling is not supported for Partial aggregation
fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if !self.spill_state.spills.is_empty() {
Expand All @@ -971,8 +968,8 @@ impl GroupedHashAggregateStream {
};
}

// In case the probe indicates that aggregation may be
// skipped, forces stream to produce currently accumulated output.
/// In case the probe indicates that aggregation may be
/// skipped, forces stream to produce currently accumulated output.
fn switch_to_skip_aggregation(&mut self) -> Result<()> {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if probe.should_skip() {
Expand All @@ -984,7 +981,15 @@ impl GroupedHashAggregateStream {
Ok(())
}

// Transforms input batch to intermediate aggregate state, without grouping it
/// Returns true if the aggregation probe indicates that aggregation
/// should be skipped.
fn should_skip_aggregation(&self) -> bool {
self.skip_aggregation_probe
.as_ref()
.is_some_and(|probe| probe.should_skip())
}

/// Transforms input batch to intermediate aggregate state, without grouping it
fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> {
let group_values = evaluate_group_by(&self.group_by, &batch)?;
let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
Expand Down