
Refactor Async UDF rewriting in physical planner #18150

@Jefffrey


Is your feature request related to a problem or challenge?

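#14837 introduced support for async UDFs. As part of that change, the physical planner rewrites projections and filters (and, since #17619, the inputs to aggregations) so that async UDF calls are evaluated by a separate `AsyncFuncExec` placed beneath the operator.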
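As a rough illustration of what the rewrite does, here is a toy model. The `Expr` enum, `extract_async`, and `rewrite_async` are hypothetical names, not DataFusion types. Each async UDF call is pulled out of the expression tree and replaced by a reference to a new column appended after the input's `num_input_columns` columns. The extracted calls are what an `AsyncFuncExec`-like node would evaluate. The sketch assumes async calls are never nested inside one another.

```rust
// Toy model of the async UDF rewrite; these are NOT DataFusion's real types.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// Reference to a column of the operator's input, by index.
    Column(usize),
    /// A synchronous function call, e.g. `upper(x)`.
    Call(String, Vec<Expr>),
    /// A call to an async UDF, which must be evaluated by a separate node.
    AsyncCall(String, Vec<Expr>),
}

/// Replace every async call in `expr` with a reference to the next appended
/// column, pushing the extracted call onto `async_exprs`.
/// Assumption: async calls are never nested, so their arguments are kept as-is.
fn extract_async(expr: Expr, next_index: &mut usize, async_exprs: &mut Vec<Expr>) -> Expr {
    match expr {
        Expr::Column(i) => Expr::Column(i),
        Expr::Call(name, args) => Expr::Call(
            name,
            args.into_iter()
                .map(|arg| extract_async(arg, next_index, async_exprs))
                .collect(),
        ),
        Expr::AsyncCall(name, args) => {
            async_exprs.push(Expr::AsyncCall(name, args));
            let column = Expr::Column(*next_index);
            *next_index += 1;
            column
        }
    }
}

/// Rewrite expressions planned against an input with `num_input_columns`
/// columns. Returns the rewritten expressions and the extracted async calls,
/// whose results occupy columns `num_input_columns..` of the widened schema.
fn rewrite_async(exprs: Vec<Expr>, num_input_columns: usize) -> (Vec<Expr>, Vec<Expr>) {
    let mut next_index = num_input_columns;
    let mut async_exprs = Vec::new();
    let rewritten: Vec<Expr> = exprs
        .into_iter()
        .map(|expr| extract_async(expr, &mut next_index, &mut async_exprs))
        .collect();
    (rewritten, async_exprs)
}
```

For an input with columns `[a, b]`, the expression list `[a, upper(llm(a))]` becomes `[#0, upper(#2)]` with `llm(#0)` extracted. The rewritten expressions are then evaluated against the widened schema `[a, b, llm(a)]`.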

Projection:

```rust
match self.try_plan_async_exprs(
    num_input_columns,
    PlannedExprResult::ExprWithName(physical_exprs),
    input_physical_schema.as_ref(),
)? {
    PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => {
        let proj_exprs: Vec<ProjectionExpr> = physical_exprs
            .into_iter()
            .map(|(expr, alias)| ProjectionExpr { expr, alias })
            .collect();
        Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?))
    }
    PlanAsyncExpr::Async(
        async_map,
        PlannedExprResult::ExprWithName(physical_exprs),
    ) => {
        let async_exec =
            AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?;
        let proj_exprs: Vec<ProjectionExpr> = physical_exprs
            .into_iter()
            .map(|(expr, alias)| ProjectionExpr { expr, alias })
            .collect();
        let new_proj_exec =
            ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?;
        Ok(Arc::new(new_proj_exec))
    }
    _ => internal_err!("Unexpected PlanAsyncExpressions variant"),
}
```

Filter:

```rust
LogicalPlan::Filter(Filter {
    predicate, input, ..
}) => {
    let physical_input = children.one()?;
    let input_dfschema = input.schema();
    let runtime_expr =
        self.create_physical_expr(predicate, input_dfschema, session_state)?;
    let input_schema = input.schema();
    let filter = match self.try_plan_async_exprs(
        input_schema.fields().len(),
        PlannedExprResult::Expr(vec![runtime_expr]),
        input_schema.as_arrow(),
    )? {
        PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
            FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
        }
        PlanAsyncExpr::Async(
            async_map,
            PlannedExprResult::Expr(runtime_expr),
        ) => {
            let async_exec = AsyncFuncExec::try_new(
                async_map.async_exprs,
                physical_input,
            )?;
            FilterExec::try_new(
                Arc::clone(&runtime_expr[0]),
                Arc::new(async_exec),
            )?
            // project the output columns excluding the async functions
            // The async functions are always appended to the end of the schema.
            .with_projection(Some(
                (0..input.schema().fields().len()).collect(),
            ))?
        }
        _ => {
            return internal_err!(
                "Unexpected result from try_plan_async_exprs"
            )
        }
    };
```
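A filter cannot drop the async columns before evaluating its predicate, because the rewritten predicate refers to them. The row-oriented toy below shows why the `with_projection` call over `0..input.schema().fields().len()` is needed: the predicate reads an appended column, but the filter's output must still match its original input schema. The helpers `append_async_columns` and `filter_with_projection` are hypothetical stand-ins for `AsyncFuncExec` and `FilterExec`, not DataFusion APIs.

```rust
// Toy row-oriented model; helper names are hypothetical stand-ins, not
// DataFusion APIs. Each row is a Vec<i64>.

/// Stand-in for `AsyncFuncExec`: evaluates each async function per row and
/// appends the results after the existing columns.
fn append_async_columns(rows: Vec<Vec<i64>>, async_fns: &[fn(&[i64]) -> i64]) -> Vec<Vec<i64>> {
    rows.into_iter()
        .map(|mut row| {
            let computed: Vec<i64> = async_fns.iter().map(|f| f(row.as_slice())).collect();
            row.extend(computed);
            row
        })
        .collect()
}

/// Stand-in for `FilterExec` with a projection: keep rows matching
/// `predicate`, then emit only the columns listed in `projection`.
fn filter_with_projection(
    rows: Vec<Vec<i64>>,
    predicate: impl Fn(&[i64]) -> bool,
    projection: &[usize],
) -> Vec<Vec<i64>> {
    rows.into_iter()
        .filter(|row| predicate(row.as_slice()))
        .map(|row| projection.iter().map(|&i| row[i]).collect::<Vec<i64>>())
        .collect()
}
```

With input columns `[a, b]` and one async function computing `a * 10` as column 2, a predicate `col2 > 20` filters on the appended column while the projection `[0, 1]` keeps the output at two columns. A projection does not need this extra step, since its output columns are exactly its rewritten expressions.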

Aggregation input:

```rust
for agg_func in &mut aggregates {
    match self.try_plan_async_exprs(
        num_input_columns,
        PlannedExprResult::Expr(agg_func.expressions()),
        physical_input_schema.as_ref(),
    )? {
        PlanAsyncExpr::Async(
            async_map,
            PlannedExprResult::Expr(physical_exprs),
        ) => {
            async_exprs.extend(async_map.async_exprs);
            if let Some(new_agg_func) = agg_func.with_new_expressions(
                physical_exprs,
                agg_func
                    .order_bys()
                    .iter()
                    .cloned()
                    .map(|x| x.expr)
                    .collect(),
            ) {
                *agg_func = Arc::new(new_agg_func);
            } else {
                return internal_err!("Failed to plan async expression");
            }
        }
        PlanAsyncExpr::Sync(PlannedExprResult::Expr(_)) => {
            // Do nothing
        }
        _ => {
            return internal_err!(
                "Unexpected result from try_plan_async_exprs"
            )
        }
    }
}
let input_exec = if !async_exprs.is_empty() {
    Arc::new(AsyncFuncExec::try_new(async_exprs, input_exec)?)
} else {
    input_exec
};
```

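I find this logic confusing: each operator repeats the rewrite in a slightly different form (the filter must also project away the appended async columns, and aggregation merges the async expressions of all aggregate functions into one `AsyncFuncExec`), and it can cause bugs (see #18149).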

There was also discussion about this in the original async UDF PR (#14837).

Describe the solution you'd like

Either clean up this rewrite logic and perform it earlier in planning, or find a design that does not need the rewrite at all (the original PR contains some discussion in that direction).
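One possible shape for the "clean up" option, sketched as a toy: a single helper that every operator calls. It rewrites all of the operator's expression lists against one async node and reports the original input width, so the caller can decide whether to project the appended columns away. The names `Expr`, `Plan`, and `plan_with_async` are hypothetical, not DataFusion APIs, and async calls are assumed to appear only at the top level of each expression.

```rust
// Toy model only: these are hypothetical types, not DataFusion APIs.
// Async calls are modeled as opaque names that appear only at top level.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(usize),
    AsyncCall(&'static str),
}

#[derive(Debug, PartialEq)]
enum Plan {
    Scan { num_columns: usize },
    /// Stand-in for `AsyncFuncExec`: appends one column per async expression.
    Async { async_exprs: Vec<Expr>, input: Box<Plan> },
}

impl Plan {
    fn num_columns(&self) -> usize {
        match self {
            Plan::Scan { num_columns } => *num_columns,
            Plan::Async { async_exprs, input } => input.num_columns() + async_exprs.len(),
        }
    }
}

/// Shared entry point for projection, filter, and aggregate planning.
/// All expression lists (one for a projection or filter, one per aggregate
/// function) are rewritten against ONE async node using a single column
/// counter, so appended indices never collide. The input is wrapped only if
/// something was extracted. The original input width is returned so that an
/// operator such as a filter can project the appended columns away.
fn plan_with_async(expr_lists: Vec<Vec<Expr>>, input: Plan) -> (Vec<Vec<Expr>>, Plan, usize) {
    let original_width = input.num_columns();
    let mut next_index = original_width;
    let mut async_exprs = Vec::new();
    let mut rewritten = Vec::with_capacity(expr_lists.len());
    for list in expr_lists {
        let mut new_list = Vec::with_capacity(list.len());
        for expr in list {
            if matches!(expr, Expr::AsyncCall(_)) {
                // Move the async call into the async node; refer to its output column.
                async_exprs.push(expr);
                new_list.push(Expr::Column(next_index));
                next_index += 1;
            } else {
                new_list.push(expr);
            }
        }
        rewritten.push(new_list);
    }
    let input = if async_exprs.is_empty() {
        input
    } else {
        Plan::Async { async_exprs, input: Box::new(input) }
    };
    (rewritten, input, original_width)
}
```

Under this design, a projection would pass one list, a filter would pass one list and then project `0..width`, and an aggregate would pass one list per aggregate function. The three `match` blocks above would then reduce to calls to the same helper.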

Describe alternatives you've considered

No response

Additional context

No response

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
