@@ -24,12 +24,13 @@ use std::{
2424 path:: { Path , PathBuf } ,
2525 process,
2626 sync:: { Arc , Mutex , RwLock } ,
27+ time:: { SystemTime , UNIX_EPOCH } ,
2728} ;
2829
2930use arrow_array:: RecordBatch ;
3031use arrow_ipc:: writer:: StreamWriter ;
3132use arrow_schema:: { Field , Fields , Schema } ;
32- use chrono:: { NaiveDateTime , Timelike , Utc } ;
33+ use chrono:: { NaiveDateTime , Timelike } ;
3334use derive_more:: { Deref , DerefMut } ;
3435use itertools:: Itertools ;
3536use parquet:: {
@@ -72,6 +73,14 @@ const ARROW_FILE_EXTENSION: &str = "data.arrows";
7273
7374pub type StreamRef = Arc < Stream > ;
7475
76+ /// Gets the unix timestamp for the minute as described by the `SystemTime`
77+ fn minute_from_system_time ( time : SystemTime ) -> u128 {
78+ time. duration_since ( UNIX_EPOCH )
79+ . expect ( "Legitimate time" )
80+ . as_millis ( )
81+ / 60000
82+ }
83+
7584/// All state associated with a single logstream in Parseable.
7685pub struct Stream {
7786 pub stream_name : String ,
@@ -156,8 +165,7 @@ impl Stream {
156165 hostname. push_str ( id) ;
157166 }
158167 let filename = format ! (
159- "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}" ,
160- Utc :: now( ) . format( "%Y%m%dT%H%M" ) ,
168+ "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}" ,
161169 parsed_timestamp. date( ) ,
162170 parsed_timestamp. hour( ) ,
163171 minute_to_slot( parsed_timestamp. minute( ) , OBJECT_STORE_DATA_GRANULARITY ) . unwrap( ) ,
@@ -192,7 +200,7 @@ impl Stream {
192200 /// Only includes ones starting from the previous minute
193201 pub fn arrow_files_grouped_exclude_time (
194202 & self ,
195- exclude : NaiveDateTime ,
203+ exclude : SystemTime ,
196204 shutdown_signal : bool ,
197205 ) -> HashMap < PathBuf , Vec < PathBuf > > {
198206 let mut grouped_arrow_file: HashMap < PathBuf , Vec < PathBuf > > = HashMap :: new ( ) ;
@@ -202,12 +210,13 @@ impl Stream {
202210 // don't keep the ones for the current minute
203211 if !shutdown_signal {
204212 arrow_files. retain ( |path| {
205- !path
206- . file_name ( )
207- . unwrap ( )
208- . to_str ( )
209- . unwrap ( )
210- . starts_with ( & exclude. format ( "%Y%m%dT%H%M" ) . to_string ( ) )
213+ let creation = path
214+ . metadata ( )
215+ . expect ( "Arrow file should exist on disk" )
216+ . created ( )
217+ . expect ( "Creation time should be accessible" ) ;
218+ // Compare if creation time is actually from previous minute
219+ minute_from_system_time ( creation) < minute_from_system_time ( exclude)
211220 } ) ;
212221 }
213222
@@ -429,8 +438,8 @@ impl Stream {
429438 ) -> Result < Option < Schema > , StagingError > {
430439 let mut schemas = Vec :: new ( ) ;
431440
432- let time = chrono :: Utc :: now ( ) . naive_utc ( ) ;
433- let staging_files = self . arrow_files_grouped_exclude_time ( time , shutdown_signal) ;
441+ let now = SystemTime :: now ( ) ;
442+ let staging_files = self . arrow_files_grouped_exclude_time ( now , shutdown_signal) ;
434443 if staging_files. is_empty ( ) {
435444 metrics:: STAGING_FILES
436445 . with_label_values ( & [ & self . stream_name ] )
@@ -757,7 +766,7 @@ mod tests {
757766
758767 use arrow_array:: { Int32Array , StringArray , TimestampMillisecondArray } ;
759768 use arrow_schema:: { DataType , Field , TimeUnit } ;
760- use chrono:: { NaiveDate , TimeDelta } ;
769+ use chrono:: { NaiveDate , TimeDelta , Utc } ;
761770 use temp_dir:: TempDir ;
762771 use tokio:: time:: sleep;
763772
@@ -874,8 +883,7 @@ mod tests {
874883 ) ;
875884
876885 let expected_path = staging. data_path . join ( format ! (
877- "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}" ,
878- Utc :: now( ) . format( "%Y%m%dT%H%M" ) ,
886+ "{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}" ,
879887 parsed_timestamp. date( ) ,
880888 parsed_timestamp. hour( ) ,
881889 minute_to_slot( parsed_timestamp. minute( ) , OBJECT_STORE_DATA_GRANULARITY ) . unwrap( ) ,
@@ -909,8 +917,7 @@ mod tests {
909917 ) ;
910918
911919 let expected_path = staging. data_path . join ( format ! (
912- "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}" ,
913- Utc :: now( ) . format( "%Y%m%dT%H%M" ) ,
920+ "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}" ,
914921 parsed_timestamp. date( ) ,
915922 parsed_timestamp. hour( ) ,
916923 minute_to_slot( parsed_timestamp. minute( ) , OBJECT_STORE_DATA_GRANULARITY ) . unwrap( ) ,
0 commit comments