Skip to content

Conversation

@xanderbailey
Copy link
Contributor

@xanderbailey xanderbailey commented Nov 14, 2025

Which issue does this PR close?

More detail is in the issue.

Rationale for this change

This is a pretty major correctness issue.

What changes are included in this PR?

Fixes issue and reorders skip aggregate and emit early within partial aggregate execution

Are these changes tested?

Yes, the unit test that's added here previously failed before this change.

Are there any user-facing changes?

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Nov 14, 2025
@xanderbailey xanderbailey changed the title Fix aggregate race Fix Partial AggregateExec correctness issue dropping rows Nov 14, 2025
// Check if we should switch to skip aggregation mode
// It's important that we do this before we early emit since we've
// already updated the probe.
if let Some(new_state) = self.switch_to_skip_aggregation()? {
Copy link
Contributor Author

@xanderbailey xanderbailey Nov 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear to me why we set update_skip_aggregation_probe here https://github.com/apache/datafusion/pull/18712/files#diff-69c8ecaca5e2c7005f2ed1facaa41f80b45bfd006f2357e53ff3072f535c287dL687 and not next to switch_to_skip_aggregation. I can't fully give an explanation yet but allowing the probe to be updated and then allowing the look to break before we get here seems dangerous? It's important that we emit everything before we move to the SkipAggregation state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@korowa does that make sense to you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read this new logic carefully and I think moving the set to switch aggregation state next to the check makes a lot of sense to me

@xanderbailey xanderbailey force-pushed the xb/fix_aggregate_race branch 2 times, most recently from b28fce0 to 353b11b Compare November 15, 2025 15:34

ExecutionState::Done => {
// Sanity check: all groups should have been emitted by now
if !self.group_values.is_empty() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding some protection here to try and avoid bugs like this happening in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather fail the query than return the wrong / incomplete results.


// Create constrained memory to trigger early emission but not completely fail
let runtime = RuntimeEnvBuilder::default()
.with_memory_limit(1024, 1.0) // 100KB - enough to start but will trigger pressure
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1024 bytes != 100KB

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, I was playing around with this to repro the bug

@alamb
Copy link
Contributor

alamb commented Nov 19, 2025

Thank you for this PR @xanderbailey -- I plan to review it tomorrow

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @xanderbailey -- this is an amazing find and a really well done PR. It was clear, and easy to read and makes sense to me.

Thank you

// Check if we should switch to skip aggregation mode
// It's important that we do this before we early emit since we've
// already updated the probe.
if let Some(new_state) = self.switch_to_skip_aggregation()? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read this new logic carefully and I think moving the set to switch aggregation state next to the check makes a lot of sense to me

use std::sync::Arc;

#[tokio::test]
async fn test_double_emission_race_condition_bug() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that this test covers the code in this PR as it fails without the code changes:

thread 'aggregates::row_hash::tests::test_double_emission_race_condition_bug' (41064140) panicked at datafusion/physical-plan/src/aggregates/row_hash.rs:1354:9:
assertion `left == right` failed: Unexpected number of groups
  left: 100
 right: 1124
stack backtrace:

@alamb
Copy link
Contributor

alamb commented Nov 20, 2025

I also added this ticket to the potential content of a datafusion 51.1.0 release:

@alamb alamb added this pull request to the merge queue Nov 21, 2025
@alamb
Copy link
Contributor

alamb commented Nov 21, 2025

Thanks again @xanderbailey

Merged via the queue into apache:main with commit e6ddb48 Nov 21, 2025
32 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Major bug: Partial AggregateExec correctness issue dropping rows

3 participants