5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
@@ -365,6 +365,11 @@ config_namespace! {
/// target batch size is determined by the configuration setting
pub coalesce_batches: bool, default = true

/// When set to true, hash joins will allow passing the build-side hash table
/// to the right (probe) side of the join. This can be useful to prune rows
/// early, but may consume more memory.
pub hash_join_sideways_hash_passing: bool, default = false

/// Should DataFusion collect statistics when first creating a table.
/// Has no effect after the table is created. Applies to the default
/// `ListingTableProvider` in DataFusion. Defaults to true.
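Note: options declared in `config_namespace!` are addressable by a dotted key, so the new flag should surface under the `datafusion.execution.` prefix used by siblings such as `coalesce_batches`. A minimal sketch of opting in, assuming that key name:

```rust
use datafusion::prelude::*;

// Sketch: enable sideways hash passing for a session. The dotted key is an
// assumption, inferred from the `datafusion.execution.` prefix of sibling options.
let config = SessionConfig::new()
    .set_bool("datafusion.execution.hash_join_sideways_hash_passing", true);
let ctx = SessionContext::new_with_config(config);
```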
11 changes: 11 additions & 0 deletions datafusion/execution/src/config.rs
@@ -397,6 +397,17 @@ impl SessionConfig {
self.options.execution.batch_size
}

/// Get whether sideways passing of the build side hash table is enabled
pub fn hash_join_sideways_hash_passing(&self) -> bool {
self.options.execution.hash_join_sideways_hash_passing
}

/// Set whether sideways passing of the build side hash table is enabled
pub fn with_hash_join_sideways_hash_passing(mut self, enabled: bool) -> Self {
self.options_mut().execution.hash_join_sideways_hash_passing = enabled;
self
}

/// Enables or disables the coalescence of small batches into larger batches
pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
self.options_mut().execution.coalesce_batches = enabled;
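The new builder pairs with the getter above; a small round-trip sketch:

```rust
use datafusion::execution::config::SessionConfig;

// Sketch: set the flag through the builder and read it back via the getter.
let config = SessionConfig::new().with_hash_join_sideways_hash_passing(true);
assert!(config.hash_join_sideways_hash_passing());
```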
114 changes: 64 additions & 50 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -26,7 +26,9 @@ use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
use crate::joins::hash_join::information_passing::{
ColumnBounds, SharedBuildAccumulator,
};
use crate::joins::hash_join::stream::{
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
};
@@ -81,17 +83,17 @@ use futures::TryStreamExt;
use parking_lot::Mutex;

/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
const HASH_JOIN_SEED: RandomState =
static HASH_JOIN_SEED: RandomState =
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);

/// HashTable and input data for the left (build side) of a join
pub(super) struct JoinLeftData {
/// The hash table with indices into `batch`
pub(super) hash_map: Box<dyn JoinHashMapType>,
pub(super) hash_map: Arc<dyn JoinHashMapType>,
/// The input rows for the build side
batch: RecordBatch,
/// The build side on expressions values
values: Vec<ArrayRef>,
pub(super) values: Vec<ArrayRef>,
/// Shared bitmap builder for visited left indices
visited_indices_bitmap: SharedBitmapBuilder,
/// Counter of running probe-threads, potentially
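Aside on the seed change above: moving from `const` to `static` gives `HASH_JOIN_SEED` a single fixed address, which is what allows it to be shared as `&'static RandomState` further down. A hypothetical illustration (not part of the diff) of why the distinct seed matters:

```rust
use ahash::RandomState;
use std::hash::BuildHasher;

// Differently-seeded states hash the same key to different values (with
// overwhelming probability), so keys that share a bucket under
// RepartitionExec's hashing won't systematically collide in the join's table.
let join_seed = RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
let other_seed = RandomState::with_seeds(1, 2, 3, 4);
assert_ne!(join_seed.hash_one("key"), other_seed.hash_one("key"));
```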
@@ -109,7 +111,7 @@ pub(super) struct JoinLeftData {
impl JoinLeftData {
/// Create a new `JoinLeftData` from its parts
pub(super) fn new(
hash_map: Box<dyn JoinHashMapType>,
hash_map: Arc<dyn JoinHashMapType>,
batch: RecordBatch,
values: Vec<ArrayRef>,
visited_indices_bitmap: SharedBitmapBuilder,
@@ -341,7 +343,7 @@ pub struct HashJoinExec {
/// the hash table creation.
left_fut: Arc<OnceAsync<JoinLeftData>>,
/// Shared `RandomState` for the hashing algorithm
random_state: RandomState,
random_state: &'static RandomState,
/// Partitioning mode to use
pub mode: PartitionMode,
/// Execution metrics
@@ -366,7 +368,7 @@ struct HashJoinExecDynamicFilter {
filter: Arc<DynamicFilterPhysicalExpr>,
/// Build accumulator to keep track of per-partition build-side information, such as the min/max bounds on the join keys.
/// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}

impl fmt::Debug for HashJoinExec {
@@ -424,8 +426,6 @@ impl HashJoinExec {
let (join_schema, column_indices) =
build_join_schema(&left_schema, &right_schema, join_type);

let random_state = HASH_JOIN_SEED;

let join_schema = Arc::new(join_schema);

// check if the projection is valid
@@ -452,7 +452,7 @@
join_type: *join_type,
join_schema,
left_fut: Default::default(),
random_state,
random_state: &HASH_JOIN_SEED,
mode: partition_mode,
metrics: ExecutionPlanMetricsSet::new(),
projection,
@@ -844,7 +844,7 @@ impl ExecutionPlan for HashJoinExec {
/// Creates a new HashJoinExec with different children while preserving configuration.
///
/// This method is called during query optimization when the optimizer creates new
/// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new`
/// plan nodes. Importantly, it creates a fresh build_accumulator via `try_new`
/// rather than cloning the existing one because partitioning may have changed.
fn with_new_children(
self: Arc<Self>,
@@ -858,7 +858,7 @@
join_type: self.join_type,
join_schema: Arc::clone(&self.join_schema),
left_fut: Arc::clone(&self.left_fut),
random_state: self.random_state.clone(),
random_state: self.random_state,
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
@@ -888,7 +888,7 @@
join_schema: Arc::clone(&self.join_schema),
// Reset the left_fut to allow re-execution
left_fut: Arc::new(OnceAsync::default()),
random_state: self.random_state.clone(),
random_state: self.random_state,
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
@@ -929,43 +929,54 @@ impl ExecutionPlan for HashJoinExec {
}

let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
let enable_hash_collection =
context.session_config().hash_join_sideways_hash_passing();
let should_compute_bounds =
enable_dynamic_filter_pushdown && !enable_hash_collection;

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
PartitionMode::CollectLeft => self.left_fut.try_once(|| {
let left_stream = self.left.execute(0, Arc::clone(&context))?;

let (left_fut, reservation) = match self.mode {
PartitionMode::CollectLeft => {
let reservation =
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

Ok(collect_left_input(
self.random_state.clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
(
self.left_fut.try_once(|| {
let left_stream = self.left.execute(0, Arc::clone(&context))?;

Ok(collect_left_input(
self.random_state,
left_stream,
on_left.clone(),
join_metrics.clone(),
reservation.new_empty(),
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
should_compute_bounds,
))
})?,
reservation,
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
enable_dynamic_filter_pushdown,
))
})?,
)
}
PartitionMode::Partitioned => {
let left_stream = self.left.execute(partition, Arc::clone(&context))?;

let reservation =
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
.register(context.memory_pool());

OnceFut::new(collect_left_input(
self.random_state.clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
(
OnceFut::new(collect_left_input(
self.random_state,
left_stream,
on_left.clone(),
join_metrics.clone(),
reservation.new_empty(),
need_produce_result_in_final(self.join_type),
1,
should_compute_bounds,
)),
reservation,
need_produce_result_in_final(self.join_type),
1,
enable_dynamic_filter_pushdown,
))
)
}
PartitionMode::Auto => {
return plan_err!(
@@ -977,8 +988,8 @@

let batch_size = context.session_config().batch_size();

// Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
let bounds_accumulator = enable_dynamic_filter_pushdown
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
@@ -987,13 +998,15 @@
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right,
self.random_state,
reservation,
))
})))
})
@@ -1027,7 +1040,7 @@ impl ExecutionPlan for HashJoinExec {
self.filter.clone(),
self.join_type,
right_stream,
self.random_state.clone(),
self.random_state,
join_metrics,
column_indices_after_projection,
self.null_equality,
@@ -1036,8 +1049,9 @@
batch_size,
vec![],
self.right.output_ordering().is_some(),
bounds_accumulator,
build_accumulator,
self.mode,
enable_hash_collection,
)))
}

@@ -1188,7 +1202,7 @@ impl ExecutionPlan for HashJoinExec {
join_type: self.join_type,
join_schema: Arc::clone(&self.join_schema),
left_fut: Arc::clone(&self.left_fut),
random_state: self.random_state.clone(),
random_state: self.random_state,
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
@@ -1197,7 +1211,7 @@
cache: self.cache.clone(),
dynamic_filter: Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
bounds_accumulator: OnceLock::new(),
build_accumulator: OnceLock::new(),
}),
});
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
@@ -1346,15 +1360,15 @@ impl BuildSideState {
/// When `should_compute_bounds` is true, this function computes the min/max bounds
/// for each join key column but does NOT update the dynamic filter. Instead, the
/// bounds are stored in the returned `JoinLeftData` and later coordinated by
/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
/// before updating the filter exactly once.
///
/// # Returns
/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
/// visited indices bitmap, and computed bounds (if requested).
#[allow(clippy::too_many_arguments)]
async fn collect_left_input(
random_state: RandomState,
random_state: &RandomState,
left_stream: SendableRecordBatchStream,
on_left: Vec<PhysicalExprRef>,
metrics: BuildProbeJoinMetrics,
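Aside on the coordination described in the doc comment above: each partition merges its bounds into a shared accumulator, and only the last partition to report performs the single filter update. A hypothetical sketch of that pattern (names are illustrative, not the actual `SharedBuildAccumulator` API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

// Illustrative only: one column's min/max bounds, reduced across partitions.
struct Bounds { min: i64, max: i64 }

struct Accumulator {
    remaining: AtomicUsize,        // partitions that have not reported yet
    merged: Mutex<Option<Bounds>>, // running union of reported bounds
}

impl Accumulator {
    fn report(&self, b: Bounds, update_filter: impl FnOnce(&Bounds)) {
        {
            let mut merged = self.merged.lock().unwrap();
            match merged.as_mut() {
                Some(m) => {
                    m.min = m.min.min(b.min);
                    m.max = m.max.max(b.max);
                }
                None => *merged = Some(b),
            }
        }
        // fetch_sub returns the previous count; seeing 1 means this partition
        // was the last to report, so the filter is updated exactly once.
        if self.remaining.fetch_sub(1, Ordering::AcqRel) == 1 {
            let merged = self.merged.lock().unwrap();
            update_filter(merged.as_ref().expect("at least one partition reported"));
        }
    }
}
```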
@@ -1444,7 +1458,7 @@ async fn collect_left_input(
batch,
&mut *hashmap,
offset,
&random_state,
random_state,
&mut hashes_buffer,
0,
true,
@@ -1488,9 +1502,9 @@
};

let data = JoinLeftData::new(
hashmap,
Arc::from(hashmap),
single_batch,
left_values.clone(),
left_values,
Mutex::new(visited_indices_bitmap),
AtomicUsize::new(probe_threads_count),
reservation,