diff --git a/example/calculator.cpp b/example/calculator.cpp index 016a0486..86ce388b 100644 --- a/example/calculator.cpp +++ b/example/calculator.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,7 @@ class InitImpl : public Init } }; +// Exercises deprecated log callback signature static void LogPrint(bool raise, const std::string& message) { if (raise) throw std::runtime_error(message); diff --git a/example/example.cpp b/example/example.cpp index 5088f796..38313977 100644 --- a/example/example.cpp +++ b/example/example.cpp @@ -35,10 +35,10 @@ static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const s return std::make_tuple(mp::ConnectStream(loop, fd), pid); } -static void LogPrint(bool raise, const std::string& message) +static void LogPrint(mp::LogMessage log_data) { - if (raise) throw std::runtime_error(message); - std::ofstream("debug.log", std::ios_base::app) << message << std::endl; + if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message); + std::ofstream("debug.log", std::ios_base::app) << log_data.message << std::endl; } int main(int argc, char** argv) diff --git a/example/printer.cpp b/example/printer.cpp index eb384018..9150d59b 100644 --- a/example/printer.cpp +++ b/example/printer.cpp @@ -32,10 +32,10 @@ class InitImpl : public Init std::unique_ptr makePrinter() override { return std::make_unique(); } }; -static void LogPrint(bool raise, const std::string& message) +static void LogPrint(mp::LogMessage log_data) { - if (raise) throw std::runtime_error(message); - std::ofstream("debug.log", std::ios_base::app) << message << std::endl; + if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message); + std::ofstream("debug.log", std::ios_base::app) << log_data.message << std::endl; } int main(int argc, char** argv) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 9be80625..0abef89d 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -98,22 +98,59 @@ class LoggingErrorHandler : public kj::TaskSet::ErrorHandler EventLoop& m_loop; }; -using LogFn = std::function; +enum class Log { + Trace = 0, + Debug, + Info, + Warning, + Error, + Raise, +}; + +struct LogMessage { + + //! Message to be logged + std::string message; + + //! The severity level of this message + Log level; +}; + +using LogFn = std::function; + +struct LogOptions { + + //! External logging callback. + LogFn log_fn; + + //! Maximum number of characters to use when representing + //! request and response structs as strings. + size_t max_chars{200}; + + //! Messages with a severity level less than log_level will not be + //! reported. + Log log_level{Log::Trace}; +}; class Logger { public: - Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {} - Logger(Logger&& logger) : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer)) {} + Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {} + + Logger(Logger&&) = delete; + Logger& operator=(Logger&&) = delete; + Logger(const Logger&) = delete; + Logger& operator=(const Logger&) = delete; + ~Logger() noexcept(false) { - if (m_fn) m_fn(m_raise, m_buffer.str()); + if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level}); } template friend Logger& operator<<(Logger& logger, T&& value) { - if (logger.m_fn) logger.m_buffer << std::forward(value); + if (logger.enabled()) logger.m_buffer << std::forward(value); return logger; } @@ -123,20 +160,25 @@ class Logger return logger << std::forward(value); } - bool m_raise; - LogFn& m_fn; + explicit operator bool() const + { + return enabled(); + } + +private: + bool enabled() const + { + return m_options.log_fn && m_log_level >= m_options.log_level; + } + + const LogOptions& m_options; + Log m_log_level; std::ostringstream m_buffer; }; -struct LogOptions { +#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger - //! External logging callback. - LogFn log_fn; - - //! Maximum number of characters to use when representing - //! request and response structs as strings. - size_t max_chars{200}; -}; +#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} " std::string LongThreadName(const char* exe_name); @@ -168,8 +210,19 @@ std::string LongThreadName(const char* exe_name); class EventLoop { public: - //! Construct event loop object. - EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr); + //! Construct event loop object with default logging options. + EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr) + : EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){} + + //! Construct event loop object with specified logging options. + EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr); + + //! Backwards-compatible constructor for previous (deprecated) logging callback signature + EventLoop(const char* exe_name, std::function old_callback, void* context = nullptr) + : EventLoop(exe_name, + LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}}, + context){} + ~EventLoop(); //! Run event loop. Does not return until shutdown. This should only be @@ -210,15 +263,6 @@ class EventLoop //! Check if loop should exit. bool done() const MP_REQUIRES(m_mutex); - Logger log() - { - Logger logger(false, m_log_opts.log_fn); - logger << "{" << LongThreadName(m_exe_name) << "} "; - return logger; - } - Logger logPlain() { return {false, m_log_opts.log_fn}; } - Logger raise() { return {true, m_log_opts.log_fn}; } - //! Process name included in thread names so combined debug output from //! multiple processes is easier to understand. const char* m_exe_name; @@ -643,7 +687,7 @@ std::unique_ptr> ConnectStream(EventLoop& loop, int f init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs(); Connection* connection_ptr = connection.get(); connection->onDisconnect([&loop, connection_ptr] { - loop.log() << "IPC client: unexpected network disconnect."; + MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect."; delete connection_ptr; }); }); @@ -666,7 +710,7 @@ void _Serve(EventLoop& loop, kj::Own&& stream, InitImpl& init }); auto it = loop.m_incoming_connections.begin(); it->onDisconnect([&loop, it] { - loop.log() << "IPC server: socket disconnected."; + MP_LOG(loop, Log::Info) << "IPC server: socket disconnected."; loop.m_incoming_connections.erase(it); }); } diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h index 222defd9..22468b15 100644 --- a/include/mp/proxy-types.h +++ b/include/mp/proxy-types.h @@ -568,7 +568,7 @@ template void clientDestroy(Client& client) { if (client.m_context.connection) { - client.m_context.loop->log() << "IPC client destroy " << typeid(client).name(); + MP_LOG(*client.m_context.loop, Log::Info) << "IPC client destroy " << typeid(client).name(); } else { KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name()); } @@ -577,7 +577,7 @@ void clientDestroy(Client& client) template void serverDestroy(Server& server) { - server.m_context.loop->log() << "IPC server destroy " << typeid(server).name(); + MP_LOG(*server.m_context.loop, Log::Info) << "IPC server destroy " << typeid(server).name(); } //! Entry point called by generated client code that looks like: @@ -605,7 +605,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel // declaration so the server method runs in a dedicated thread. assert(!g_thread_context.loop_thread); g_thread_context.waiter = std::make_unique(); - proxy_client.m_context.loop->logPlain() + MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Info) << "{" << g_thread_context.thread_name << "} IPC client first request from current thread, constructing waiter"; } @@ -629,15 +629,19 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel using FieldList = typename ProxyClientMethodTraits::Fields; invoke_context.emplace(*proxy_client.m_context.connection, thread_context); IterateFields().handleChain(*invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...); - proxy_client.m_context.loop->logPlain() + MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Debug) << "{" << thread_context.thread_name << "} IPC client send " - << TypeName() << " " << LogEscape(request.toString(), proxy_client.m_context.loop->m_log_opts.max_chars); + << TypeName(); + MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Trace) + << "send data: " << LogEscape(request.toString(), proxy_client.m_context.loop->m_log_opts.max_chars); proxy_client.m_context.loop->m_task_set->add(request.send().then( [&](::capnp::Response&& response) { - proxy_client.m_context.loop->logPlain() + MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Debug) << "{" << thread_context.thread_name << "} IPC client recv " - << TypeName() << " " << LogEscape(response.toString(), proxy_client.m_context.loop->m_log_opts.max_chars); + << TypeName(); + MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Trace) + << "recv data: " << LogEscape(response.toString(), proxy_client.m_context.loop->m_log_opts.max_chars); try { IterateFields().handleChain( *invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...); @@ -653,7 +657,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel disconnected = "IPC client method call interrupted by disconnect."; } else { kj_exception = kj::str("kj::Exception: ", e).cStr(); - proxy_client.m_context.loop->logPlain() + MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Info) << "{" << thread_context.thread_name << "} IPC client exception " << kj_exception; } const Lock lock(thread_context.waiter->m_mutex); @@ -665,8 +669,8 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel Lock lock(thread_context.waiter->m_mutex); thread_context.waiter->wait(lock, [&done]() { return done; }); if (exception) std::rethrow_exception(exception); - if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception; - if (disconnected) proxy_client.m_context.loop->raise() << disconnected; + if (!kj_exception.empty()) MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Raise) << kj_exception; + if (disconnected) MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Raise) << disconnected; } //! Invoke callable `fn()` that may return void. If it does return void, replace @@ -700,8 +704,10 @@ kj::Promise serverInvoke(Server& server, CallContext& call_context, Fn fn) using Results = typename decltype(call_context.getResults())::Builds; int req = ++server_reqs; - server.m_context.loop->log() << "IPC server recv request #" << req << " " - << TypeName() << " " << LogEscape(params.toString(), server.m_context.loop->m_log_opts.max_chars); + MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server recv request #" << req << " " + << TypeName(); + MP_LOG(*server.m_context.loop, Log::Trace) << "request data: " + << LogEscape(params.toString(), server.m_context.loop->m_log_opts.max_chars); try { using ServerContext = ServerInvokeContext; @@ -717,14 +723,15 @@ kj::Promise serverInvoke(Server& server, CallContext& call_context, Fn fn) return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); }, [&]() { return kj::Promise(kj::mv(call_context)); }) .then([&server, req](CallContext call_context) { - server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName() - << " " << LogEscape(call_context.getResults().toString(), server.m_context.loop->m_log_opts.max_chars); + MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server send response #" << req << " " << TypeName(); + MP_LOG(*server.m_context.loop, Log::Trace) << "response data: " + << LogEscape(call_context.getResults().toString(), server.m_context.loop->m_log_opts.max_chars); }); } catch (const std::exception& e) { - server.m_context.loop->log() << "IPC server unhandled exception: " << e.what(); + MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception: " << e.what(); throw; } catch (...) { - server.m_context.loop->log() << "IPC server unhandled exception"; + MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception"; throw; } } diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 9bd939e4..09ac1790 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -150,16 +150,16 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // thread. KJ_IF_MAYBE (thread_server, perhaps) { const auto& thread = static_cast&>(*thread_server); - server.m_context.loop->log() + MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; if (!thread.m_thread_context.waiter->post(std::move(invoke))) { - server.m_context.loop->log() + MP_LOG(*server.m_context.loop, Log::Error) << "IPC server error request #" << req << " {" << thread.m_thread_context.thread_name << "}" << ", thread busy"; throw std::runtime_error("thread busy"); } } else { - server.m_context.loop->log() + MP_LOG(*server.m_context.loop, Log::Error) << "IPC server error request #" << req << ", missing thread to execute request"; throw std::runtime_error("invalid thread handle"); } diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 06825c99..95da96b5 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -42,7 +42,7 @@ thread_local ThreadContext g_thread_context; void LoggingErrorHandler::taskFailed(kj::Exception&& exception) { KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception); - m_loop.log() << "Uncaught exception in daemonized task."; + MP_LOG(m_loop, Log::Error) << "Uncaught exception in daemonized task."; } EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock) @@ -191,13 +191,13 @@ void EventLoop::addAsyncCleanup(std::function fn) startAsyncThread(); } -EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context) +EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context) : m_exe_name(exe_name), m_io_context(kj::setupAsyncIo()), m_task_set(new kj::TaskSet(m_error_handler)), + m_log_opts(std::move(log_opts)), m_context(context) { - m_log_opts.log_fn = log_fn; int fds[2]; KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); m_wait_fd = fds[0]; @@ -251,9 +251,9 @@ void EventLoop::loop() break; } } - log() << "EventLoop::loop done, cancelling event listeners."; + MP_LOG(*this, Log::Info) << "EventLoop::loop done, cancelling event listeners."; m_task_set.reset(); - log() << "EventLoop::loop bye."; + MP_LOG(*this, Log::Info) << "EventLoop::loop bye."; wait_stream = nullptr; KJ_SYSCALL(::close(post_fd)); const Lock lock(m_mutex); diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index c9f9b51d..79b440f8 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -67,9 +67,9 @@ class TestSetup TestSetup(bool client_owns_connection = true) : thread{[&] { - EventLoop loop("mptest", [](bool raise, const std::string& log) { - std::cout << "LOG" << raise << ": " << log << "\n"; - if (raise) throw std::runtime_error(log); + EventLoop loop("mptest", [](mp::LogMessage log_data) { + std::cout << "LOG" << (int)log_data.level << ": " << log_data.message << "\n"; + if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message); }); auto pipe = loop.m_io_context.provider->newTwoWayPipe();