@@ -20,6 +20,7 @@ use chrono::{TimeDelta, Timelike};
2020use std:: future:: Future ;
2121use std:: panic:: AssertUnwindSafe ;
2222use tokio:: sync:: oneshot;
23+ use tokio:: task:: JoinSet ;
2324use tokio:: time:: { interval_at, sleep, Duration , Instant } ;
2425use tokio:: { select, task} ;
2526use tracing:: { error, info, trace, warn} ;
@@ -183,23 +184,18 @@ pub fn local_sync() -> (
183184
184185 let result = std:: panic:: catch_unwind ( AssertUnwindSafe ( || async move {
185186 let mut sync_interval = interval_at ( next_minute ( ) , LOCAL_SYNC_INTERVAL ) ;
187+ let mut joinset = JoinSet :: new ( ) ;
186188
187189 loop {
188190 select ! {
191+ // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds
189192 _ = sync_interval. tick( ) => {
190- trace!( "Flushing Arrows to disk..." ) ;
191- PARSEABLE . flush_all_streams( ) ;
192-
193- trace!( "Converting Arrow to Parquet... " ) ;
194- if let Err ( e) = monitor_task_duration(
195- "arrow_conversion" ,
196- Duration :: from_secs( 30 ) ,
197- || async { PARSEABLE . streams. prepare_parquet( false ) } ,
198- ) . await
199- {
200- warn!( "failed to convert local arrow data to parquet. {e:?}" ) ;
201- }
193+ joinset. spawn( flush_and_convert( ) ) ;
202194 } ,
195+ // Joins and logs errors in spawned tasks
196+ Some ( Err ( e) ) = joinset. join_next( ) , if !joinset. is_empty( ) => {
197+ error!( "Issue joining flush+conversion: {e}" )
198+ }
203199 res = & mut inbox_rx => { match res{
204200 Ok ( _) => break ,
205201 Err ( _) => {
@@ -273,3 +269,18 @@ pub fn schedule_alert_task(
273269 } ) ;
274270 Ok ( handle)
275271}
272+
273+ /// Asynchronously flushes all streams when called, then compacts them into parquet files ready to be pushed onto objectstore
274+ async fn flush_and_convert ( ) {
275+ trace ! ( "Flushing Arrows to disk..." ) ;
276+ PARSEABLE . flush_all_streams ( ) ;
277+
278+ trace ! ( "Converting Arrow to Parquet... " ) ;
279+ if let Err ( e) = monitor_task_duration ( "arrow_conversion" , Duration :: from_secs ( 30 ) , || async {
280+ PARSEABLE . streams . prepare_parquet ( false )
281+ } )
282+ . await
283+ {
284+ warn ! ( "failed to convert local arrow data to parquet. {e:?}" ) ;
285+ }
286+ }
0 commit comments