@@ -27,7 +27,7 @@ use crate::{
2727use arrow_array:: RecordBatch ;
2828use arrow_schema:: { Schema , SchemaRef , SortOptions } ;
2929use bytes:: Bytes ;
30- use chrono:: { DateTime , NaiveDateTime , Timelike , Utc } ;
30+ use chrono:: { DateTime , NaiveDateTime , TimeDelta , Timelike , Utc } ;
3131use datafusion:: catalog:: Session ;
3232use datafusion:: common:: stats:: Precision ;
3333use datafusion:: logical_expr:: utils:: conjunction;
@@ -442,7 +442,7 @@ impl TableProvider for StandardTableProvider {
442442 return Err ( DataFusionError :: Plan ( "potentially unbounded query on time range. Table scanning requires atleast one time bound" . to_string ( ) ) ) ;
443443 }
444444
445- if include_now ( filters , & time_partition ) {
445+ if is_within_staging_window ( & time_filters ) {
446446 if let Ok ( staging) = PARSEABLE . get_stream ( & self . stream ) {
447447 let records = staging. recordbatches_cloned ( & self . schema ) ;
448448 let reversed_mem_table = reversed_mem_table ( records, self . schema . clone ( ) ) ?;
@@ -730,23 +730,21 @@ fn return_listing_time_filters(
730730 }
731731}
732732
733- pub fn include_now ( filters : & [ Expr ] , time_partition : & Option < String > ) -> bool {
734- let current_minute = Utc :: now ( )
733+ /// We should consider data in staging for queries concerning a time period,
734+ /// ending within 5 minutes from now. e.g. If current time is 5
735+ pub fn is_within_staging_window ( time_filters : & [ PartialTimeFilter ] ) -> bool {
736+ let five_minutes_back = ( Utc :: now ( ) - TimeDelta :: minutes ( 5 ) )
735737 . with_second ( 0 )
736738 . and_then ( |x| x. with_nanosecond ( 0 ) )
737739 . expect ( "zeroed value is valid" )
738740 . naive_utc ( ) ;
739741
740- let time_filters = extract_primary_filter ( filters, time_partition) ;
741-
742- let upper_bound_matches = time_filters. iter ( ) . any ( |filter| match filter {
742+ if time_filters. iter ( ) . any ( |filter| match filter {
743743 PartialTimeFilter :: High ( Bound :: Excluded ( time) )
744744 | PartialTimeFilter :: High ( Bound :: Included ( time) )
745- | PartialTimeFilter :: Eq ( time) => time > & current_minute ,
745+ | PartialTimeFilter :: Eq ( time) => time >= & five_minutes_back ,
746746 _ => false ,
747- } ) ;
748-
749- if upper_bound_matches {
747+ } ) {
750748 return true ;
751749 }
752750
@@ -828,7 +826,7 @@ pub async fn collect_manifest_files(
828826}
829827
830828// Extract start time and end time from filter predicate
831- fn extract_primary_filter (
829+ pub fn extract_primary_filter (
832830 filters : & [ Expr ] ,
833831 time_partition : & Option < String > ,
834832) -> Vec < PartialTimeFilter > {
0 commit comments