@@ -280,18 +280,7 @@ impl EventFormat for Event {
280280 _ => p_timestamp. naive_utc ( ) ,
281281 } ;
282282
283- let prefix = format ! (
284- "{}.{}.minute={}.{}" ,
285- get_schema_key( & schema) ,
286- parsed_timestamp. format( "date=%Y-%m-%d.hour=%H" ) ,
287- Minute :: from( parsed_timestamp) . to_slot( OBJECT_STORE_DATA_GRANULARITY ) ,
288- custom_partition_values
289- . iter( )
290- . sorted_by_key( |v| v. 0 )
291- . map( |( key, value) | format!( "{key}={value}" ) )
292- . join( "." )
293- ) ;
294-
283+ let prefix = generate_prefix ( & schema, parsed_timestamp, & custom_partition_values) ;
295284 if let Some ( JsonPartition { batch, .. } ) = json_partitions. get_mut ( & prefix) {
296285 batch. push ( json)
297286 } else {
@@ -338,6 +327,24 @@ impl EventFormat for Event {
338327 }
339328}
340329
330+ fn generate_prefix (
331+ schema : & [ Arc < Field > ] ,
332+ parsed_timestamp : NaiveDateTime ,
333+ custom_partition_values : & HashMap < String , String > ,
334+ ) -> String {
335+ format ! (
336+ "{}.{}.minute={}{}" ,
337+ get_schema_key( schema) ,
338+ parsed_timestamp. format( "date=%Y-%m-%d.hour=%H" ) ,
339+ Minute :: from( parsed_timestamp) . to_slot( OBJECT_STORE_DATA_GRANULARITY ) ,
340+ custom_partition_values
341+ . iter( )
342+ . sorted_by_key( |v| v. 0 )
343+ . map( |( key, value) | format!( ".{key}={value}" ) )
344+ . join( "" )
345+ )
346+ }
347+
341348/// Extracts custom partition values from provided JSON object
342349/// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}`
343350pub fn extract_custom_partition_values (
@@ -463,6 +470,7 @@ mod tests {
463470
464471 use arrow:: datatypes:: Int64Type ;
465472 use arrow_array:: { ArrayRef , Float64Array , Int64Array , ListArray , StringArray } ;
473+ use chrono:: Timelike ;
466474 use serde_json:: json;
467475
468476 use super :: * ;
@@ -976,4 +984,51 @@ mod tests {
976984 & Float64Array :: from( vec![ None , None , None , Some ( 2.0 ) ] )
977985 ) ;
978986 }
987+
988+ #[ test]
989+ fn generate_correct_prefix_with_current_time_and_no_custom_partitioning ( ) {
990+ let schema = vec ! [ ] ;
991+ let parsed_timestamp = NaiveDate :: from_ymd_opt ( 2023 , 10 , 1 )
992+ . unwrap ( )
993+ . and_hms_opt ( 12 , 30 , 0 )
994+ . unwrap ( ) ;
995+ let custom_partition_values = HashMap :: new ( ) ;
996+
997+ let expected = format ! (
998+ "{}.date={}.hour={:02}.minute={}" ,
999+ get_schema_key( & schema) ,
1000+ parsed_timestamp. date( ) ,
1001+ parsed_timestamp. hour( ) ,
1002+ Minute :: from( parsed_timestamp) . to_slot( OBJECT_STORE_DATA_GRANULARITY ) ,
1003+ ) ;
1004+
1005+ let generated = generate_prefix ( & schema, parsed_timestamp, & custom_partition_values) ;
1006+
1007+ assert_eq ! ( generated, expected) ;
1008+ }
1009+
1010+ #[ test]
1011+ fn generate_correct_prefix_with_current_time_and_custom_partitioning ( ) {
1012+ let schema = vec ! [ ] ;
1013+ let parsed_timestamp = NaiveDate :: from_ymd_opt ( 2023 , 10 , 1 )
1014+ . unwrap ( )
1015+ . and_hms_opt ( 12 , 30 , 0 )
1016+ . unwrap ( ) ;
1017+ let custom_partition_values = HashMap :: from_iter ( [
1018+ ( "key1" . to_string ( ) , "value1" . to_string ( ) ) ,
1019+ ( "key2" . to_string ( ) , "value2" . to_string ( ) ) ,
1020+ ] ) ;
1021+
1022+ let expected = format ! (
1023+ "{}.date={}.hour={:02}.minute={}.key1=value1.key2=value2" ,
1024+ get_schema_key( & schema) ,
1025+ parsed_timestamp. date( ) ,
1026+ parsed_timestamp. hour( ) ,
1027+ Minute :: from( parsed_timestamp) . to_slot( OBJECT_STORE_DATA_GRANULARITY ) ,
1028+ ) ;
1029+
1030+ let generated = generate_prefix ( & schema, parsed_timestamp, & custom_partition_values) ;
1031+
1032+ assert_eq ! ( generated, expected) ;
1033+ }
9791034}
0 commit comments