1616 *
1717 */
1818
19- use crate :: catalog:: manifest:: File ;
20- use crate :: hottier:: HotTierManager ;
21- use crate :: option:: Mode ;
22- use crate :: parseable:: STREAM_EXISTS ;
23- use crate :: {
24- catalog:: snapshot:: { self , Snapshot } ,
25- storage:: { ObjectStoreFormat , STREAM_ROOT_DIRECTORY } ,
26- } ;
19+ use std:: any:: Any ;
20+ use std:: collections:: HashMap ;
21+ use std:: ops:: Bound ;
22+ use std:: os:: unix:: fs:: MetadataExt ;
23+ use std:: sync:: Arc ;
24+
2725use arrow_array:: RecordBatch ;
2826use arrow_schema:: { Schema , SchemaRef , SortOptions } ;
2927use bytes:: Bytes ;
3028use chrono:: { DateTime , NaiveDateTime , TimeDelta , Timelike , Utc } ;
31- use datafusion:: catalog:: Session ;
32- use datafusion:: common:: stats:: Precision ;
33- use datafusion:: logical_expr:: utils:: conjunction;
34- use datafusion:: physical_expr:: LexOrdering ;
3529use datafusion:: {
36- catalog:: SchemaProvider ,
30+ catalog:: { SchemaProvider , Session } ,
3731 common:: {
32+ stats:: Precision ,
3833 tree_node:: { TreeNode , TreeNodeRecursion } ,
3934 ToDFSchema ,
4035 } ,
@@ -46,28 +41,32 @@ use datafusion::{
4641 } ,
4742 error:: { DataFusionError , Result as DataFusionResult } ,
4843 execution:: { context:: SessionState , object_store:: ObjectStoreUrl } ,
49- logical_expr:: { BinaryExpr , Operator , TableProviderFilterPushDown , TableType } ,
50- physical_expr:: { create_physical_expr, PhysicalSortExpr } ,
44+ logical_expr:: {
45+ utils:: conjunction, BinaryExpr , Operator , TableProviderFilterPushDown , TableType ,
46+ } ,
47+ physical_expr:: { create_physical_expr, LexOrdering , PhysicalSortExpr } ,
5148 physical_plan:: { self , empty:: EmptyExec , union:: UnionExec , ExecutionPlan , Statistics } ,
5249 prelude:: Expr ,
5350 scalar:: ScalarValue ,
5451} ;
55-
5652use futures_util:: { stream:: FuturesOrdered , StreamExt , TryFutureExt , TryStreamExt } ;
5753use itertools:: Itertools ;
5854use object_store:: { path:: Path , ObjectStore } ;
5955use relative_path:: RelativePathBuf ;
60- use std:: { any:: Any , collections:: HashMap , ops:: Bound , sync:: Arc } ;
6156use url:: Url ;
6257
6358use crate :: {
6459 catalog:: {
65- self , column:: TypedStatistics , manifest:: Manifest , snapshot:: ManifestItem , ManifestFile ,
60+ self , column:: TypedStatistics , manifest:: File , manifest:: Manifest , snapshot:: ManifestItem ,
61+ snapshot:: Snapshot , ManifestFile ,
6662 } ,
6763 event:: DEFAULT_TIMESTAMP_KEY ,
64+ hottier:: HotTierManager ,
6865 metrics:: QUERY_CACHE_HIT ,
66+ option:: Mode ,
6967 parseable:: PARSEABLE ,
70- storage:: ObjectStorage ,
68+ parseable:: STREAM_EXISTS ,
69+ storage:: { ObjectStorage , ObjectStoreFormat , STREAM_ROOT_DIRECTORY } ,
7170} ;
7271
7372use super :: listing_table_builder:: ListingTableBuilder ;
@@ -223,6 +222,50 @@ impl StandardTableProvider {
223222 Ok ( ( ) )
224223 }
225224
225+ async fn get_staging_execution_plan (
226+ & self ,
227+ execution_plans : & mut Vec < Arc < dyn ExecutionPlan > > ,
228+ projection : Option < & Vec < usize > > ,
229+ filters : & [ Expr ] ,
230+ limit : Option < usize > ,
231+ state : & dyn Session ,
232+ time_partition : Option < & String > ,
233+ ) -> Result < ( ) , DataFusionError > {
234+ let Ok ( staging) = PARSEABLE . get_stream ( & self . stream ) else {
235+ return Ok ( ( ) ) ;
236+ } ;
237+ let records = staging. recordbatches_cloned ( & self . schema ) ;
238+ let reversed_mem_table = reversed_mem_table ( records, self . schema . clone ( ) ) ?;
239+
240+ let memory_exec = reversed_mem_table
241+ . scan ( state, projection, filters, limit)
242+ . await ?;
243+ execution_plans. push ( memory_exec) ;
244+
245+ let target_partition = num_cpus:: get ( ) ;
246+ let mut partitioned_files = Vec :: from_iter ( ( 0 ..target_partition) . map ( |_| Vec :: new ( ) ) ) ;
247+ for ( index, file_path) in staging. parquet_files ( ) . into_iter ( ) . enumerate ( ) {
248+ let Ok ( file_meta) = file_path. metadata ( ) else {
249+ continue ;
250+ } ;
251+ let file = PartitionedFile :: new ( file_path. display ( ) . to_string ( ) , file_meta. size ( ) ) ;
252+ partitioned_files[ index % target_partition] . push ( file)
253+ }
254+
255+ self . create_parquet_physical_plan (
256+ execution_plans,
257+ ObjectStoreUrl :: parse ( "file:///" ) . unwrap ( ) ,
258+ partitioned_files,
259+ Statistics :: new_unknown ( & self . schema ) ,
260+ projection,
261+ filters,
262+ limit,
263+ state,
264+ time_partition. cloned ( ) ,
265+ )
266+ . await
267+ }
268+
226269 #[ allow( clippy:: too_many_arguments) ]
227270 async fn legacy_listing_table (
228271 & self ,
@@ -443,17 +486,17 @@ impl TableProvider for StandardTableProvider {
443486 }
444487
445488 if is_within_staging_window ( & time_filters) {
446- if let Ok ( staging ) = PARSEABLE . get_stream ( & self . stream ) {
447- let records = staging . recordbatches_cloned ( & self . schema ) ;
448- let reversed_mem_table = reversed_mem_table ( records , self . schema . clone ( ) ) ? ;
449-
450- let memory_exec = reversed_mem_table
451- . scan ( state, projection , filters , limit )
452- . await ? ;
453- execution_plans . push ( memory_exec ) ;
454- }
489+ self . get_staging_execution_plan (
490+ & mut execution_plans ,
491+ projection ,
492+ filters ,
493+ limit ,
494+ state,
495+ time_partition . as_ref ( ) ,
496+ )
497+ . await ? ;
455498 } ;
456- let mut merged_snapshot: snapshot :: Snapshot = Snapshot :: default ( ) ;
499+ let mut merged_snapshot = Snapshot :: default ( ) ;
457500 if PARSEABLE . options . mode == Mode :: Query {
458501 let path = RelativePathBuf :: from_iter ( [ & self . stream , STREAM_ROOT_DIRECTORY ] ) ;
459502 let obs = glob_storage
0 commit comments