@@ -45,7 +45,7 @@ use std::{collections::HashMap, sync::Arc};
4545use arrow_array:: {
4646 Array , ArrayRef , RecordBatch , StringArray , TimestampMillisecondArray , UInt64Array ,
4747} ;
48- use arrow_schema:: { DataType , Field , Schema , TimeUnit } ;
48+ use arrow_schema:: { ArrowError , DataType , Field , Schema , TimeUnit } ;
4949use arrow_select:: take:: take;
5050use chrono:: { DateTime , Utc } ;
5151use itertools:: Itertools ;
@@ -145,41 +145,50 @@ pub fn add_parseable_fields(
145145 rb : RecordBatch ,
146146 p_timestamp : DateTime < Utc > ,
147147 p_custom_fields : & HashMap < String , String > ,
148- ) -> RecordBatch {
148+ ) -> Result < RecordBatch , ArrowError > {
149+ // Return Result for proper error handling
150+
151+ // Add custom fields in sorted order
152+ let mut sorted_keys: Vec < & String > = p_custom_fields. keys ( ) . collect ( ) ;
153+ sorted_keys. sort ( ) ;
154+
149155 let schema = rb. schema ( ) ;
150156 let row_count = rb. num_rows ( ) ;
151157
152- let mut fields: Vec < Field > = schema. fields ( ) . iter ( ) . map ( |f| f. as_ref ( ) . clone ( ) ) . collect ( ) ;
153- let mut columns: Vec < ArrayRef > = rb. columns ( ) . to_vec ( ) ;
154-
155- // Create and insert the p_timestamp field and array at index 0
156- let p_timestamp_array = Arc :: new ( TimestampMillisecondArray :: from_iter_values (
157- std:: iter:: repeat ( p_timestamp. timestamp_millis ( ) ) . take ( row_count) ,
158- ) ) ;
159- let p_timestamp_field = Field :: new (
160- DEFAULT_TIMESTAMP_KEY . to_string ( ) ,
161- DataType :: Timestamp ( TimeUnit :: Millisecond , None ) ,
162- false ,
158+ let mut fields = schema
159+ . fields ( )
160+ . iter ( )
161+ . map ( |f| f. as_ref ( ) . clone ( ) )
162+ . collect_vec ( ) ;
163+ fields. insert (
164+ 0 ,
165+ Field :: new (
166+ DEFAULT_TIMESTAMP_KEY ,
167+ DataType :: Timestamp ( TimeUnit :: Millisecond , None ) ,
168+ true ,
169+ ) ,
170+ ) ;
171+ fields. extend (
172+ sorted_keys
173+ . iter ( )
174+ . map ( |k| Field :: new ( * k, DataType :: Utf8 , true ) ) ,
163175 ) ;
164- fields. insert ( 0 , p_timestamp_field) ;
165- columns. insert ( 0 , p_timestamp_array) ;
166176
167- // Sort p_custom_fields by key and insert custom fields at the beginning, after the p_timestamp field
168- let mut sorted_keys: Vec < & String > = p_custom_fields. keys ( ) . collect ( ) ;
169- sorted_keys. sort ( ) ;
170- for key in sorted_keys. iter ( ) . rev ( ) {
171- let value = p_custom_fields. get ( * key) . unwrap ( ) ;
172- let string_array: ArrayRef = Arc :: new ( StringArray :: from_iter_values (
177+ let mut columns = rb. columns ( ) . iter ( ) . map ( Arc :: clone) . collect_vec ( ) ;
178+ columns. insert (
179+ 0 ,
180+ Arc :: new ( get_timestamp_array ( p_timestamp, row_count) ) as ArrayRef ,
181+ ) ;
182+ columns. extend ( sorted_keys. iter ( ) . map ( |k| {
183+ let value = p_custom_fields. get ( * k) . unwrap ( ) ;
184+ Arc :: new ( StringArray :: from_iter_values (
173185 std:: iter:: repeat ( value) . take ( row_count) ,
174- ) ) ;
175- columns. insert ( 1 , string_array) ;
176-
177- let new_field = Field :: new ( ( * key) . clone ( ) , DataType :: Utf8 , true ) ;
178- fields. insert ( 1 , new_field) ;
179- }
186+ ) ) as ArrayRef
187+ } ) ) ;
180188
189+ // Create the new schema and batch
181190 let new_schema = Arc :: new ( Schema :: new ( fields) ) ;
182- RecordBatch :: try_new ( new_schema, columns) . unwrap ( )
191+ RecordBatch :: try_new ( new_schema, columns)
183192}
184193
185194pub fn reverse ( rb : & RecordBatch ) -> RecordBatch {
0 commit comments