Skip to content

Commit 03d31b6

Browse files
authored
Merge pull request #375 from geovie/feature/threadsafe_callback
Add ThreadSafeCallback helper to schedule callbacks on the main thread
2 parents 24a4ea1 + 4f3b1f2 commit 03d31b6

File tree

16 files changed

+421
-1
lines changed

16 files changed

+421
-1
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ default = ["legacy-runtime"]
3131
# error message formatting from version to version, so they're disabled by default.
3232
enable-static-tests = []
3333

34+
# Enable the EventHandler API of RFC 25.
35+
event-handler-api = []
36+
3437
# Enable the default panic hook. Useful for debugging neon itself.
3538
default-panic-hook = []
3639

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
//! Facilities for running a callback in the libuv main thread.
2+
3+
/// Creates a new event handler which can be used to execute a callback in the libuv main thread
4+
pub use neon_sys::Neon_EventHandler_New as new;
5+
/// Executes the given callback in the libuv main thread
6+
pub use neon_sys::Neon_EventHandler_Schedule as schedule;
7+
// Free the thread safe callback and any memory hold
8+
pub use neon_sys::Neon_EventHandler_Delete as delete;

crates/neon-runtime/src/nan/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ pub mod fun;
1515
pub mod convert;
1616
pub mod class;
1717
pub mod task;
18+
pub mod handler;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use raw::Local;
2+
use std::os::raw::c_void;
3+
4+
pub unsafe extern "C" fn new(_isolate: *mut c_void, _this: Local, _callback: Local) -> *mut c_void { unimplemented!() }
5+
pub unsafe extern "C" fn schedule(_thread_safe_cb: *mut c_void, _rust_callback: *mut c_void,
6+
_complete: unsafe extern fn(Local, Local, *mut c_void)) { unimplemented!() }
7+
pub unsafe extern "C" fn delete(_thread_safe_cb: *mut c_void) { unimplemented!() }

crates/neon-runtime/src/napi/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ pub mod scope;
1414
pub mod string;
1515
pub mod tag;
1616
pub mod task;
17+
pub mod handler;

crates/neon-sys/native/src/neon.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "neon_string.h"
99
#include "neon_class_metadata.h"
1010
#include "neon_task.h"
11+
#include "neon_event.h"
1112

1213
extern "C" void Neon_Call_SetReturn(v8::FunctionCallbackInfo<v8::Value> *info, v8::Local<v8::Value> value) {
1314
info->GetReturnValue().Set(value);
@@ -520,3 +521,17 @@ extern "C" void Neon_Task_Schedule(void *task, Neon_TaskPerformCallback perform,
520521
neon::Task *internal_task = new neon::Task(isolate, task, perform, complete, callback);
521522
neon::queue_task(internal_task);
522523
}
524+
525+
extern "C" void* Neon_EventHandler_New(v8::Isolate *isolate, v8::Local<v8::Value> self, v8::Local<v8::Function> callback) {
526+
return new neon::EventHandler(isolate, self, callback);
527+
}
528+
529+
extern "C" void Neon_EventHandler_Schedule(void *thread_safe_cb, void *rust_callback, Neon_EventHandler handler) {
530+
neon::EventHandler *cb = static_cast<neon::EventHandler*>(thread_safe_cb);
531+
cb->schedule(rust_callback, handler);
532+
}
533+
534+
extern "C" void Neon_EventHandler_Delete(void * thread_safe_cb) {
535+
neon::EventHandler *cb = static_cast<neon::EventHandler*>(thread_safe_cb);
536+
cb->close();
537+
}

crates/neon-sys/native/src/neon.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ extern "C" {
133133
typedef void (*Neon_TaskCompleteCallback)(void *, void *, v8::Local<v8::Value> *out);
134134

135135
void Neon_Task_Schedule(void *task, Neon_TaskPerformCallback perform, Neon_TaskCompleteCallback complete, v8::Local<v8::Function> callback);
136+
137+
typedef void (*Neon_EventHandler)(v8::Local<v8::Value> self, v8::Local<v8::Value> callback, void* arg_cb);
138+
139+
void* Neon_EventHandler_New(v8::Isolate *isolate, v8::Local<v8::Value> self, v8::Local<v8::Function> callback);
140+
void Neon_EventHandler_Schedule(void* thread_safe_cb, void* rust_callback, Neon_EventHandler handler);
141+
void Neon_EventHandler_Delete(void* thread_safe_cb);
136142
}
137143

138144
#endif
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#ifndef NEON_EVENTHANDLER_H_
2+
#define NEON_EVENTHANDLER_H_
3+
4+
#include <uv.h>
5+
#include "neon.h"
6+
#include "v8.h"
7+
#include <mutex>
8+
9+
namespace neon {
10+
11+
// THe implementation of this class was adapted from
12+
// https://github.com/mika-fischer/napi-thread-safe-callback
13+
class EventHandler {
14+
public:
15+
EventHandler(v8::Isolate *isolate,
16+
v8::Local<v8::Value> self,
17+
v8::Local<v8::Function> callback): isolate_(isolate), close_(false)
18+
{
19+
async_.data = this;
20+
uv_async_init(uv_default_loop(), &async_, async_complete);
21+
// Save the this argument and the callback to be invoked.
22+
self_.Reset(isolate, self);
23+
callback_.Reset(isolate, callback);
24+
// Save the context (aka realm) to be used when invoking the callback.
25+
context_.Reset(isolate, isolate->GetCurrentContext());
26+
}
27+
28+
void schedule(void *rust_callback, Neon_EventHandler handler) {
29+
{
30+
std::lock_guard<std::mutex> lock(mutex_);
31+
handlers_.push_back({ rust_callback, handler });
32+
}
33+
uv_async_send(&async_);
34+
}
35+
36+
void close() {
37+
// close is called when the rust struct is dropped
38+
// this guarantees that it's called only once and
39+
// that no other method (call) will be called after it.
40+
close_ = true;
41+
uv_async_send(&async_);
42+
}
43+
44+
void complete() {
45+
// Ensure that we have all the proper scopes installed on the C++ stack before
46+
// invoking the callback, and use the context (i.e. realm) we saved with the task.
47+
v8::Isolate::Scope isolate_scope(isolate_);
48+
v8::HandleScope handle_scope(isolate_);
49+
v8::Local<v8::Context> context = v8::Local<v8::Context>::New(isolate_, context_);
50+
v8::Context::Scope context_scope(context);
51+
52+
v8::Local<v8::Value> self = v8::Local<v8::Value>::New(isolate_, self_);
53+
v8::Local<v8::Function> callback = v8::Local<v8::Function>::New(isolate_, callback_);
54+
55+
while (true) {
56+
std::vector<HandlerData> handlers;
57+
{
58+
std::lock_guard<std::mutex> lock(mutex_);
59+
if (handlers_.empty()) {
60+
break;
61+
} else {
62+
handlers.swap(handlers_);
63+
}
64+
}
65+
for (const HandlerData &data : handlers) {
66+
data.handler(self, callback, data.rust_callback);
67+
}
68+
}
69+
70+
if (close_) {
71+
uv_close(reinterpret_cast<uv_handle_t*>(&async_), [](uv_handle_t* handle) {
72+
delete static_cast<EventHandler*>(handle->data);
73+
});
74+
}
75+
}
76+
77+
private:
78+
static void async_complete(uv_async_t* handle) {
79+
EventHandler* cb = static_cast<EventHandler*>(handle->data);
80+
cb->complete();
81+
}
82+
83+
uv_async_t async_;
84+
v8::Isolate *isolate_;
85+
v8::Persistent<v8::Value> self_;
86+
v8::Persistent<v8::Function> callback_;
87+
v8::Persistent<v8::Context> context_;
88+
89+
struct HandlerData {
90+
void *rust_callback;
91+
Neon_EventHandler handler;
92+
};
93+
94+
std::mutex mutex_;
95+
std::vector<HandlerData> handlers_;
96+
97+
bool close_;
98+
};
99+
}
100+
101+
#endif

crates/neon-sys/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,8 @@ extern "C" {
198198
complete: unsafe extern fn(*mut c_void, *mut c_void, &mut Local),
199199
callback: Local);
200200

201+
pub fn Neon_EventHandler_New(isolate: Isolate, this: Local, callback: Local) -> *mut c_void;
202+
pub fn Neon_EventHandler_Schedule(thread_safe_cb: *mut c_void, rust_callback: *mut c_void,
203+
complete: unsafe extern fn(Local, Local, *mut c_void));
204+
pub fn Neon_EventHandler_Delete(thread_safe_cb: *mut c_void);
201205
}

