From 39ad0f5eaecb96938af78e99bcd4f0f1a939271a Mon Sep 17 00:00:00 2001 From: Russell Yanofsky Date: Mon, 25 Jan 2021 18:20:01 -0500 Subject: [PATCH 1/4] Generate ProxyType traits for interface types --- src/mp/gen.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/mp/gen.cpp b/src/mp/gen.cpp index a0bc9733..05d00475 100644 --- a/src/mp/gen.cpp +++ b/src/mp/gen.cpp @@ -532,6 +532,18 @@ void Generate(kj::StringPtr src_prefix, client << "};\n"; server << "};\n"; dec << "\n" << client.str() << "\n" << server.str() << "\n"; + KJ_IF_MAYBE(bracket, proxied_class_type.findFirst('<')) { + // Skip ProxyType definition for complex type expressions which + // could lead to duplicate definitions. They can be defined + // manually if actually needed. + } else { + dec << "template<>\nstruct ProxyType<" << proxied_class_type << ">\n{\n"; + dec << " using Type = " << proxied_class_type << ";\n"; + dec << " using Message = " << message_namespace << "::" << node_name << ";\n"; + dec << " using Client = ProxyClient;\n"; + dec << " using Server = ProxyServer;\n"; + dec << "};\n"; + } def_types << "ProxyClient<" << message_namespace << "::" << node_name << ">::~ProxyClient() { clientDestroy(*this); " << client_destroy.str() << " }\n"; def_types << "ProxyServer<" << message_namespace << "::" << node_name From 34e9b78ec16d50132d92256aa3a2af1a699d5d7a Mon Sep 17 00:00:00 2001 From: Russell Yanofsky Date: Mon, 25 Jan 2021 19:51:52 -0500 Subject: [PATCH 2/4] refactor: Move connection field to ProxyContext struct No change in behavior --- include/mp/proxy-io.h | 46 +++++++++++++++++------------------ include/mp/proxy-types.h | 52 ++++++++++++++++++++-------------------- include/mp/proxy.h | 12 ++++++++-- 3 files changed, 59 insertions(+), 51 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 81642111..d18ba24c 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -46,7 +46,7 @@ struct ServerInvokeContext : InvokeContext int req; ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req) - : InvokeContext{proxy_server.m_connection}, proxy_server{proxy_server}, call_context{call_context}, req{req} + : InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req} { } }; @@ -350,22 +350,22 @@ template ProxyClientBase::ProxyClientBase(typename Interface::Client client, Connection* connection, bool destroy_connection) - : m_client(std::move(client)), m_connection(connection), m_destroy_connection(destroy_connection) + : m_client(std::move(client)), m_context(connection), m_destroy_connection(destroy_connection) { { - std::unique_lock lock(m_connection->m_loop.m_mutex); - m_connection->m_loop.addClient(lock); + std::unique_lock lock(m_context.connection->m_loop.m_mutex); + m_context.connection->m_loop.addClient(lock); } - m_cleanup = m_connection->addSyncCleanup([this]() { + m_cleanup = m_context.connection->addSyncCleanup([this]() { // Release client capability by move-assigning to temporary. { typename Interface::Client(std::move(self().m_client)); } { - std::unique_lock lock(m_connection->m_loop.m_mutex); - m_connection->m_loop.removeClient(lock); + std::unique_lock lock(m_context.connection->m_loop.m_mutex); + m_context.connection->m_loop.removeClient(lock); } - m_connection = nullptr; + m_context.connection = nullptr; }); self().construct(); } @@ -381,31 +381,31 @@ ProxyClientBase::~ProxyClientBase() noexcept // - A garbage collection sequence where the connection or event loop shuts // down while external code is still holding client references. // - // The first case is handled here in destructor when m_loop is not null. The - // second case is handled by the m_cleanup function, which sets m_connection to + // The first case is handled here when m_context.connection is not null. The + // second case is handled by the cleanup function, which sets m_context.connection to // null so nothing happens here. - if (m_connection) { + if (m_context.connection) { // Remove m_cleanup callback so it doesn't run and try to access // this object after it's already destroyed. - m_connection->removeSyncCleanup(m_cleanup); + m_context.connection->removeSyncCleanup(m_cleanup); // Destroy remote object, waiting for it to deleted server side. self().destroy(); // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code - m_connection->m_loop.sync([&]() { + m_context.connection->m_loop.sync([&]() { // Release client capability by move-assigning to temporary. { typename Interface::Client(std::move(self().m_client)); } { - std::unique_lock lock(m_connection->m_loop.m_mutex); - m_connection->m_loop.removeClient(lock); + std::unique_lock lock(m_context.connection->m_loop.m_mutex); + m_context.connection->m_loop.removeClient(lock); } if (m_destroy_connection) { - delete m_connection; - m_connection = nullptr; + delete m_context.connection; + m_context.connection = nullptr; } }); } @@ -413,11 +413,11 @@ ProxyClientBase::~ProxyClientBase() noexcept template ProxyServerBase::ProxyServerBase(std::shared_ptr impl, Connection& connection) - : m_impl(std::move(impl)), m_connection(connection) + : m_impl(std::move(impl)), m_context(&connection) { assert(m_impl); - std::unique_lock lock(m_connection.m_loop.m_mutex); - m_connection.m_loop.addClient(lock); + std::unique_lock lock(m_context.connection->m_loop.m_mutex); + m_context.connection->m_loop.addClient(lock); } template @@ -430,10 +430,10 @@ ProxyServerBase::~ProxyServerBase() // (event loop) thread since destructors could be making IPC calls or // doing expensive cleanup. auto impl = std::move(m_impl); - m_connection.addAsyncCleanup([impl]() mutable { impl.reset(); }); + m_context.connection->addAsyncCleanup([impl]() mutable { impl.reset(); }); } - std::unique_lock lock(m_connection.m_loop.m_mutex); - m_connection.m_loop.removeClient(lock); + std::unique_lock lock(m_context.connection->m_loop.m_mutex); + m_context.connection->m_loop.removeClient(lock); } template diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h index 743b210c..3da8c1c6 100644 --- a/include/mp/proxy-types.h +++ b/include/mp/proxy-types.h @@ -125,12 +125,12 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar ServerContext server_context{server, call_context, req}; { auto& request_threads = g_thread_context.request_threads; - auto request_thread = request_threads.find(&server.m_connection); + auto request_thread = request_threads.find(server.m_context.connection); if (request_thread == request_threads.end()) { request_thread = g_thread_context.request_threads - .emplace(std::piecewise_construct, std::forward_as_tuple(&server.m_connection), - std::forward_as_tuple(context_arg.getCallbackThread(), &server.m_connection, + .emplace(std::piecewise_construct, std::forward_as_tuple(server.m_context.connection), + std::forward_as_tuple(context_arg.getCallbackThread(), server.m_context.connection, /* destroy_connection= */ false)) .first; } else { @@ -142,13 +142,13 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar fn.invoke(server_context, args...); } KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { - server.m_connection.m_loop.sync([&] { + server.m_context.connection->m_loop.sync([&] { auto fulfiller_dispose = kj::mv(fulfiller); fulfiller_dispose->fulfill(kj::mv(call_context)); }); })) { - server.m_connection.m_loop.sync([&]() { + server.m_context.connection->m_loop.sync([&]() { auto fulfiller_dispose = kj::mv(fulfiller); fulfiller_dispose->reject(kj::mv(*exception)); }); @@ -156,18 +156,18 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar }))); auto thread_client = context_arg.getThread(); - return JoinPromises(server.m_connection.m_threads.getLocalServer(thread_client) + return JoinPromises(server.m_context.connection->m_threads.getLocalServer(thread_client) .then([&server, invoke, req](kj::Maybe perhaps) { KJ_IF_MAYBE(thread_server, perhaps) { const auto& thread = static_cast&>(*thread_server); - server.m_connection.m_loop.log() << "IPC server post request #" << req << " {" + server.m_context.connection->m_loop.log() << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; thread.m_thread_context.waiter->post(std::move(invoke)); } else { - server.m_connection.m_loop.log() << "IPC server error request #" << req + server.m_context.connection->m_loop.log() << "IPC server error request #" << req << ", missing thread to execute request"; throw std::runtime_error("invalid thread handle"); } @@ -1014,7 +1014,7 @@ auto PassField(TypeList, ServerContext& server_context, Fn&& fn, Arg const auto& params = server_context.call_context.getParams(); const auto& input = Make(params); using Interface = typename Decay::Calls; - auto param = std::make_unique>(input.get(), &server_context.proxy_server.m_connection, false); + auto param = std::make_unique>(input.get(), server_context.proxy_server.m_context.connection, false); fn.invoke(server_context, std::forward(args)..., *param); } @@ -1345,8 +1345,8 @@ struct CapRequestTraits<::capnp::Request<_Params, _Results>> template void clientDestroy(Client& client) { - if (client.m_connection) { - client.m_connection->m_loop.log() << "IPC client destroy " << typeid(client).name(); + if (client.m_context.connection) { + client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name(); } else { KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name()); } @@ -1355,18 +1355,18 @@ void clientDestroy(Client& client) template void serverDestroy(Server& server) { - server.m_connection.m_loop.log() << "IPC server destroy" << typeid(server).name(); + server.m_context.connection->m_loop.log() << "IPC server destroy" << typeid(server).name(); } template void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, FieldObjs&&... fields) { - if (!proxy_client.m_connection) { + if (!proxy_client.m_context.connection) { throw std::logic_error("clientInvoke call made after disconnect"); } if (!g_thread_context.waiter) { assert(g_thread_context.thread_name.empty()); - g_thread_context.thread_name = ThreadName(proxy_client.m_connection->m_loop.m_exe_name); + g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name); // If next assert triggers, it means clientInvoke is being called from // the capnp event loop thread. This can happen when a ProxyServer // method implementation that runs synchronously on the event loop @@ -1377,26 +1377,26 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel // declaration so the server method runs in a dedicated thread. assert(!g_thread_context.loop_thread); g_thread_context.waiter = std::make_unique(); - proxy_client.m_connection->m_loop.logPlain() + proxy_client.m_context.connection->m_loop.logPlain() << "{" << g_thread_context.thread_name << "} IPC client first request from current thread, constructing waiter"; } - ClientInvokeContext invoke_context{*proxy_client.m_connection, g_thread_context}; + ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context}; std::exception_ptr exception; std::string kj_exception; bool done = false; - proxy_client.m_connection->m_loop.sync([&]() { + proxy_client.m_context.connection->m_loop.sync([&]() { auto request = (proxy_client.m_client.*get_request)(nullptr); using Request = CapRequestTraits; using FieldList = typename ProxyClientMethodTraits::Fields; IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...); - proxy_client.m_connection->m_loop.logPlain() + proxy_client.m_context.connection->m_loop.logPlain() << "{" << invoke_context.thread_context.thread_name << "} IPC client send " << TypeName() << " " << LogEscape(request.toString()); - proxy_client.m_connection->m_loop.m_task_set->add(request.send().then( + proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then( [&](::capnp::Response&& response) { - proxy_client.m_connection->m_loop.logPlain() + proxy_client.m_context.connection->m_loop.logPlain() << "{" << invoke_context.thread_context.thread_name << "} IPC client recv " << TypeName() << " " << LogEscape(response.toString()); try { @@ -1411,7 +1411,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel }, [&](const ::kj::Exception& e) { kj_exception = kj::str("kj::Exception: ", e).cStr(); - proxy_client.m_connection->m_loop.logPlain() + proxy_client.m_context.connection->m_loop.logPlain() << "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception; std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex); done = true; @@ -1422,7 +1422,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex); invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; }); if (exception) std::rethrow_exception(exception); - if (!kj_exception.empty()) proxy_client.m_connection->m_loop.raise() << kj_exception; + if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception; } //! Invoke callable `fn()` that may return void. If it does return void, replace @@ -1454,7 +1454,7 @@ kj::Promise serverInvoke(Server& server, CallContext& call_context, Fn fn) using Results = typename decltype(call_context.getResults())::Builds; int req = ++server_reqs; - server.m_connection.m_loop.log() << "IPC server recv request #" << req << " " + server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " " << TypeName() << " " << LogEscape(params.toString()); try { @@ -1464,14 +1464,14 @@ kj::Promise serverInvoke(Server& server, CallContext& call_context, Fn fn) return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); }, [&]() { return kj::Promise(kj::mv(call_context)); }) .then([&server, req](CallContext call_context) { - server.m_connection.m_loop.log() << "IPC server send response #" << req << " " << TypeName() + server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName() << " " << LogEscape(call_context.getResults().toString()); }); } catch (const std::exception& e) { - server.m_connection.m_loop.log() << "IPC server unhandled exception: " << e.what(); + server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what(); throw; } catch (...) { - server.m_connection.m_loop.log() << "IPC server unhandled exception"; + server.m_context.connection->m_loop.log() << "IPC server unhandled exception"; throw; } } diff --git a/include/mp/proxy.h b/include/mp/proxy.h index ff828884..1d18a3ec 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -39,6 +39,14 @@ struct ProxyType; using CleanupList = std::list>; using CleanupIt = typename CleanupList::iterator; +//! Context data associated with proxy client and server classes. +struct ProxyContext +{ + Connection* connection; + + ProxyContext(Connection* connection) : connection(connection) {} +}; + //! Base class for generated ProxyClient classes that implement a C++ interface //! and forward calls to a capnp interface. template @@ -59,7 +67,7 @@ class ProxyClientBase : public Impl_ ProxyClient& self() { return static_cast&>(*this); } typename Interface::Client m_client; - Connection* m_connection; + ProxyContext m_context; bool m_destroy_connection; CleanupIt m_cleanup; //!< Pointer to self-cleanup callback registered to handle connection object getting destroyed //!< before this client object. @@ -100,7 +108,7 @@ struct ProxyServerBase : public virtual Interface_::Server * appropriate times depending on semantics of the particular method being * wrapped. */ std::shared_ptr m_impl; - Connection& m_connection; + ProxyContext m_context; }; //! Customizable (through template specialization) base class used in generated ProxyServer implementations from From fbdaaa75aa582541163a3530e1c167751a133807 Mon Sep 17 00:00:00 2001 From: Russell Yanofsky Date: Mon, 25 Jan 2021 20:44:36 -0500 Subject: [PATCH 3/4] Add cleanup callbacks to ProxyContext Support attaching custom cleanup functions to proxy client and server classes --- include/mp/proxy-io.h | 41 +++++++++++++++++++++++++++------------ include/mp/proxy.h | 4 +--- test/src/mp/test/test.cpp | 5 +++++ 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index d18ba24c..d853c551 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -350,13 +350,16 @@ template ProxyClientBase::ProxyClientBase(typename Interface::Client client, Connection* connection, bool destroy_connection) - : m_client(std::move(client)), m_context(connection), m_destroy_connection(destroy_connection) + : m_client(std::move(client)), m_context(connection) + { { std::unique_lock lock(m_context.connection->m_loop.m_mutex); m_context.connection->m_loop.addClient(lock); } - m_cleanup = m_context.connection->addSyncCleanup([this]() { + + // Handler for the connection getting destroyed before this client object. + auto cleanup = m_context.connection->addSyncCleanup([this]() { // Release client capability by move-assigning to temporary. { typename Interface::Client(std::move(self().m_client)); @@ -367,12 +370,7 @@ ProxyClientBase::ProxyClientBase(typename Interface::Client cli } m_context.connection = nullptr; }); - self().construct(); -} -template -ProxyClientBase::~ProxyClientBase() noexcept -{ // Two shutdown sequences are supported: // // - A normal sequence where client proxy objects are deleted by external @@ -384,10 +382,11 @@ ProxyClientBase::~ProxyClientBase() noexcept // The first case is handled here when m_context.connection is not null. The // second case is handled by the cleanup function, which sets m_context.connection to // null so nothing happens here. + m_context.cleanup.emplace_front([this, destroy_connection, cleanup]{ if (m_context.connection) { - // Remove m_cleanup callback so it doesn't run and try to access + // Remove cleanup callback so it doesn't run and try to access // this object after it's already destroyed. - m_context.connection->removeSyncCleanup(m_cleanup); + m_context.connection->removeSyncCleanup(cleanup); // Destroy remote object, waiting for it to deleted server side. self().destroy(); @@ -403,12 +402,22 @@ ProxyClientBase::~ProxyClientBase() noexcept m_context.connection->m_loop.removeClient(lock); } - if (m_destroy_connection) { + if (destroy_connection) { delete m_context.connection; m_context.connection = nullptr; } }); } + }); + self().construct(); +} + +template +ProxyClientBase::~ProxyClientBase() noexcept +{ + for (auto& cleanup : m_context.cleanup) { + cleanup(); + } } template @@ -429,8 +438,12 @@ ProxyServerBase::~ProxyServerBase() // destructor on, run asynchronously. Do not run destructor on current // (event loop) thread since destructors could be making IPC calls or // doing expensive cleanup. - auto impl = std::move(m_impl); - m_context.connection->addAsyncCleanup([impl]() mutable { impl.reset(); }); + m_context.connection->addAsyncCleanup([impl=std::move(m_impl), c=std::move(m_context.cleanup)]() mutable { + impl.reset(); + for (auto& cleanup : c) { + cleanup(); + } + }); } std::unique_lock lock(m_context.connection->m_loop.m_mutex); m_context.connection->m_loop.removeClient(lock); @@ -440,6 +453,10 @@ template void ProxyServerBase::invokeDestroy() { m_impl.reset(); + for (auto& cleanup : m_context.cleanup) { + cleanup(); + } + m_context.cleanup.clear(); } struct ThreadContext diff --git a/include/mp/proxy.h b/include/mp/proxy.h index 1d18a3ec..602c4ae4 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -43,6 +43,7 @@ using CleanupIt = typename CleanupList::iterator; struct ProxyContext { Connection* connection; + std::list> cleanup; ProxyContext(Connection* connection) : connection(connection) {} }; @@ -68,9 +69,6 @@ class ProxyClientBase : public Impl_ typename Interface::Client m_client; ProxyContext m_context; - bool m_destroy_connection; - CleanupIt m_cleanup; //!< Pointer to self-cleanup callback registered to handle connection object getting destroyed - //!< before this client object. }; //! Customizable (through template specialization) base class used in generated ProxyClient implementations from diff --git a/test/src/mp/test/test.cpp b/test/src/mp/test/test.cpp index 9c278250..54993ccc 100644 --- a/test/src/mp/test/test.cpp +++ b/test/src/mp/test/test.cpp @@ -88,6 +88,11 @@ KJ_TEST("Call FooInterface methods") disconnect_client(); thread.join(); + + bool destroyed = false; + foo->m_context.cleanup.emplace_front([&destroyed]{ destroyed = true; }); + foo.reset(); + KJ_EXPECT(destroyed); } } // namespace test From ce8e8b654a4384c5e2560c3b5cee4ab8eb5344d6 Mon Sep 17 00:00:00 2001 From: Russell Yanofsky Date: Wed, 27 Jan 2021 21:39:09 -0500 Subject: [PATCH 4/4] Add ProxyTypeRegister typeid map --- include/mp/proxy-types.h | 12 ++++++++++++ src/mp/gen.cpp | 3 +++ 2 files changed, 15 insertions(+) diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h index 3da8c1c6..99bf4569 100644 --- a/include/mp/proxy-types.h +++ b/include/mp/proxy-types.h @@ -10,6 +10,7 @@ #include #include #include +#include #include namespace mp { @@ -1476,6 +1477,17 @@ kj::Promise serverInvoke(Server& server, CallContext& call_context, Fn fn) } } +//! Map to convert client interface pointers to ProxyContext struct references +//! at runtime using typeids. +struct ProxyTypeRegister { + template + ProxyTypeRegister(TypeList) { + types().emplace(typeid(Interface), [](void* iface) -> ProxyContext& { return static_cast::Client&>(*static_cast(iface)).m_context; }); + } + using Types = std::map; + static Types& types() { static Types types; return types; } +}; + } // namespace mp #endif // MP_PROXY_TYPES_H diff --git a/src/mp/gen.cpp b/src/mp/gen.cpp index 05d00475..a5c7f60c 100644 --- a/src/mp/gen.cpp +++ b/src/mp/gen.cpp @@ -222,6 +222,7 @@ void Generate(kj::StringPtr src_prefix, std::ostringstream dec; std::ostringstream def_server; std::ostringstream def_client; + std::ostringstream int_client; std::ostringstream def_types; auto add_accessor = [&](kj::StringPtr name) { @@ -543,6 +544,7 @@ void Generate(kj::StringPtr src_prefix, dec << " using Client = ProxyClient;\n"; dec << " using Server = ProxyServer;\n"; dec << "};\n"; + int_client << "ProxyTypeRegister t" << node_nested.getId() << "{TypeList<" << proxied_class_type << ">{}};\n"; } def_types << "ProxyClient<" << message_namespace << "::" << node_name << ">::~ProxyClient() { clientDestroy(*this); " << client_destroy.str() << " }\n"; @@ -559,6 +561,7 @@ void Generate(kj::StringPtr src_prefix, cpp_server << "} // namespace mp\n"; cpp_client << def_client.str(); + cpp_client << "namespace {\n" << int_client.str() << "} // namespace\n"; cpp_client << "} // namespace mp\n"; cpp_types << def_types.str();