Skip to content

Commit db6ba4b

Browse files
moved from query handler to scheduled job
1 parent 001c478 commit db6ba4b

File tree

9 files changed

+91
-57
lines changed

9 files changed

+91
-57
lines changed

src/handlers/http/modal/ingest_server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ impl ParseableServer for IngestServer {
129129
let (cancel_tx, cancel_rx) = oneshot::channel();
130130
thread::spawn(|| sync::handler(cancel_rx));
131131

132+
// Initialize memory release scheduler
133+
crate::memory::init_memory_release_scheduler()?;
134+
132135
tokio::spawn(airplane::server());
133136

134137
// Ingestors shouldn't have to deal with OpenId auth flow

src/handlers/http/modal/query_server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ impl ParseableServer for QueryServer {
129129
analytics::init_analytics_scheduler()?;
130130
}
131131

132+
// Initialize memory release scheduler
133+
crate::memory::init_memory_release_scheduler()?;
134+
132135
if init_cluster_metrics_schedular().is_ok() {
133136
info!("Cluster metrics scheduler started successfully");
134137
}

src/handlers/http/modal/server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ impl ParseableServer for Server {
154154
analytics::init_analytics_scheduler()?;
155155
}
156156

157+
// Initialize memory release scheduler
158+
crate::memory::init_memory_release_scheduler()?;
159+
157160
tokio::spawn(handlers::livetail::server());
158161
tokio::spawn(handlers::airplane::server());
159162

src/handlers/http/query.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use crate::query::error::ExecuteError;
5151
use crate::query::{CountsRequest, Query as LogicalQuery, execute};
5252
use crate::query::{QUERY_SESSION, resolve_stream_names};
5353
use crate::rbac::Users;
54-
use crate::response::{QueryResponse, force_memory_release};
54+
use crate::response::QueryResponse;
5555
use crate::storage::ObjectStorageError;
5656
use crate::utils::actix::extract_session_key_from_req;
5757
use crate::utils::time::{TimeParseError, TimeRange};
@@ -245,8 +245,8 @@ async fn handle_non_streaming_query(
245245
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
246246
.json(response);
247247

248-
// Force memory release after HTTP response is fully created
249-
force_memory_release();
248+
// // Force memory release after HTTP response is fully created
249+
// force_memory_release();
250250

251251
Ok(http_response)
252252
}
@@ -346,8 +346,6 @@ fn create_batch_processor(
346346
let bytes_result = Bytes::from(format!("{response}\n"));
347347
drop(response); // Explicit cleanup
348348

349-
force_memory_release();
350-
351349
Ok(bytes_result)
352350
}
353351
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod event;
3030
pub mod handlers;
3131
pub mod hottier;
3232
mod livetail;
33+
pub mod memory;
3334
mod metadata;
3435
pub mod metastore;
3536
pub mod metrics;

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use tracing_subscriber::util::SubscriberInitExt;
3232
use tracing_subscriber::{EnvFilter, Registry, fmt};
3333

3434
// Use jemalloc as the global allocator
35+
#[cfg(not(target_env = "msvc"))]
3536
#[global_allocator]
3637
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
3738

src/memory.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::ffi::CString;
20+
use std::time::Duration;
21+
22+
use clokwerk::AsyncScheduler;
23+
use tracing::{info, warn};
24+
25+
/// Force memory release using jemalloc
26+
pub fn force_memory_release() {
27+
// Advance epoch to refresh statistics and trigger potential cleanup
28+
if let Err(e) = tikv_jemalloc_ctl::epoch::mib().and_then(|mib| mib.advance()) {
29+
warn!("Failed to advance jemalloc epoch: {:?}", e);
30+
}
31+
32+
// Purge each initialized arena
33+
if let Ok(n) = tikv_jemalloc_ctl::arenas::narenas::read() {
34+
for i in 0..n {
35+
if let Ok(name) = CString::new(format!("arena.{i}.purge")) {
36+
unsafe {
37+
let ret = tikv_jemalloc_sys::mallctl(
38+
name.as_ptr(),
39+
std::ptr::null_mut(),
40+
std::ptr::null_mut(),
41+
std::ptr::null_mut(),
42+
0,
43+
);
44+
if ret != 0 {
45+
warn!("Arena purge failed for index {i} with code: {ret}");
46+
}
47+
}
48+
}
49+
}
50+
} else {
51+
warn!("Failed to read jemalloc arenas.narenas");
52+
}
53+
}
54+
55+
/// Initialize memory management scheduler
56+
pub fn init_memory_release_scheduler() -> anyhow::Result<()> {
57+
info!("Setting up scheduler for memory release");
58+
59+
let mut scheduler = AsyncScheduler::new();
60+
scheduler
61+
.every(clokwerk::Interval::Hours(1))
62+
.run(move || async {
63+
info!("Running scheduled memory release");
64+
force_memory_release();
65+
});
66+
67+
tokio::spawn(async move {
68+
loop {
69+
scheduler.run_pending().await;
70+
tokio::time::sleep(Duration::from_secs(60)).await; // Check every minute
71+
}
72+
});
73+
74+
Ok(())
75+
}

src/response.rs

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,10 @@
1616
*
1717
*/
1818

19-
use std::ffi::CString;
20-
use std::sync::Mutex;
21-
use std::time::{Duration, Instant};
22-
2319
use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
2420
use datafusion::arrow::record_batch::RecordBatch;
2521
use serde_json::{Value, json};
26-
use tracing::{debug, info, warn};
22+
use tracing::info;
2723

2824
pub struct QueryResponse {
2925
pub records: Vec<RecordBatch>,
@@ -70,49 +66,3 @@ impl QueryResponse {
7066
Ok(response)
7167
}
7268
}
73-
74-
impl Drop for QueryResponse {
75-
fn drop(&mut self) {
76-
force_memory_release();
77-
}
78-
}
79-
80-
// Rate-limited memory release with proper error handling
81-
static LAST_PURGE: Mutex<Option<Instant>> = Mutex::new(None);
82-
const PURGE_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour
83-
pub fn force_memory_release() {
84-
{
85-
let mut last_purge = LAST_PURGE.lock().unwrap();
86-
if let Some(last) = *last_purge {
87-
if last.elapsed() < PURGE_INTERVAL {
88-
return;
89-
}
90-
}
91-
*last_purge = Some(Instant::now());
92-
}
93-
94-
// Advance epoch to refresh statistics and trigger potential cleanup
95-
if let Err(e) = tikv_jemalloc_ctl::epoch::mib().and_then(|mib| mib.advance()) {
96-
warn!("Failed to advance jemalloc epoch: {:?}", e);
97-
}
98-
99-
// Purge all arenas using MALLCTL_ARENAS_ALL
100-
if let Ok(arena_purge) = CString::new("arena.4096.purge") {
101-
unsafe {
102-
let ret = tikv_jemalloc_sys::mallctl(
103-
arena_purge.as_ptr(),
104-
std::ptr::null_mut(), // oldp (not reading)
105-
std::ptr::null_mut(), // oldlenp (not reading)
106-
std::ptr::null_mut(), // newp (void operation)
107-
0, // newlen (void operation)
108-
);
109-
if ret != 0 {
110-
warn!("Arena purge failed with code: {}", ret);
111-
} else {
112-
debug!("Successfully purged all jemalloc arenas");
113-
}
114-
}
115-
} else {
116-
warn!("Failed to create CString for arena purge");
117-
}
118-
}

src/utils/arrow/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String,
6565
// Use a cursor to avoid extra allocations during parsing
6666
let json_rows: Vec<Map<String, Value>> = {
6767
let cursor = std::io::Cursor::new(buf);
68-
serde_json::from_reader(cursor).unwrap_or_else(|_| Vec::with_capacity(0))
68+
serde_json::from_reader(cursor)?
6969
};
7070

7171
Ok(json_rows)

0 commit comments

Comments
 (0)