5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
@@ -365,6 +365,11 @@ config_namespace! {
/// target batch size is determined by the configuration setting
pub coalesce_batches: bool, default = true

/// When set to true, hash joins may pass the hashes computed on the build
/// side to the probe (right) side of the join. This can prune probe-side rows
/// early, but may consume more memory.
pub hash_join_sideways_hash_passing: bool, default = false

/// Should DataFusion collect statistics when first creating a table.
/// Has no effect after the table is created. Applies to the default
/// `ListingTableProvider` in DataFusion. Defaults to true.
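A usage note: options declared in the `execution` block of `config_namespace!` are surfaced as `datafusion.execution.<name>`, so the new flag should also be settable from SQL. A minimal sketch, assuming the key is `datafusion.execution.hash_join_sideways_hash_passing` (the full key name is implied by the namespace but not shown in this diff):

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Assumed key name, derived from the `execution` namespace above.
    ctx.sql("SET datafusion.execution.hash_join_sideways_hash_passing = true")
        .await?;
    Ok(())
}
```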
11 changes: 11 additions & 0 deletions datafusion/execution/src/config.rs
@@ -397,6 +397,17 @@ impl SessionConfig {
self.options.execution.batch_size
}

/// Get whether sideways passing of the build side hash table is enabled
pub fn hash_join_sideways_hash_passing(&self) -> bool {
self.options.execution.hash_join_sideways_hash_passing
}

/// Enables or disables sideways passing of the build side hash table
pub fn with_hash_join_sideways_hash_passing(mut self, enabled: bool) -> Self {
self.options_mut().execution.hash_join_sideways_hash_passing = enabled;
self
}

/// Enables or disables the coalescence of small batches into larger batches
pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
self.options_mut().execution.coalesce_batches = enabled;
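These two methods follow the existing `with_coalesce_batches` pattern below them. A minimal sketch of enabling the feature programmatically; the method names come from this diff, while the `SessionContext` wiring is standard DataFusion usage:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Enable sideways hash passing for every join in this session.
    let config = SessionConfig::new().with_hash_join_sideways_hash_passing(true);
    assert!(config.hash_join_sideways_hash_passing());

    let _ctx = SessionContext::new_with_config(config);
}
```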
114 changes: 64 additions & 50 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -26,7 +26,9 @@ use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
use crate::joins::hash_join::information_passing::{
ColumnBounds, SharedBuildAccumulator,
};
use crate::joins::hash_join::stream::{
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
};
@@ -81,17 +83,17 @@ use futures::TryStreamExt;
use parking_lot::Mutex;

/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
const HASH_JOIN_SEED: RandomState =
static HASH_JOIN_SEED: RandomState =
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);

/// HashTable and input data for the left (build side) of a join
pub(super) struct JoinLeftData {
/// The hash table with indices into `batch`
pub(super) hash_map: Box<dyn JoinHashMapType>,
pub(super) hash_map: Arc<dyn JoinHashMapType>,
/// The input rows for the build side
batch: RecordBatch,
/// The values of the build side join key (`on`) expressions
values: Vec<ArrayRef>,
pub(super) values: Vec<ArrayRef>,
/// Shared bitmap builder for visited left indices
visited_indices_bitmap: SharedBitmapBuilder,
/// Counter of running probe-threads, potentially
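The hunk above makes two sharing-related changes: the seed becomes a `static` that is borrowed as `&'static RandomState` rather than cloned, and the build-side hash map moves from `Box` to `Arc` so other parts of the operator can hold onto it. The sketch below is a conceptual illustration of why a shared seed enables this kind of sideways pruning, not DataFusion's internals: it assumes the `ahash` crate (the crate behind DataFusion's `RandomState`) and uses a plain `HashSet<u64>` as a hypothetical stand-in for `JoinHashMapType`.

```rust
use std::collections::HashSet;
use std::hash::BuildHasher;
use std::sync::Arc;

use ahash::RandomState;

// Same shape as the PR's seed: a process-wide `static` that is borrowed
// (`&'static RandomState`) instead of cloned into every operator.
static HASH_JOIN_SEED: RandomState =
    RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);

fn main() {
    // Stand-in for the build-side hash table: the set of build key hashes.
    let build_hashes: Arc<HashSet<u64>> = Arc::new(
        [1_i64, 5, 9]
            .iter()
            .map(|k| HASH_JOIN_SEED.hash_one(k))
            .collect(),
    );

    // Because both sides share one seed, the probe side reproduces the same
    // hashes and can drop non-matching rows before the join itself runs.
    let probe_keys = [1_i64, 2, 5, 7];
    let survivors: Vec<i64> = probe_keys
        .iter()
        .copied()
        .filter(|k| build_hashes.contains(&HASH_JOIN_SEED.hash_one(k)))
        .collect();
    assert_eq!(survivors, vec![1, 5]);
}
```

Handing the real `Arc<dyn JoinHashMapType>` (plus the shared seed) to the probe side achieves the same effect at operator scale, at the cost of keeping the build table alive longer, which is the memory trade-off called out in the new config option's docs.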
@@ -109,7 +111,7 @@ pub(super) struct JoinLeftData {
impl JoinLeftData {
/// Create a new `JoinLeftData` from its parts
pub(super) fn new(
hash_map: Box<dyn JoinHashMapType>,
hash_map: Arc<dyn JoinHashMapType>,
batch: RecordBatch,
values: Vec<ArrayRef>,
visited_indices_bitmap: SharedBitmapBuilder,
@@ -341,7 +343,7 @@ pub struct HashJoinExec {
/// the hash table creation.
left_fut: Arc<OnceAsync<JoinLeftData>>,
/// Shared the `RandomState` for the hashing algorithm
random_state: RandomState,
random_state: &'static RandomState,
/// Partitioning mode to use
pub mode: PartitionMode,
/// Execution metrics
@@ -366,7 +368,7 @@ struct HashJoinExecDynamicFilter {
filter: Arc<DynamicFilterPhysicalExpr>,
/// Build accumulator to keep track of build-side information, such as min/max bounds on the join keys, for each partition.
/// It is lazily initialized during execution to make sure we use the actual execution-time partition counts.
bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}

impl fmt::Debug for HashJoinExec {
@@ -424,8 +426,6 @@ impl HashJoinExec {
let (join_schema, column_indices) =
build_join_schema(&left_schema, &right_schema, join_type);

let random_state = HASH_JOIN_SEED;

let join_schema = Arc::new(join_schema);

// check if the projection is valid
@@ -452,7 +452,7 @@
join_type: *join_type,
join_schema,
left_fut: Default::default(),
random_state,
random_state: &HASH_JOIN_SEED,
mode: partition_mode,
metrics: ExecutionPlanMetricsSet::new(),
projection,
@@ -827,7 +827,7 @@ impl ExecutionPlan for HashJoinExec {
/// Creates a new HashJoinExec with different children while preserving configuration.
///
/// This method is called during query optimization when the optimizer creates new
/// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new`
/// plan nodes. Importantly, it creates a fresh build_accumulator via `try_new`
/// rather than cloning the existing one because partitioning may have changed.
fn with_new_children(
self: Arc<Self>,
@@ -841,7 +841,7 @@ impl ExecutionPlan for HashJoinExec {
join_type: self.join_type,
join_schema: Arc::clone(&self.join_schema),
left_fut: Arc::clone(&self.left_fut),
random_state: self.random_state.clone(),
random_state: self.random_state,
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
@@ -871,7 +871,7 @@ impl ExecutionPlan for HashJoinExec {
join_schema: Arc::clone(&self.join_schema),
// Reset the left_fut to allow re-execution
left_fut: Arc::new(OnceAsync::default()),
random_state: self.random_state.clone(),
random_state: self.random_state,
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
@@ -912,43 +912,54 @@ impl ExecutionPlan for HashJoinExec {
}

let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
let enable_hash_collection =
context.session_config().hash_join_sideways_hash_passing();
let should_compute_bounds =
enable_dynamic_filter_pushdown && !enable_hash_collection;

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
PartitionMode::CollectLeft => self.left_fut.try_once(|| {
let left_stream = self.left.execute(0, Arc::clone(&context))?;

let (left_fut, reservation) = match self.mode {
PartitionMode::CollectLeft => {
let reservation =
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

Ok(collect_left_input(
self.random_state.clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
(
self.left_fut.try_once(|| {
let left_stream = self.left.execute(0, Arc::clone(&context))?;

Ok(collect_left_input(
self.random_state,
left_stream,
on_left.clone(),
join_metrics.clone(),
reservation.new_empty(),
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
should_compute_bounds,
))
})?,
reservation,
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
enable_dynamic_filter_pushdown,
))
})?,
)
}
PartitionMode::Partitioned => {
let left_stream = self.left.execute(partition, Arc::clone(&context))?;

let reservation =
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
.register(context.memory_pool());

OnceFut::new(collect_left_input(
self.random_state.clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
(
OnceFut::new(collect_left_input(
self.random_state,
left_stream,
on_left.clone(),
join_metrics.clone(),
reservation.new_empty(),
need_produce_result_in_final(self.join_type),
1,
should_compute_bounds,
)),
reservation,
need_produce_result_in_final(self.join_type),
1,
enable_dynamic_filter_pushdown,
))
)
}
PartitionMode::Auto => {
return plan_err!(
@@ -960,8 +971,8 @@

let batch_size = context.session_config().batch_size();

// Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
let bounds_accumulator = enable_dynamic_filter_pushdown
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
@@ -970,13 +981,15 @@
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right,
self.random_state,
reservation,
))
})))
})
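Stepping back, the two session flags read near the top of `execute` interact in one way worth spelling out. A minimal sketch of the gating; the stated rationale (passed hashes prune at least as precisely as min/max bounds, so also computing bounds would be redundant) is my reading of the PR rather than documented behavior:

```rust
// Mirrors `should_compute_bounds` in `execute`: min/max bounds are only
// computed for the dynamic filter when hash passing will not supply the
// per-row hashes instead.
fn should_compute_bounds(dynamic_filter: bool, hash_passing: bool) -> bool {
    dynamic_filter && !hash_passing
}

fn main() {
    assert!(should_compute_bounds(true, false)); // bounds-based pruning only
    assert!(!should_compute_bounds(true, true)); // hashes replace bounds
    assert!(!should_compute_bounds(false, false)); // no dynamic filter at all
}
```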
@@ -1010,7 +1023,7 @@ impl ExecutionPlan for HashJoinExec {
self.filter.clone(),
self.join_type,
right_stream,
self.random_state.clone(),
self.random_state,
join_metrics,
column_indices_after_projection,
self.null_equality,
Expand All @@ -1019,8 +1032,9 @@ impl ExecutionPlan for HashJoinExec {
batch_size,
vec![],
self.right.output_ordering().is_some(),
bounds_accumulator,
build_accumulator,
self.mode,
enable_hash_collection,
)))
}

@@ -1171,7 +1185,7 @@ impl ExecutionPlan for HashJoinExec {
join_type: self.join_type,
join_schema: Arc::clone(&self.join_schema),
left_fut: Arc::clone(&self.left_fut),
random_state: self.random_state.clone(),
random_state: self.random_state,
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
Expand All @@ -1180,7 +1194,7 @@ impl ExecutionPlan for HashJoinExec {
cache: self.cache.clone(),
dynamic_filter: Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
bounds_accumulator: OnceLock::new(),
build_accumulator: OnceLock::new(),
}),
});
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
@@ -1329,15 +1343,15 @@ impl BuildSideState {
/// When `should_compute_bounds` is true, this function computes the min/max bounds
/// for each join key column but does NOT update the dynamic filter. Instead, the
/// bounds are stored in the returned `JoinLeftData` and later coordinated by
/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
/// before updating the filter exactly once.
///
/// # Returns
/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
/// visited indices bitmap, and computed bounds (if requested).
#[allow(clippy::too_many_arguments)]
async fn collect_left_input(
random_state: RandomState,
random_state: &RandomState,
left_stream: SendableRecordBatchStream,
on_left: Vec<PhysicalExprRef>,
metrics: BuildProbeJoinMetrics,
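For the bounds branch described in the doc comment above, the per-column min/max computation can be pictured with Arrow's aggregate kernels. A minimal sketch under that reading, not the operator's actual code path, which works on the evaluated join key arrays and reports to the shared accumulator:

```rust
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::compute::{max, min};

fn main() {
    // One evaluated join key column from the build side.
    let col = Int32Array::from(vec![Some(4), None, Some(9), Some(1)]);

    // Per-column bounds; the kernels skip nulls.
    assert_eq!((min(&col), max(&col)), (Some(1), Some(9)));

    // In the operator, each partition reports such bounds to the shared
    // accumulator, which updates the dynamic filter exactly once.
}
```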
@@ -1427,7 +1441,7 @@ async fn collect_left_input(
batch,
&mut *hashmap,
offset,
&random_state,
random_state,
&mut hashes_buffer,
0,
true,
@@ -1471,9 +1485,9 @@
};

let data = JoinLeftData::new(
hashmap,
Arc::from(hashmap),
single_batch,
left_values.clone(),
left_values,
Mutex::new(visited_indices_bitmap),
AtomicUsize::new(probe_threads_count),
reservation,