@@ -23,7 +23,7 @@ use super::{
2323} ;
2424use crate :: {
2525 datasource:: file_format:: { file_compression_type:: FileCompressionType , FileFormat } ,
26- datasource:: { create_ordering , physical_plan:: FileSinkConfig } ,
26+ datasource:: physical_plan:: FileSinkConfig ,
2727 execution:: context:: SessionState ,
2828} ;
2929use arrow:: datatypes:: { DataType , Field , SchemaBuilder , SchemaRef } ;
@@ -32,7 +32,7 @@ use async_trait::async_trait;
3232use datafusion_catalog:: { Session , TableProvider } ;
3333use datafusion_common:: {
3434 config_datafusion_err, config_err, internal_err, plan_err, project_schema,
35- stats:: Precision , Constraints , DataFusionError , Result , SchemaExt ,
35+ stats:: Precision , Constraints , DFSchema , DataFusionError , Result , SchemaExt ,
3636} ;
3737use datafusion_datasource:: {
3838 compute_all_files_statistics,
@@ -45,16 +45,19 @@ use datafusion_execution::{
4545 cache:: { cache_manager:: FileStatisticsCache , cache_unit:: DefaultFileStatisticsCache } ,
4646 config:: SessionConfig ,
4747} ;
48+ use datafusion_expr:: execution_props:: ExecutionProps ;
4849use datafusion_expr:: {
4950 dml:: InsertOp , Expr , SortExpr , TableProviderFilterPushDown , TableType ,
5051} ;
52+ use datafusion_physical_expr:: create_lex_orderings;
5153use datafusion_physical_expr_adapter:: PhysicalExprAdapterFactory ;
5254use datafusion_physical_expr_common:: sort_expr:: LexOrdering ;
5355use datafusion_physical_plan:: { empty:: EmptyExec , ExecutionPlan , Statistics } ;
5456use futures:: { future, stream, Stream , StreamExt , TryStreamExt } ;
5557use itertools:: Itertools ;
5658use object_store:: ObjectStore ;
5759use std:: { any:: Any , collections:: HashMap , str:: FromStr , sync:: Arc } ;
60+
5861/// Indicates the source of the schema for a [`ListingTable`]
5962// PartialEq required for assert_eq! in tests
6063#[ derive( Debug , Clone , Copy , PartialEq , Default ) ]
@@ -1126,8 +1129,12 @@ impl ListingTable {
11261129 }
11271130
11281131 /// If file_sort_order is specified, creates the appropriate physical expressions
1129- fn try_create_output_ordering ( & self ) -> Result < Vec < LexOrdering > > {
1130- create_ordering ( & self . table_schema , & self . options . file_sort_order )
1132+ fn try_create_output_ordering (
1133+ & self ,
1134+ execution_props : & ExecutionProps ,
1135+ ) -> Result < Vec < LexOrdering > > {
1136+ let df_schema = DFSchema :: try_from ( Arc :: clone ( & self . table_schema ) ) ?;
1137+ create_lex_orderings ( & self . options . file_sort_order , & df_schema, execution_props)
11311138 }
11321139}
11331140
@@ -1199,7 +1206,7 @@ impl TableProvider for ListingTable {
11991206 return Ok ( Arc :: new ( EmptyExec :: new ( projected_schema) ) ) ;
12001207 }
12011208
1202- let output_ordering = self . try_create_output_ordering ( ) ?;
1209+ let output_ordering = self . try_create_output_ordering ( state . execution_props ( ) ) ?;
12031210 match state
12041211 . config_options ( )
12051212 . execution
@@ -1334,7 +1341,7 @@ impl TableProvider for ListingTable {
13341341 file_extension : self . options ( ) . format . get_ext ( ) ,
13351342 } ;
13361343
1337- let orderings = self . try_create_output_ordering ( ) ?;
1344+ let orderings = self . try_create_output_ordering ( state . execution_props ( ) ) ?;
13381345 // It is sufficient to pass only one of the equivalent orderings:
13391346 let order_requirements = orderings. into_iter ( ) . next ( ) . map ( Into :: into) ;
13401347
@@ -1562,6 +1569,7 @@ mod tests {
15621569 SchemaAdapter , SchemaAdapterFactory , SchemaMapper ,
15631570 } ;
15641571 use datafusion_expr:: { BinaryExpr , LogicalPlanBuilder , Operator } ;
1572+ use datafusion_physical_expr:: expressions:: binary;
15651573 use datafusion_physical_expr:: PhysicalSortExpr ;
15661574 use datafusion_physical_plan:: { collect, ExecutionPlanProperties } ;
15671575 use rstest:: rstest;
@@ -1694,29 +1702,44 @@ mod tests {
16941702
16951703 use crate :: datasource:: file_format:: parquet:: ParquetFormat ;
16961704 use datafusion_physical_plan:: expressions:: col as physical_col;
1705+ use datafusion_physical_plan:: expressions:: lit as physical_lit;
16971706 use std:: ops:: Add ;
16981707
16991708 // (file_sort_order, expected_result)
17001709 let cases = vec ! [
1701- ( vec![ ] , Ok ( Vec :: <LexOrdering >:: new( ) ) ) ,
1710+ (
1711+ vec![ ] ,
1712+ Ok :: <Vec <LexOrdering >, DataFusionError >( Vec :: <LexOrdering >:: new( ) ) ,
1713+ ) ,
17021714 // sort expr, but non column
17031715 (
1704- vec![ vec![
1705- col( "int_col" ) . add( lit( 1 ) ) . sort( true , true ) ,
1706- ] ] ,
1707- Err ( "Expected single column reference in sort_order[0][0], got int_col + Int32(1)" ) ,
1716+ vec![ vec![ col( "int_col" ) . add( lit( 1 ) ) . sort( true , true ) ] ] ,
1717+ Ok ( vec![ [ PhysicalSortExpr {
1718+ expr: binary(
1719+ physical_col( "int_col" , & schema) . unwrap( ) ,
1720+ Operator :: Plus ,
1721+ physical_lit( 1 ) ,
1722+ & schema,
1723+ )
1724+ . unwrap( ) ,
1725+ options: SortOptions {
1726+ descending: false ,
1727+ nulls_first: true ,
1728+ } ,
1729+ } ]
1730+ . into( ) ] ) ,
17081731 ) ,
17091732 // ok with one column
17101733 (
17111734 vec![ vec![ col( "string_col" ) . sort( true , false ) ] ] ,
17121735 Ok ( vec![ [ PhysicalSortExpr {
1713- expr: physical_col( "string_col" , & schema) . unwrap( ) ,
1714- options: SortOptions {
1715- descending: false ,
1716- nulls_first: false ,
1717- } ,
1718- } ] . into ( ) ,
1719- ] )
1736+ expr: physical_col( "string_col" , & schema) . unwrap( ) ,
1737+ options: SortOptions {
1738+ descending: false ,
1739+ nulls_first: false ,
1740+ } ,
1741+ } ]
1742+ . into ( ) ] ) ,
17201743 ) ,
17211744 // ok with two columns, different options
17221745 (
@@ -1725,14 +1748,18 @@ mod tests {
17251748 col( "int_col" ) . sort( false , true ) ,
17261749 ] ] ,
17271750 Ok ( vec![ [
1728- PhysicalSortExpr :: new_default( physical_col( "string_col" , & schema) . unwrap( ) )
1729- . asc( )
1730- . nulls_last( ) ,
1731- PhysicalSortExpr :: new_default( physical_col( "int_col" , & schema) . unwrap( ) )
1732- . desc( )
1733- . nulls_first( )
1734- ] . into( ) ,
1735- ] )
1751+ PhysicalSortExpr :: new_default(
1752+ physical_col( "string_col" , & schema) . unwrap( ) ,
1753+ )
1754+ . asc( )
1755+ . nulls_last( ) ,
1756+ PhysicalSortExpr :: new_default(
1757+ physical_col( "int_col" , & schema) . unwrap( ) ,
1758+ )
1759+ . desc( )
1760+ . nulls_first( ) ,
1761+ ]
1762+ . into( ) ] ) ,
17361763 ) ,
17371764 ] ;
17381765
@@ -1745,7 +1772,8 @@ mod tests {
17451772
17461773 let table =
17471774 ListingTable :: try_new ( config. clone ( ) ) . expect ( "Creating the table" ) ;
1748- let ordering_result = table. try_create_output_ordering ( ) ;
1775+ let ordering_result =
1776+ table. try_create_output_ordering ( state. execution_props ( ) ) ;
17491777
17501778 match ( expected_result, ordering_result) {
17511779 ( Ok ( expected) , Ok ( result) ) => {
0 commit comments