-
-
Notifications
You must be signed in to change notification settings - Fork 157
feat: merge finish .arrows and convert to .parquet
#1200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
6f322fc
63382b2
e3121ea
281a1a4
4a2a659
e4ca566
8219e43
2b3b2d4
0c789eb
71d643d
00b5b71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -20,14 +20,14 @@ use chrono::{TimeDelta, Timelike}; | |||||||||
| use std::future::Future; | ||||||||||
| use std::panic::AssertUnwindSafe; | ||||||||||
| use tokio::sync::oneshot; | ||||||||||
| use tokio::task::JoinSet; | ||||||||||
| use tokio::time::{interval_at, sleep, Duration, Instant}; | ||||||||||
| use tokio::{select, task}; | ||||||||||
| use tracing::{error, info, trace, warn}; | ||||||||||
|
|
||||||||||
| use crate::alerts::{alerts_utils, AlertConfig, AlertError}; | ||||||||||
| use crate::parseable::PARSEABLE; | ||||||||||
| use crate::storage::LOCAL_SYNC_INTERVAL; | ||||||||||
| use crate::{STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL}; | ||||||||||
| use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL}; | ||||||||||
|
|
||||||||||
| // Calculates the instant that is the start of the next minute | ||||||||||
| fn next_minute() -> Instant { | ||||||||||
|
|
@@ -74,28 +74,22 @@ where | |||||||||
|
|
||||||||||
| /// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every | ||||||||||
| /// `STORAGE_CONVERSION_INTERVAL` secondsand uploads them every `STORAGE_UPLOAD_INTERVAL` seconds. | ||||||||||
| #[tokio::main(flavor = "current_thread")] | ||||||||||
| #[tokio::main(flavor = "multi_thread", worker_threads = 2)] | ||||||||||
| pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> { | ||||||||||
| let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync(); | ||||||||||
| let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync(); | ||||||||||
| let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = | ||||||||||
| object_store_sync(); | ||||||||||
| let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) = | ||||||||||
| arrow_conversion(); | ||||||||||
| loop { | ||||||||||
| select! { | ||||||||||
| _ = &mut cancel_rx => { | ||||||||||
| // actix server finished .. stop other threads and stop the server | ||||||||||
| remote_sync_inbox.send(()).unwrap_or(()); | ||||||||||
| localsync_inbox.send(()).unwrap_or(()); | ||||||||||
| remote_conversion_inbox.send(()).unwrap_or(()); | ||||||||||
| if let Err(e) = localsync_handler.await { | ||||||||||
| error!("Error joining remote_sync_handler: {:?}", e); | ||||||||||
| error!("Error joining remote_sync_handler: {e:?}"); | ||||||||||
| } | ||||||||||
|
||||||||||
| error!("Error joining remote_sync_handler: {e:?}"); | |
| } | |
| error!("Error joining localsync_handler: {e:?}"); | |
| } |
Uh oh!
There was an error while loading. Please reload this page.