Commit 001c478

chore: release memory to the OS every hour

1 parent 2f2b324 commit 001c478

7 files changed (+155 −30 lines)

Cargo.lock

Lines changed: 34 additions & 0 deletions
Generated file; diff not rendered by default.

Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -28,6 +28,9 @@ parquet = "54.0.0"
 # Web server and HTTP-related
 actix-cors = "0.7.0"
 actix-web = { version = "4.9.0", features = ["rustls-0_22"] }
+tikv-jemalloc-ctl = "0.6.0"
+tikv-jemallocator = "0.6.0"
+tikv-jemalloc-sys = "0.6.1"
 actix-web-httpauth = "0.8"
 actix-web-prometheus = { version = "0.1" }
 actix-web-static-files = "4.0"

src/handlers/http/query.rs

Lines changed: 32 additions & 12 deletions
@@ -35,7 +35,6 @@ use futures::stream::once;
 use futures::{Stream, StreamExt, future};
 use futures_util::Future;
 use http::StatusCode;
-use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use serde_json::{Value, json};
 use std::collections::HashMap;
@@ -52,7 +51,7 @@ use crate::query::error::ExecuteError;
 use crate::query::{CountsRequest, Query as LogicalQuery, execute};
 use crate::query::{QUERY_SESSION, resolve_stream_names};
 use crate::rbac::Users;
-use crate::response::QueryResponse;
+use crate::response::{QueryResponse, force_memory_release};
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 use crate::utils::time::{TimeParseError, TimeRange};
@@ -241,9 +240,15 @@ async fn handle_non_streaming_query(
         with_fields: query_request.fields,
     }
     .to_json()?;
-    Ok(HttpResponse::Ok()
+
+    let http_response = HttpResponse::Ok()
         .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
-        .json(response))
+        .json(response);
+
+    // Force memory release after the HTTP response is fully created
+    force_memory_release();
+
+    Ok(http_response)
 }

 /// Handles streaming queries, returning results as newline-delimited JSON (NDJSON).
@@ -324,18 +329,26 @@ fn create_batch_processor(
 ) -> impl FnMut(Result<RecordBatch, QueryError>) -> Result<Bytes, actix_web::Error> {
     move |batch_result| match batch_result {
         Ok(batch) => {
-            let response = QueryResponse {
+            // Create the response and immediately process it to reduce memory retention
+            let query_response = QueryResponse {
                 records: vec![batch],
                 fields: Vec::new(),
                 fill_null: send_null,
                 with_fields: false,
-            }
-            .to_json()
-            .map_err(|e| {
+            };
+
+            let response = query_response.to_json().map_err(|e| {
                 error!("Failed to parse record batch into JSON: {}", e);
                 actix_web::error::ErrorInternalServerError(e)
             })?;
-            Ok(Bytes::from(format!("{response}\n")))
+
+            // Convert to bytes and explicitly drop the response object
+            let bytes_result = Bytes::from(format!("{response}\n"));
+            drop(response); // Explicit cleanup
+
+            force_memory_release();
+
+            Ok(bytes_result)
         }
         Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
     }
@@ -380,12 +393,19 @@ pub async fn get_counts(
     let (records, _) = get_records_and_fields(&query_request, &creds).await?;

     if let Some(records) = records {
-        let json_records = record_batches_to_json(&records)?;
-        let records = json_records.into_iter().map(Value::Object).collect_vec();
+        // Use optimized JSON conversion with explicit memory management
+        let json_records = {
+            let converted = record_batches_to_json(&records)?;
+            drop(records); // Explicitly drop the original records early
+            converted
+        };
+
+        let processed_records: Vec<Value> =
+            json_records.into_iter().map(Value::Object).collect();

         let res = json!({
             "fields": vec!["start_time", "endTime", "count"],
-            "records": records,
+            "records": processed_records,
         });

         return Ok(web::Json(res));
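
Note: the scoped block added in get_counts frees the Arrow buffers behind records before the serde_json values are post-processed, instead of holding both allocations until the function returns. A minimal standalone sketch of the same early-drop pattern (convert_owned is a hypothetical helper, not part of this commit):

    use datafusion::arrow::record_batch::RecordBatch;
    use serde_json::Value;

    use crate::utils::arrow::record_batches_to_json;

    // Hypothetical helper: take ownership, convert, then release the Arrow
    // data before the serde_json values are post-processed.
    fn convert_owned(records: Vec<RecordBatch>) -> anyhow::Result<Vec<Value>> {
        let json_records = {
            let converted = record_batches_to_json(&records)?;
            drop(records); // Arrow buffers are freed here, not at function end
            converted
        };
        Ok(json_records.into_iter().map(Value::Object).collect())
    }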

src/main.rs

Lines changed: 4 additions & 0 deletions
@@ -31,6 +31,10 @@ use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::{EnvFilter, Registry, fmt};

+// Use jemalloc as the global allocator
+#[global_allocator]
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
 #[actix_web::main]
 async fn main() -> anyhow::Result<()> {
     init_logger();
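
Note: of the three crates added in Cargo.toml, tikv-jemallocator supplies the Jemalloc type registered here, tikv-jemalloc-ctl provides safe wrappers over mallctl, and tikv-jemalloc-sys exposes the raw FFI used for the arena purge in src/response.rs. A minimal sketch for confirming the allocator is active by reading its statistics (illustrative only, not part of this commit):

    use tikv_jemalloc_ctl::{epoch, stats};

    fn log_jemalloc_stats() {
        // jemalloc statistics are cached; advance the epoch to refresh them.
        epoch::advance().expect("advance jemalloc epoch");
        let allocated = stats::allocated::read().expect("read stats.allocated");
        let resident = stats::resident::read().expect("read stats.resident");
        println!("jemalloc: {allocated} bytes allocated, {resident} bytes resident");
    }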

src/metastore/metastores/object_store_metastore.rs

Lines changed: 1 addition & 4 deletions
@@ -110,10 +110,7 @@ impl Metastore for ObjectStoreMetastore {
     /// Delete an overview
     async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError> {
         let path = RelativePathBuf::from_iter([stream, "overview"]);
-        Ok(self
-            .storage
-            .delete_object(&path)
-            .await?)
+        Ok(self.storage.delete_object(&path).await?)
     }

     /// This function fetches all the keystones from the underlying object store

src/response.rs

Lines changed: 70 additions & 11 deletions
@@ -16,11 +16,14 @@
 *
 */

+use std::ffi::CString;
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
 use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
 use datafusion::arrow::record_batch::RecordBatch;
-use itertools::Itertools;
 use serde_json::{Value, json};
-use tracing::info;
+use tracing::{debug, info, warn};

 pub struct QueryResponse {
     pub records: Vec<RecordBatch>,
@@ -32,28 +35,84 @@ pub struct QueryResponse {
 impl QueryResponse {
     pub fn to_json(&self) -> Result<Value, QueryError> {
         info!("{}", "Returning query results");
-        let mut json_records = record_batches_to_json(&self.records)?;

-        if self.fill_null {
-            for map in &mut json_records {
-                for field in &self.fields {
-                    if !map.contains_key(field) {
-                        map.insert(field.clone(), Value::Null);
+        // Process in batches to avoid massive allocations
+        const BATCH_SIZE: usize = 100; // Process 100 record batches at a time
+        let mut all_values = Vec::new();
+
+        for chunk in self.records.chunks(BATCH_SIZE) {
+            let mut json_records = record_batches_to_json(chunk)?;
+
+            if self.fill_null {
+                for map in &mut json_records {
+                    for field in &self.fields {
+                        if !map.contains_key(field) {
+                            map.insert(field.clone(), Value::Null);
+                        }
                     }
                 }
             }
+
+            // Convert this batch to values and add them to the collection
+            let batch_values: Vec<Value> = json_records.into_iter().map(Value::Object).collect();
+            all_values.extend(batch_values);
         }
-        let values = json_records.into_iter().map(Value::Object).collect_vec();

         let response = if self.with_fields {
             json!({
                 "fields": self.fields,
-                "records": values,
+                "records": all_values,
             })
         } else {
-            Value::Array(values)
+            Value::Array(all_values)
         };

         Ok(response)
     }
 }
+
+impl Drop for QueryResponse {
+    fn drop(&mut self) {
+        force_memory_release();
+    }
+}
+
+// Rate-limited memory release with proper error handling
+static LAST_PURGE: Mutex<Option<Instant>> = Mutex::new(None);
+const PURGE_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour
+pub fn force_memory_release() {
+    {
+        let mut last_purge = LAST_PURGE.lock().unwrap();
+        if let Some(last) = *last_purge {
+            if last.elapsed() < PURGE_INTERVAL {
+                return;
+            }
+        }
+        *last_purge = Some(Instant::now());
+    }
+
+    // Advance the epoch to refresh statistics and trigger potential cleanup
+    if let Err(e) = tikv_jemalloc_ctl::epoch::mib().and_then(|mib| mib.advance()) {
+        warn!("Failed to advance jemalloc epoch: {:?}", e);
+    }
+
+    // Purge all arenas using MALLCTL_ARENAS_ALL
+    if let Ok(arena_purge) = CString::new("arena.4096.purge") {
+        unsafe {
+            let ret = tikv_jemalloc_sys::mallctl(
                arena_purge.as_ptr(),
                std::ptr::null_mut(), // oldp (not reading)
                std::ptr::null_mut(), // oldlenp (not reading)
                std::ptr::null_mut(), // newp (void operation)
                0,                    // newlen (void operation)
            );
+            if ret != 0 {
+                warn!("Arena purge failed with code: {}", ret);
+            } else {
+                debug!("Successfully purged all jemalloc arenas");
+            }
+        }
+    } else {
+        warn!("Failed to create CString for arena purge");
+    }
+}
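
Note: arena index 4096 in "arena.4096.purge" is jemalloc's MALLCTL_ARENAS_ALL sentinel, so the single mallctl call asks every arena to return its dirty pages to the OS. A hedged sketch for observing the effect through resident-memory statistics (assumes the hourly rate limiter has not already fired; not part of this commit):

    use tikv_jemalloc_ctl::{epoch, stats};

    fn resident_bytes() -> usize {
        // Statistics are cached; refresh them before reading.
        epoch::advance().expect("advance jemalloc epoch");
        stats::resident::read().expect("read stats.resident")
    }

    fn demo_purge() {
        let before = resident_bytes();
        force_memory_release(); // returns early if called again within PURGE_INTERVAL
        let after = resident_bytes();
        println!("resident: {before} -> {after} bytes");
    }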

src/utils/arrow/mod.rs

Lines changed: 11 additions & 3 deletions
@@ -48,7 +48,12 @@ use crate::event::DEFAULT_TIMESTAMP_KEY;
 ///
 /// A vector of JSON objects representing the record batches.
 pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
-    let buf = vec![];
+    // Early return for empty records to avoid unnecessary allocations
+    if records.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    let buf = Vec::with_capacity(records.len() * 1024); // Pre-allocate a reasonable capacity
     let mut writer = arrow_json::ArrayWriter::new(buf);
     for record in records {
         writer.write(record)?;
@@ -57,8 +62,11 @@ pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String,

     let buf = writer.into_inner();

-    let json_rows: Vec<Map<String, Value>> =
-        serde_json::from_reader(buf.as_slice()).unwrap_or_default();
+    // Use a cursor to avoid extra allocations during parsing
+    let json_rows: Vec<Map<String, Value>> = {
+        let cursor = std::io::Cursor::new(buf);
+        serde_json::from_reader(cursor).unwrap_or_else(|_| Vec::with_capacity(0))
+    };

     Ok(json_rows)
 }
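
Note: callers see no behavioral change. A minimal usage sketch (the column name and values are illustrative only, not from this commit):

    use std::sync::Arc;

    use datafusion::arrow::array::{ArrayRef, Int64Array};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;

    fn example() -> anyhow::Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("count", DataType::Int64, false)]));
        let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema, vec![column])?;

        let rows = record_batches_to_json(&[batch])?;
        assert_eq!(rows.len(), 3); // one JSON map per row, e.g. {"count": 1}
        Ok(())
    }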
