Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/query/service/src/physical_plans/physical_aggregate_final.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,23 +208,18 @@ impl PhysicalPlanBuilder {
&mut self,
s_expr: &SExpr,
agg: &Aggregate,
mut required: ColumnSet,
required: ColumnSet,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
// 1. Prune unused Columns.
let mut used = vec![];
for item in &agg.aggregate_functions {
if required.contains(&item.index) {
required.extend(item.scalar.used_columns());
used.push(item.clone());
}
}

agg.group_items.iter().for_each(|i| {
// If the group item comes from a complex expression, we only include the final
// column index here. The used columns will be included in its EvalScalar child.
required.insert(i.index);
});
let child_required = self.derive_single_child_required_columns(s_expr, &required)?;

// single key without aggregation
if agg.group_items.is_empty() && used.is_empty() {
Expand All @@ -245,7 +240,7 @@ impl PhysicalPlanBuilder {
};

// 2. Build physical plan.
let input = self.build(s_expr.child(0)?, required).await?;
let input = self.build(s_expr.child(0)?, child_required).await?;
let input_schema = input.output_schema()?;
let group_items = agg.group_items.iter().map(|v| v.index).collect::<Vec<_>>();

Expand Down
9 changes: 5 additions & 4 deletions src/query/service/src/physical_plans/physical_async_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,23 +132,24 @@ impl PhysicalPlanBuilder {
&mut self,
s_expr: &SExpr,
async_func_plan: &databend_common_sql::plans::AsyncFunction,
mut required: ColumnSet,
required: ColumnSet,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
// 1. Prune unused Columns.
let mut used = vec![];
for item in async_func_plan.items.iter() {
if required.contains(&item.index) {
required.extend(item.scalar.used_columns());
used.push(item.clone());
}
}

let child_required = self.derive_single_child_required_columns(s_expr, &required)?;

// 2. Build physical plan.
if used.is_empty() {
return self.build(s_expr.child(0)?, required).await;
return self.build(s_expr.child(0)?, child_required).await;
}
let input = self.build(s_expr.child(0)?, required).await?;
let input = self.build(s_expr.child(0)?, child_required).await?;
let input_schema = input.output_schema()?;

let async_func_descs = used
Expand Down
53 changes: 50 additions & 3 deletions src/query/service/src/physical_plans/physical_cte_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;

use databend_common_exception::Result;
use databend_common_expression::DataField;
Expand Down Expand Up @@ -93,11 +94,57 @@ impl PhysicalPlanBuilder {
cte_consumer: &databend_common_sql::plans::MaterializedCTERef,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
let mut fields = Vec::new();
let def_to_ref = cte_consumer
.column_mapping
.iter()
.map(|(k, v)| (*v, *k))
.collect::<HashMap<_, _>>();
let cte_required_columns = self
.cte_required_columns
.get(&cte_consumer.cte_name)
.ok_or_else(|| {
databend_common_exception::ErrorCode::Internal(format!(
"CTE required columns not found for CTE name: {}",
cte_consumer.cte_name
))
})?;

let metadata = self.metadata.read();
let mut cte_output_columns = Vec::with_capacity(cte_required_columns.len());
for c in cte_required_columns.iter() {
let index = def_to_ref.get(c).ok_or_else(|| {
// Build detailed error message with column names
let required_cols: Vec<String> = cte_required_columns
.iter()
.map(|idx| {
let col = metadata.column(*idx);
format!("{}({})", col.name(), idx)
})
.collect();

let available_mappings: Vec<String> = def_to_ref
.iter()
.map(|(def_idx, ref_idx)| {
let def_col = metadata.column(*def_idx);
let ref_col = metadata.column(*ref_idx);
format!("{}({}) -> {}({})", def_col.name(), def_idx, ref_col.name(), ref_idx)
})
.collect();

let current_col = metadata.column(*c);
databend_common_exception::ErrorCode::Internal(format!(
"Column mapping not found for column {}({}) in CTE: {}.\nRequired columns: [{}]\nAvailable mappings: [{}]",
current_col.name(), c, cte_consumer.cte_name,
required_cols.join(", "),
available_mappings.join(", ")
))
})?;
cte_output_columns.push(index);
}
let mut fields = Vec::new();

for index in &cte_consumer.output_columns {
let column = metadata.column(*index);
for index in cte_output_columns.iter() {
let column = metadata.column(**index);
let data_type = column.data_type();
fields.push(DataField::new(&index.to_string(), data_type));
}
Expand Down
10 changes: 3 additions & 7 deletions src/query/service/src/physical_plans/physical_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,13 @@ impl PhysicalPlanBuilder {
&mut self,
s_expr: &SExpr,
exchange: &databend_common_sql::plans::Exchange,
mut required: ColumnSet,
required: ColumnSet,
) -> Result<PhysicalPlan> {
// 1. Prune unused Columns.
if let databend_common_sql::plans::Exchange::Hash(exprs) = exchange {
for expr in exprs {
required.extend(expr.used_columns());
}
}
let child_required = self.derive_single_child_required_columns(s_expr, &required)?;

// 2. Build physical plan.
let input = self.build(s_expr.child(0)?, required).await?;
let input = self.build(s_expr.child(0)?, child_required).await?;
let input_schema = input.output_schema()?;
let mut keys = vec![];
let mut allow_adjust_parallelism = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ impl PhysicalPlanBuilder {
scan: &databend_common_sql::plans::ExpressionScan,
required: ColumnSet,
) -> Result<PhysicalPlan> {
let input = self.build(s_expr.child(0)?, required).await?;
let child_required = self.derive_single_child_required_columns(s_expr, &required)?;
let input = self.build(s_expr.child(0)?, child_required).await?;
let input_schema = input.output_schema()?;

let values = scan
Expand Down
6 changes: 2 additions & 4 deletions src/query/service/src/physical_plans/physical_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,10 @@ impl PhysicalPlanBuilder {
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
// 1. Prune unused Columns.
let used = filter.predicates.iter().fold(required.clone(), |acc, v| {
acc.union(&v.used_columns()).cloned().collect()
});
let child_required = self.derive_single_child_required_columns(s_expr, &required)?;

// 2. Build physical plan.
let input = self.build(s_expr.child(0)?, used).await?;
let input = self.build(s_expr.child(0)?, child_required).await?;
required = required
.union(self.metadata.read().get_retained_column())
.cloned()
Expand Down
26 changes: 4 additions & 22 deletions src/query/service/src/physical_plans/physical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,28 +150,10 @@ impl PhysicalPlanBuilder {
others_required.insert(*column);
}
}

// Include columns referenced in left conditions and right conditions.
let left_required: ColumnSet = join
.equi_conditions
.iter()
.fold(required.clone(), |acc, v| {
acc.union(&v.left.used_columns()).cloned().collect()
})
.union(&others_required)
.cloned()
.collect();
let right_required: ColumnSet = join
.equi_conditions
.iter()
.fold(required.clone(), |acc, v| {
acc.union(&v.right.used_columns()).cloned().collect()
})
.union(&others_required)
.cloned()
.collect();
let left_required = left_required.union(&others_required).cloned().collect();
let right_required = right_required.union(&others_required).cloned().collect();
let mut child_required = self.derive_children_required_columns(s_expr, &required)?;
debug_assert_eq!(child_required.len(), s_expr.arity());
let left_required = child_required.remove(0);
let right_required = child_required.remove(0);

// 2. Build physical plan.
// Choose physical join type by join conditions
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/physical_plans/physical_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ impl PhysicalPlanBuilder {
}

// 2. Build physical plan.
let input_plan = self.build(s_expr.child(0)?, required).await?;
let child_required = self.derive_single_child_required_columns(s_expr, &required)?;
let input_plan = self.build(s_expr.child(0)?, child_required).await?;
if limit.before_exchange || limit.lazy_columns.is_empty() || !support_lazy_materialize {
return Ok(PhysicalPlan::new(Limit {
input: input_plan,
Expand Down
45 changes: 26 additions & 19 deletions src/query/service/src/physical_plans/physical_materialized_cte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use std::any::Any;

use databend_common_exception::Result;
use databend_common_expression::DataSchemaRef;
use databend_common_sql::optimizer::ir::RelExpr;
use databend_common_pipeline_transforms::TransformPipelineHelper;
use databend_common_sql::evaluator::BlockOperator;
use databend_common_sql::evaluator::CompoundBlockOperator;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::ColumnBinding;

use crate::physical_plans::explain::PlanStatsInfo;
use crate::physical_plans::format::MaterializedCTEFormatter;
Expand All @@ -38,7 +39,7 @@ pub struct MaterializedCTE {
pub stat_info: Option<PlanStatsInfo>,
pub input: PhysicalPlan,
pub cte_name: String,
pub cte_output_columns: Option<Vec<ColumnBinding>>,
pub cte_output_columns: Option<Vec<usize>>,
pub ref_count: usize,
pub channel_size: Option<usize>,
pub meta: PhysicalPlanMeta,
Expand Down Expand Up @@ -95,13 +96,20 @@ impl IPhysicalPlan for MaterializedCTE {

let input_schema = self.input.output_schema()?;
if let Some(output_columns) = &self.cte_output_columns {
PipelineBuilder::build_result_projection(
&builder.func_ctx,
input_schema,
output_columns,
&mut builder.main_pipeline,
false,
)?;
let mut projections = Vec::with_capacity(output_columns.len());
for index in output_columns {
projections.push(input_schema.index_of(index.to_string().as_str())?);
}
let num_input_columns = input_schema.num_fields();
builder.main_pipeline.add_transformer(|| {
CompoundBlockOperator::new(
vec![BlockOperator::Project {
projection: projections.clone(),
}],
builder.func_ctx.clone(),
num_input_columns,
)
});
}

builder.main_pipeline.try_resize(1)?;
Expand All @@ -123,20 +131,19 @@ impl PhysicalPlanBuilder {
materialized_cte: &databend_common_sql::plans::MaterializedCTE,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
let required = match &materialized_cte.cte_output_columns {
Some(o) => o.iter().map(|c| c.index).collect(),
None => RelExpr::with_s_expr(s_expr.child(0)?)
.derive_relational_prop()?
.output_columns
.clone(),
};
let input = self.build(s_expr.child(0)?, required).await?;
let required = self
.cte_required_columns
.get(&materialized_cte.cte_name)
.unwrap()
.clone();
let cte_output_columns = Some(required.iter().copied().collect());
let input = self.build_physical_plan(s_expr.child(0)?, required).await?;
Ok(PhysicalPlan::new(MaterializedCTE {
plan_id: 0,
stat_info: Some(stat_info),
input,
cte_name: materialized_cte.cte_name.clone(),
cte_output_columns: materialized_cte.cte_output_columns.clone(),
cte_output_columns,
ref_count: materialized_cte.ref_count,
channel_size: materialized_cte.channel_size,
meta: PhysicalPlanMeta::new("MaterializedCTE"),
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/physical_plans/physical_mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ impl PhysicalPlanBuilder {
let udf_col_num = required_udf_ids.len();
required.extend(required_udf_ids);

let mut plan = self.build(s_expr.child(0)?, required).await?;
let child_required = self.derive_single_child_required_columns(s_expr, &required)?;
let mut plan = self.build(s_expr.child(0)?, child_required).await?;
if *no_effect {
return Ok(plan);
}
Expand Down
Loading
Loading