Skip to content

Commit a6068c2

Browse files
timsauceralamb
andauthored
FFI_RecordBatchStream was causing a memory leak (#17190) (#17270)
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 374fcec commit a6068c2

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

datafusion/ffi/src/record_batch_stream.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ pub struct FFI_RecordBatchStream {
5757
/// Return the schema of the record batch
5858
pub schema: unsafe extern "C" fn(stream: &Self) -> WrappedSchema,
5959

60+
/// Release the memory of the private data when it is no longer being used.
61+
pub release: unsafe extern "C" fn(arg: &mut Self),
62+
6063
/// Internal data. This is only to be accessed by the provider of the plan.
6164
/// The foreign library should never attempt to access this data.
6265
pub private_data: *mut c_void,
@@ -82,6 +85,7 @@ impl FFI_RecordBatchStream {
8285
FFI_RecordBatchStream {
8386
poll_next: poll_next_fn_wrapper,
8487
schema: schema_fn_wrapper,
88+
release: release_fn_wrapper,
8589
private_data,
8690
}
8791
}
@@ -96,6 +100,12 @@ unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> Wrappe
96100
(*stream).schema().into()
97101
}
98102

103+
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_RecordBatchStream) {
104+
let private_data =
105+
Box::from_raw(provider.private_data as *mut RecordBatchStreamPrivateData);
106+
drop(private_data);
107+
}
108+
99109
fn record_batch_to_wrapped_array(
100110
record_batch: RecordBatch,
101111
) -> RResult<WrappedArray, RString> {
@@ -197,6 +207,12 @@ impl Stream for FFI_RecordBatchStream {
197207
}
198208
}
199209

210+
impl Drop for FFI_RecordBatchStream {
211+
fn drop(&mut self) {
212+
unsafe { (self.release)(self) }
213+
}
214+
}
215+
200216
#[cfg(test)]
201217
mod tests {
202218
use std::sync::Arc;

0 commit comments

Comments
 (0)