Commit 67b1da8

korowa and alamb authored

Parquet parallel scan (#5057)

* parallel parquet scanning
* repartitioning ParquetExec
* minor changes & review comments
* settings reorganized
* Apply suggestions from code review
* additional test case & updated docs

Co-authored-by: Andrew Lamb <[email protected]>

1 parent 74b05fa · commit 67b1da8

7 files changed (+630 −41 lines)


datafusion/common/src/config.rs

Lines changed: 9 additions & 0 deletions

```diff
@@ -261,10 +261,19 @@ config_namespace! {
         /// in parallel using the provided `target_partitions` level"
         pub repartition_aggregations: bool, default = true
 
+        /// Minimum total file size in bytes to perform file scan repartitioning.
+        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
+
         /// Should DataFusion repartition data using the join keys to execute joins in parallel
         /// using the provided `target_partitions` level"
         pub repartition_joins: bool, default = true
 
+        /// When set to true, file groups will be repartitioned to achieve maximum parallelism.
+        /// Currently supported only for the Parquet format, in which case
+        /// multiple row groups from the same file may be read concurrently. If false, each
+        /// row group is read serially, though different files may be read in parallel.
+        pub repartition_file_scans: bool, default = false
+
         /// Should DataFusion repartition data using the partitions keys to execute window
         /// functions in parallel using the provided `target_partitions` level"
         pub repartition_windows: bool, default = true
```
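These options can also be set programmatically. A minimal sketch using the `ConfigOptions` API that the test macro further down also drives (the values are illustrative, not recommendations):

```rust
use datafusion::config::ConfigOptions;

fn main() {
    // Minimal sketch: toggle the new optimizer options directly on
    // ConfigOptions. Field paths follow the diff above.
    let mut config = ConfigOptions::new();
    // Opt in to splitting Parquet files across partitions
    config.optimizer.repartition_file_scans = true;
    // Only repartition scans whose total file size is at least 10 MiB
    config.optimizer.repartition_file_min_size = 10 * 1024 * 1024;

    assert!(config.optimizer.repartition_file_scans);
}
```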

datafusion/core/src/execution/context.rs

Lines changed: 12 additions & 0 deletions

```diff
@@ -1251,6 +1251,18 @@ impl SessionConfig {
         self
     }
 
+    /// Sets the minimum file range size for repartitioning scans
+    pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
+        self.options.optimizer.repartition_file_min_size = size;
+        self
+    }
+
+    /// Enables or disables the use of repartitioning for file scans
+    pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
+        self.options.optimizer.repartition_file_scans = enabled;
+        self
+    }
+
     /// Enables or disables the use of repartitioning for window functions to improve parallelism
     pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
         self.options.optimizer.repartition_windows = enabled;
```
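A usage sketch for the new builder methods; `with_target_partitions` and `SessionContext::with_config` are assumed from the pre-existing API, and the 10 MiB value mirrors the config default above:

```rust
use datafusion::execution::context::{SessionConfig, SessionContext};

fn main() {
    // Sketch only: configure a session so that Parquet scans may be split
    // across `target_partitions` using the methods added in this commit.
    let config = SessionConfig::new()
        .with_target_partitions(8)
        .with_repartition_file_scans(true)
        .with_repartition_file_min_size(10 * 1024 * 1024);
    let ctx = SessionContext::with_config(config);
    let _ = ctx; // queries planned via `ctx` can now read one file in parallel
}
```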

datafusion/core/src/physical_optimizer/repartition.rs

Lines changed: 252 additions & 9 deletions

```diff
@@ -23,7 +23,8 @@ use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_plan::Partitioning::*;
 use crate::physical_plan::{
-    repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
+    file_format::ParquetExec, repartition::RepartitionExec,
+    with_new_children_if_necessary, ExecutionPlan,
 };
 
 /// Optimizer that introduces repartition to introduce more
@@ -167,6 +168,8 @@ fn optimize_partitions(
     is_root: bool,
     can_reorder: bool,
     would_benefit: bool,
+    repartition_file_scans: bool,
+    repartition_file_min_size: usize,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     // Recurse into children bottom-up (attempt to repartition as
     // early as possible)
@@ -199,6 +202,8 @@ fn optimize_partitions(
                 false, // child is not root
                 can_reorder_child,
                 plan.benefits_from_input_partitioning(),
+                repartition_file_scans,
+                repartition_file_min_size,
             )
         })
         .collect::<Result<_>>()?;
@@ -227,14 +232,28 @@ fn optimize_partitions(
         could_repartition = false;
     }
 
-    if would_benefit && could_repartition && can_reorder {
-        Ok(Arc::new(RepartitionExec::try_new(
-            new_plan,
-            RoundRobinBatch(target_partitions),
-        )?))
-    } else {
-        Ok(new_plan)
+    let repartition_allowed = would_benefit && could_repartition && can_reorder;
+
+    // If repartitioning is not allowed, return the plan as it is
+    if !repartition_allowed {
+        return Ok(new_plan);
+    }
+
+    // For ParquetExec, return an internally repartitioned version of the plan
+    // if `repartition_file_scans` is set
+    if let Some(parquet_exec) = new_plan.as_any().downcast_ref::<ParquetExec>() {
+        if repartition_file_scans {
+            return Ok(Arc::new(
+                parquet_exec
+                    .get_repartitioned(target_partitions, repartition_file_min_size),
+            ));
+        }
     }
+
+    // Otherwise, return the plan wrapped in RepartitionExec
+    Ok(Arc::new(RepartitionExec::try_new(
+        new_plan,
+        RoundRobinBatch(target_partitions),
+    )?))
 }
 
 /// Returns true if `plan` requires any of inputs to be sorted in some
@@ -253,6 +272,8 @@ impl PhysicalOptimizerRule for Repartition {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let target_partitions = config.execution.target_partitions;
         let enabled = config.optimizer.enable_round_robin_repartition;
+        let repartition_file_scans = config.optimizer.repartition_file_scans;
+        let repartition_file_min_size = config.optimizer.repartition_file_min_size;
         // Don't run optimizer if target_partitions == 1
         if !enabled || target_partitions == 1 {
             Ok(plan)
@@ -266,6 +287,8 @@ impl PhysicalOptimizerRule for Repartition {
             is_root,
             can_reorder,
             would_benefit,
+            repartition_file_scans,
+            repartition_file_min_size,
         )
     }
 }
@@ -331,6 +354,28 @@ mod tests {
         ))
     }
 
+    /// Create a non-sorted parquet exec over two files / partitions
+    fn parquet_exec_two_partitions() -> Arc<ParquetExec> {
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema(),
+                file_groups: vec![
+                    vec![PartitionedFile::new("x".to_string(), 100)],
+                    vec![PartitionedFile::new("y".to_string(), 200)],
+                ],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            None,
+            None,
+        ))
+    }
+
     // Created a sorted parquet exec
     fn parquet_exec_sorted() -> Arc<ParquetExec> {
         let sort_exprs = vec![PhysicalSortExpr {
@@ -448,10 +493,16 @@ mod tests {
     /// Runs the repartition optimizer and asserts the plan against the expected
     macro_rules! assert_optimized {
         ($EXPECTED_LINES: expr, $PLAN: expr) => {
+            assert_optimized!($EXPECTED_LINES, $PLAN, 10, false, 1024);
+        };
+
+        ($EXPECTED_LINES: expr, $PLAN: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
            let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
            let mut config = ConfigOptions::new();
-           config.execution.target_partitions = 10;
+           config.execution.target_partitions = $TARGET_PARTITIONS;
+           config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
+           config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
 
            // run optimizer
            let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
@@ -846,6 +897,198 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn parallelization_single_partition() -> Result<()> {
+        let plan = aggregate(parquet_exec());
+
+        let expected = [
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
+            "ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_two_partitions() -> Result<()> {
+        let plan = aggregate(parquet_exec_two_partitions());
+
+        let expected = [
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
+            // Plan already has two partitions
+            "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_two_partitions_into_four() -> Result<()> {
+        let plan = aggregate(parquet_exec_two_partitions());
+
+        let expected = [
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
+            // Multiple source files split across partitions
+            "ParquetExec: limit=None, partitions={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 4, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_sorted_limit() -> Result<()> {
+        let plan = limit_exec(sort_exec(parquet_exec(), false));
+
+        let expected = &[
+            "GlobalLimitExec: skip=0, fetch=100",
+            "LocalLimitExec: fetch=100",
+            // data is sorted so can't repartition here
+            "SortExec: [c1@0 ASC]",
+            // Doesn't parallelize for SortExec without preserve_partitioning
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_limit_with_filter() -> Result<()> {
+        let plan = limit_exec(filter_exec(sort_exec(parquet_exec(), false)));
+
+        let expected = &[
+            "GlobalLimitExec: skip=0, fetch=100",
+            "LocalLimitExec: fetch=100",
+            "FilterExec: c1@0",
+            // data is sorted so we can't repartition here: even though the
+            // filter would benefit from parallelism, the answers might be wrong
+            "SortExec: [c1@0 ASC]",
+            // SortExec doesn't benefit from input partitioning
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_ignores_limit() -> Result<()> {
+        let plan = aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));
+
+        let expected = &[
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
+            "RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+            "GlobalLimitExec: skip=0, fetch=100",
+            "CoalescePartitionsExec",
+            "LocalLimitExec: fetch=100",
+            "FilterExec: c1@0",
+            // repartition should happen prior to the filter to maximize parallelism
+            "RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+            "GlobalLimitExec: skip=0, fetch=100",
+            // Limit doesn't benefit from input partitioning - no parallelism
+            "LocalLimitExec: fetch=100",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_union_inputs() -> Result<()> {
+        let plan = union_exec(vec![parquet_exec(); 5]);
+
+        let expected = &[
+            "UnionExec",
+            // Union doesn't benefit from input partitioning - no parallelism
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_prior_to_sort_preserving_merge() -> Result<()> {
+        // sort-preserving merge with already sorted input
+        let plan = sort_preserving_merge_exec(parquet_exec_sorted());
+
+        // parallelization could potentially break the sort order
+        let expected = &[
+            "SortPreservingMergeExec: [c1@0 ASC]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_sort_preserving_merge_with_union() -> Result<()> {
+        // 2 sorted parquet files unioned (partitions are concatenated, sort is preserved)
+        let input = union_exec(vec![parquet_exec_sorted(); 2]);
+        let plan = sort_preserving_merge_exec(input);
+
+        // should not repartition / sort (as the data was already sorted)
+        let expected = &[
+            "SortPreservingMergeExec: [c1@0 ASC]",
+            "UnionExec",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_does_not_destroy_sort() -> Result<()> {
+        // SortRequired
+        //   Parquet(sorted)
+        let plan = sort_required_exec(parquet_exec_sorted());
+
+        // no parallelization, to preserve the sort order
+        let expected = &[
+            "SortRequiredExec",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_ignores_transitively_with_projection() -> Result<()> {
+        // sorted input
+        let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted()));
+
+        // data should not be repartitioned / re-sorted
+        let expected = &[
+            "SortPreservingMergeExec: [c1@0 ASC]",
+            "ProjectionExec: expr=[c1@0 as c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
     /// Models operators like BoundedWindowExec that require an input
     /// ordering but is easy to construct
     #[derive(Debug)]
```
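The expected plans above imply the byte-range arithmetic behind `get_repartitioned`: the combined size of all files is divided evenly across `target_partitions`, and ranges are assigned by walking the files in order, so in `parallelization_two_partitions_into_four` 300 bytes over 4 partitions yields 75-byte chunks. A simplified standalone sketch of that arithmetic (not the commit's implementation, which operates on `PartitionedFile`s and respects `repartition_file_min_size`):

```rust
/// Simplified sketch, not `ParquetExec::get_repartitioned`: split
/// (file name, size) pairs into `target_partitions` groups of byte
/// ranges by dividing the concatenated byte stream evenly.
fn split_ranges(files: &[(&str, u64)], target_partitions: u64) -> Vec<Vec<(String, u64, u64)>> {
    let total: u64 = files.iter().map(|(_, size)| size).sum();
    let chunk = (total + target_partitions - 1) / target_partitions; // ceiling division
    let mut groups = vec![Vec::new(); target_partitions as usize];
    let mut offset = 0u64; // start of the current file in the concatenated stream
    for (name, size) in files {
        let mut start = 0u64;
        while start < *size {
            let group = ((offset + start) / chunk) as usize;
            // A range ends at the file end or the next group boundary,
            // whichever comes first.
            let end = (*size).min((group as u64 + 1) * chunk - offset);
            groups[group].push((name.to_string(), start, end));
            start = end;
        }
        offset += *size;
    }
    groups
}

fn main() {
    // Mirrors parallelization_two_partitions_into_four:
    // x = 100 bytes, y = 200 bytes, 4 partitions => 75-byte chunks.
    let groups = split_ranges(&[("x", 100), ("y", 200)], 4);
    assert_eq!(groups[0], vec![("x".to_string(), 0, 75)]);
    assert_eq!(groups[1], vec![("x".to_string(), 75, 100), ("y".to_string(), 0, 50)]);
    assert_eq!(groups[2], vec![("y".to_string(), 50, 125)]);
    assert_eq!(groups[3], vec![("y".to_string(), 125, 200)]);
}
```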

datafusion/core/src/physical_plan/file_format/mod.rs

Lines changed: 4 additions & 0 deletions

```diff
@@ -207,6 +207,10 @@ impl<'a> Display for FileGroupsDisplay<'a> {
             first_file = false;
 
             write!(f, "{}", pf.object_meta.location.as_ref())?;
+
+            if let Some(range) = pf.range.as_ref() {
+                write!(f, ":{}..{}", range.start, range.end)?;
+            }
         }
         write!(f, "]")?;
     }
```
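A self-contained sketch of the `location:start..end` rendering this adds, with a tuple standing in for the optional `FileRange` carried by `PartitionedFile`:

```rust
use std::fmt::Write as _;

fn main() {
    // Sketch of the display convention added above: a file with an
    // assigned byte range renders as "location:start..end".
    let location = "x";
    let range: Option<(i64, i64)> = Some((0, 75)); // stand-in for FileRange

    let mut out = String::new();
    write!(out, "{location}").unwrap();
    if let Some((start, end)) = range {
        write!(out, ":{start}..{end}").unwrap();
    }
    assert_eq!(out, "x:0..75");
}
```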
