Skip to content

Major bug: Partial AggregateExec correctness issue dropping rows #18701

@xanderbailey

Description

@xanderbailey

Describe the bug

There is a race condition in AggregateExec if two conditions are met on the same record batch.

  1. Memory pressure is high we results in emit_early_if_necessary.
    let n = self.group_values.len() / self.batch_size * self.batch_size; are emitted, leaving the remainder still in the hash table. Then it setsself.exec_state = ExecutionState::ProducingOutput(batch);
  2. skip_aggregation_probe says you should also skip aggregating in switch_to_skip_aggregation because cardinality is high. This emits the remainder into a new batch and overrides the self.exec_state = ExecutionState::ProducingOutput(batch); in the same iteration loop. Loop finishes and state moves to ProducingOutput which will return only the remainder batch.

Link to code:

self.emit_early_if_necessary()?;
self.switch_to_skip_aggregation()?;

To Reproduce

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::TestMemoryExec;
    use arrow::array::{Int32Array, Int64Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
    use datafusion_execution::TaskContext;
    use datafusion_functions_aggregate::count::count_udaf;
    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
    use datafusion_physical_expr::expressions::col;
    use std::sync::Arc;

    #[tokio::test]
    async fn test_double_emission_race_condition_bug() -> Result<()> {
        // This test specifically reproduces the double emission race condition
        // where emit_early_if_necessary() and switch_to_skip_aggregation()
        // both emit in the same loop iteration, causing data loss

        let schema = Arc::new(Schema::new(vec![
            Field::new("group_col", DataType::Int32, false),
            Field::new("value_col", DataType::Int64, false),
        ]));

        // Create data that will trigger BOTH conditions in the same iteration:
        // 1. More groups than batch_size (triggers early emission when memory pressure hits)
        // 2. High cardinality ratio (triggers skip aggregation)
        let batch_size = 1024; // We'll set this in session config
        let num_groups = batch_size + 100; // Slightly more than batch_size (1124 groups)

        // Create exactly 1 row per group = 100% cardinality ratio
        let group_ids: Vec<i32> = (0..num_groups as i32).collect();
        let values: Vec<i64> = vec![1; num_groups];

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from(group_ids)),
                Arc::new(Int64Array::from(values)),
            ],
        )?;

        let input_partitions = vec![vec![batch]];

        // Create constrained memory to trigger early emission but not completely fail
        let runtime = RuntimeEnvBuilder::default()
            .with_memory_limit(1024, 1.0) // 100KB - enough to start but will trigger pressure
            .build_arc()?;

        let mut task_ctx = TaskContext::default().with_runtime(runtime);

        // Configure to trigger BOTH conditions:
        // 1. Low probe threshold (triggers skip probe after few rows)
        // 2. Low ratio threshold (triggers skip aggregation immediately)
        // 3. Set batch_size to 1024 so our 1124 groups will trigger early emission
        // This creates the race condition where both emit paths are triggered
        let mut session_config = task_ctx.session_config().clone();
        session_config = session_config.set(
            "datafusion.execution.batch_size",
            &datafusion_common::ScalarValue::UInt64(Some(1024)),
        );
        session_config = session_config.set(
            "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
            &datafusion_common::ScalarValue::UInt64(Some(50)),
        );
        session_config = session_config.set(
            "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
            &datafusion_common::ScalarValue::Float64(Some(0.8)),
        );
        task_ctx = task_ctx.with_session_config(session_config);
        let task_ctx = Arc::new(task_ctx);

        // Create aggregate: COUNT(*) GROUP BY group_col
        let group_expr = vec![(col("group_col", &schema)?, "group_col".to_string())];
        let aggr_expr = vec![Arc::new(
            AggregateExprBuilder::new(count_udaf(), vec![col("value_col", &schema)?])
                .schema(Arc::clone(&schema))
                .alias("count_value")
                .build()?,
        )];

        let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?;
        let exec = Arc::new(TestMemoryExec::update_cache(Arc::new(exec)));

        // Use Partial mode where the race condition occurs
        let aggregate_exec = AggregateExec::try_new(
            AggregateMode::Partial,
            PhysicalGroupBy::new_single(group_expr),
            aggr_expr,
            vec![None],
            exec,
            Arc::clone(&schema),
        )?;

        // Execute and collect results
        let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, Arc::clone(&task_ctx), 0)?;
        let mut results = Vec::new();

        while let Some(result) = stream.next().await {
            let batch = result?;
            results.push(batch);
        }

        // Count total groups emitted
        let mut total_output_groups = 0;
        for batch in &results {
            total_output_groups += batch.num_rows();
        }

        // With the race condition bug:
        // 1. emit_early_if_necessary() emits first batch_size groups (1024)
        // 2. switch_to_skip_aggregation() immediately overwrites with remaining groups (100)
        // 3. The 1024 groups from step 1 are LOST!
        // 4. Only 100 groups are returned instead of 1124

        println!(
            "Double emission race condition test: Expected {} groups, got {} groups",
            num_groups, total_output_groups
        );

        if total_output_groups < num_groups / 2 {
            println!(
                "🐛 BUG REPRODUCED! Lost {} groups ({:.1}% loss) - this indicates the double emission race condition",
                num_groups - total_output_groups,
                (1.0 - total_output_groups as f64 / num_groups as f64) * 100.0
            );
        }

        // This test documents the expected behavior vs actual buggy behavior
        // TODO: Once fixed, this assertion should pass
        assert_eq!(
            total_output_groups, num_groups,
            "Double emission race condition detected! emit_early_if_necessary() result \
             was overwritten by switch_to_skip_aggregation(). Expected {} groups, got {} groups",
            num_groups, total_output_groups
        );

        Ok(())
    }

Expected behavior

No response

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions