Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 48 additions & 31 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct ServerInvokeContext : InvokeContext
int req;

ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
: InvokeContext{proxy_server.m_connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
: InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
{
}
};
Expand Down Expand Up @@ -350,29 +350,27 @@ template <typename Interface, typename Impl>
ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client client,
Connection* connection,
bool destroy_connection)
: m_client(std::move(client)), m_connection(connection), m_destroy_connection(destroy_connection)
: m_client(std::move(client)), m_context(connection)

{
{
std::unique_lock<std::mutex> lock(m_connection->m_loop.m_mutex);
m_connection->m_loop.addClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}
m_cleanup = m_connection->addSyncCleanup([this]() {

// Handler for the connection getting destroyed before this client object.
auto cleanup = m_context.connection->addSyncCleanup([this]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(self().m_client));
}
{
std::unique_lock<std::mutex> lock(m_connection->m_loop.m_mutex);
m_connection->m_loop.removeClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
m_connection = nullptr;
m_context.connection = nullptr;
});
self().construct();
}

template <typename Interface, typename Impl>
ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
{
// Two shutdown sequences are supported:
//
// - A normal sequence where client proxy objects are deleted by external
Expand All @@ -381,43 +379,54 @@ ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
// - A garbage collection sequence where the connection or event loop shuts
// down while external code is still holding client references.
//
// The first case is handled here in destructor when m_loop is not null. The
// second case is handled by the m_cleanup function, which sets m_connection to
// The first case is handled here when m_context.connection is not null. The
// second case is handled by the cleanup function, which sets m_context.connection to
// null so nothing happens here.
if (m_connection) {
// Remove m_cleanup callback so it doesn't run and try to access
m_context.cleanup.emplace_front([this, destroy_connection, cleanup]{
if (m_context.connection) {
// Remove cleanup callback so it doesn't run and try to access
// this object after it's already destroyed.
m_connection->removeSyncCleanup(m_cleanup);
m_context.connection->removeSyncCleanup(cleanup);

// Destroy remote object, waiting for it to deleted server side.
self().destroy();

// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
m_connection->m_loop.sync([&]() {
m_context.connection->m_loop.sync([&]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(self().m_client));
}
{
std::unique_lock<std::mutex> lock(m_connection->m_loop.m_mutex);
m_connection->m_loop.removeClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}

if (m_destroy_connection) {
delete m_connection;
m_connection = nullptr;
if (destroy_connection) {
delete m_context.connection;
m_context.connection = nullptr;
}
});
}
});
self().construct();
}

template <typename Interface, typename Impl>
ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
{
for (auto& cleanup : m_context.cleanup) {
cleanup();
}
}

template <typename Interface, typename Impl>
ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
: m_impl(std::move(impl)), m_connection(connection)
: m_impl(std::move(impl)), m_context(&connection)
{
assert(m_impl);
std::unique_lock<std::mutex> lock(m_connection.m_loop.m_mutex);
m_connection.m_loop.addClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}

template <typename Interface, typename Impl>
Expand All @@ -429,17 +438,25 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
// destructor on, run asynchronously. Do not run destructor on current
// (event loop) thread since destructors could be making IPC calls or
// doing expensive cleanup.
auto impl = std::move(m_impl);
m_connection.addAsyncCleanup([impl]() mutable { impl.reset(); });
m_context.connection->addAsyncCleanup([impl=std::move(m_impl), c=std::move(m_context.cleanup)]() mutable {
impl.reset();
for (auto& cleanup : c) {
cleanup();
}
});
}
std::unique_lock<std::mutex> lock(m_connection.m_loop.m_mutex);
m_connection.m_loop.removeClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}

template <typename Interface, typename Impl>
void ProxyServerBase<Interface, Impl>::invokeDestroy()
{
m_impl.reset();
for (auto& cleanup : m_context.cleanup) {
cleanup();
}
m_context.cleanup.clear();
}

struct ThreadContext
Expand Down
64 changes: 38 additions & 26 deletions include/mp/proxy-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <exception>
#include <optional>
#include <set>
#include <typeindex>
#include <vector>

namespace mp {
Expand Down Expand Up @@ -125,12 +126,12 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar
ServerContext server_context{server, call_context, req};
{
auto& request_threads = g_thread_context.request_threads;
auto request_thread = request_threads.find(&server.m_connection);
auto request_thread = request_threads.find(server.m_context.connection);
if (request_thread == request_threads.end()) {
request_thread =
g_thread_context.request_threads
.emplace(std::piecewise_construct, std::forward_as_tuple(&server.m_connection),
std::forward_as_tuple(context_arg.getCallbackThread(), &server.m_connection,
.emplace(std::piecewise_construct, std::forward_as_tuple(server.m_context.connection),
std::forward_as_tuple(context_arg.getCallbackThread(), server.m_context.connection,
/* destroy_connection= */ false))
.first;
} else {
Expand All @@ -142,32 +143,32 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar
fn.invoke(server_context, args...);
}
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
server.m_connection.m_loop.sync([&] {
server.m_context.connection->m_loop.sync([&] {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->fulfill(kj::mv(call_context));
});
}))
{
server.m_connection.m_loop.sync([&]() {
server.m_context.connection->m_loop.sync([&]() {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->reject(kj::mv(*exception));
});
}
})));

auto thread_client = context_arg.getThread();
return JoinPromises(server.m_connection.m_threads.getLocalServer(thread_client)
return JoinPromises(server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke, req](kj::Maybe<Thread::Server&> perhaps) {
KJ_IF_MAYBE(thread_server, perhaps)
{
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_connection.m_loop.log() << "IPC server post request #" << req << " {"
server.m_context.connection->m_loop.log() << "IPC server post request #" << req << " {"
<< thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke));
}
else
{
server.m_connection.m_loop.log() << "IPC server error request #" << req
server.m_context.connection->m_loop.log() << "IPC server error request #" << req
<< ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
Expand Down Expand Up @@ -1014,7 +1015,7 @@ auto PassField(TypeList<LocalType&>, ServerContext& server_context, Fn&& fn, Arg
const auto& params = server_context.call_context.getParams();
const auto& input = Make<StructField, Accessor>(params);
using Interface = typename Decay<decltype(input.get())>::Calls;
auto param = std::make_unique<ProxyClient<Interface>>(input.get(), &server_context.proxy_server.m_connection, false);
auto param = std::make_unique<ProxyClient<Interface>>(input.get(), server_context.proxy_server.m_context.connection, false);
fn.invoke(server_context, std::forward<Args>(args)..., *param);
}

Expand Down Expand Up @@ -1345,8 +1346,8 @@ struct CapRequestTraits<::capnp::Request<_Params, _Results>>
template <typename Client>
void clientDestroy(Client& client)
{
if (client.m_connection) {
client.m_connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
if (client.m_context.connection) {
client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
} else {
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
}
Expand All @@ -1355,18 +1356,18 @@ void clientDestroy(Client& client)
template <typename Server>
void serverDestroy(Server& server)
{
server.m_connection.m_loop.log() << "IPC server destroy" << typeid(server).name();
server.m_context.connection->m_loop.log() << "IPC server destroy" << typeid(server).name();
}

template <typename ProxyClient, typename GetRequest, typename... FieldObjs>
void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, FieldObjs&&... fields)
{
if (!proxy_client.m_connection) {
if (!proxy_client.m_context.connection) {
throw std::logic_error("clientInvoke call made after disconnect");
}
if (!g_thread_context.waiter) {
assert(g_thread_context.thread_name.empty());
g_thread_context.thread_name = ThreadName(proxy_client.m_connection->m_loop.m_exe_name);
g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name);
// If next assert triggers, it means clientInvoke is being called from
// the capnp event loop thread. This can happen when a ProxyServer
// method implementation that runs synchronously on the event loop
Expand All @@ -1377,26 +1378,26 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
// declaration so the server method runs in a dedicated thread.
assert(!g_thread_context.loop_thread);
g_thread_context.waiter = std::make_unique<Waiter>();
proxy_client.m_connection->m_loop.logPlain()
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << g_thread_context.thread_name
<< "} IPC client first request from current thread, constructing waiter";
}
ClientInvokeContext invoke_context{*proxy_client.m_connection, g_thread_context};
ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context};
std::exception_ptr exception;
std::string kj_exception;
bool done = false;
proxy_client.m_connection->m_loop.sync([&]() {
proxy_client.m_context.connection->m_loop.sync([&]() {
auto request = (proxy_client.m_client.*get_request)(nullptr);
using Request = CapRequestTraits<decltype(request)>;
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_connection->m_loop.logPlain()
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client send "
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString());

proxy_client.m_connection->m_loop.m_task_set->add(request.send().then(
proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then(
[&](::capnp::Response<typename Request::Results>&& response) {
proxy_client.m_connection->m_loop.logPlain()
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
try {
Expand All @@ -1411,7 +1412,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
},
[&](const ::kj::Exception& e) {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_connection->m_loop.logPlain()
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
done = true;
Expand All @@ -1422,7 +1423,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_connection->m_loop.raise() << kj_exception;
if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception;
}

//! Invoke callable `fn()` that may return void. If it does return void, replace
Expand Down Expand Up @@ -1454,7 +1455,7 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using Results = typename decltype(call_context.getResults())::Builds;

int req = ++server_reqs;
server.m_connection.m_loop.log() << "IPC server recv request #" << req << " "
server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());

try {
Expand All @@ -1464,18 +1465,29 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) {
server.m_connection.m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
<< " " << LogEscape(call_context.getResults().toString());
});
} catch (const std::exception& e) {
server.m_connection.m_loop.log() << "IPC server unhandled exception: " << e.what();
server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what();
throw;
} catch (...) {
server.m_connection.m_loop.log() << "IPC server unhandled exception";
server.m_context.connection->m_loop.log() << "IPC server unhandled exception";
throw;
}
}

//! Map to convert client interface pointers to ProxyContext struct references
//! at runtime using typeids.
struct ProxyTypeRegister {
template<typename Interface>
ProxyTypeRegister(TypeList<Interface>) {
types().emplace(typeid(Interface), [](void* iface) -> ProxyContext& { return static_cast<typename mp::ProxyType<Interface>::Client&>(*static_cast<Interface*>(iface)).m_context; });
}
using Types = std::map<std::type_index, ProxyContext&(*)(void*)>;
static Types& types() { static Types types; return types; }
};

} // namespace mp

#endif // MP_PROXY_TYPES_H
16 changes: 11 additions & 5 deletions include/mp/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ struct ProxyType;
using CleanupList = std::list<std::function<void()>>;
using CleanupIt = typename CleanupList::iterator;

//! Context data associated with proxy client and server classes.
struct ProxyContext
{
Connection* connection;
std::list<std::function<void()>> cleanup;

ProxyContext(Connection* connection) : connection(connection) {}
};

//! Base class for generated ProxyClient classes that implement a C++ interface
//! and forward calls to a capnp interface.
template <typename Interface_, typename Impl_>
Expand All @@ -59,10 +68,7 @@ class ProxyClientBase : public Impl_
ProxyClient<Interface>& self() { return static_cast<ProxyClient<Interface>&>(*this); }

typename Interface::Client m_client;
Connection* m_connection;
bool m_destroy_connection;
CleanupIt m_cleanup; //!< Pointer to self-cleanup callback registered to handle connection object getting destroyed
//!< before this client object.
ProxyContext m_context;
};

//! Customizable (through template specialization) base class used in generated ProxyClient implementations from
Expand Down Expand Up @@ -100,7 +106,7 @@ struct ProxyServerBase : public virtual Interface_::Server
* appropriate times depending on semantics of the particular method being
* wrapped. */
std::shared_ptr<Impl> m_impl;
Connection& m_connection;
ProxyContext m_context;
};

//! Customizable (through template specialization) base class used in generated ProxyServer implementations from
Expand Down
Loading