src/event/mod.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
//! Helper to run a callback in the libuv main thread.
2+
3+
use std::mem;
4+
use std::os::raw::c_void;
5+
6+
use types::*;
7+
use handle::{Handle, Managed};
8+
use neon_runtime;
9+
use neon_runtime::raw;
10+
use std::sync::Arc;
11+
use context::Context;
12+
13+
type EventContext<'a> = crate::context::TaskContext<'a>;
14+
15+
struct EventHandlerInner(*mut c_void);
16+
17+
unsafe impl Send for EventHandlerInner {}
18+
unsafe impl Sync for EventHandlerInner {}
19+
20+
impl Drop for EventHandlerInner {
21+
fn drop(&mut self) {
22+
unsafe {
23+
neon_runtime::handler::delete(self.0);
24+
}
25+
}
26+
}
27+
28+
#[derive(Clone)]
29+
pub struct EventHandler(Arc<EventHandlerInner>);
30+
31+
impl EventHandler {
32+
#[cfg(feature = "napi-runtime")]
33+
pub fn new<'a, C: Context<'a>, T: Value>(cx: &C, this: Handle<T>, callback: Handle<JsFunction>) -> Self {
34+
unimplemented!()
35+
}
36+
37+
#[cfg(feature = "legacy-runtime")]
38+
pub fn new<'a, C: Context<'a>, T: Value>(cx: &C, this: Handle<T>, callback: Handle<JsFunction>) -> Self {
39+
let cb = unsafe {
40+
neon_runtime::handler::new(cx.env().to_raw(), this.to_raw(), callback.to_raw())
41+
};
42+
EventHandler(Arc::new(EventHandlerInner(cb)))
43+
}
44+
45+
pub fn schedule<T, F>(&self, arg_cb: F)
46+
where T: Value,
47+
F: for<'a> FnOnce(&mut EventContext<'a>) -> Vec<Handle<'a, T>>,
48+
F: Send + 'static {
49+
self.schedule_with(move |cx, this, callback| {
50+
let args = arg_cb(cx);
51+
let _result = callback.call(cx, this, args);
52+
})
53+
}
54+
55+
pub fn schedule_with<F>(&self, arg_cb: F)
56+
where F: FnOnce(&mut EventContext, Handle<JsValue>, Handle<JsFunction>),
57+
F: Send + 'static {
58+
let callback = Box::into_raw(Box::new(arg_cb)) as *mut c_void;
59+
unsafe {
60+
neon_runtime::handler::schedule((*self.0).0, callback, handle_callback::<F>);
61+
}
62+
}
63+
}
64+
65+
unsafe extern "C" fn handle_callback<F>(this: raw::Local, func: raw::Local, callback: *mut c_void)
66+
where F: FnOnce(&mut EventContext, Handle<JsValue>, Handle<JsFunction>), F: Send + 'static {
67+
EventContext::with(|mut cx: EventContext| {
68+
let this = JsValue::new_internal(this);
69+
let func: Handle<JsFunction> = Handle::new_internal(JsFunction::from_raw(func));
70+
let callback: Box<F> = Box::from_raw(mem::transmute(callback));
71+
callback(&mut cx, this, func);
72+
})
73+
}

0 commit comments

Comments
 (0)