Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <charconv>
#include <cstring>
#include <fstream>
#include <functional>
#include <iostream>
#include <kj/async.h>
#include <kj/common.h>
Expand Down Expand Up @@ -37,6 +38,7 @@ class InitImpl : public Init
}
};

// Exercises deprecated log callback signature
static void LogPrint(bool raise, const std::string& message)
{
if (raise) throw std::runtime_error(message);
Expand Down
6 changes: 3 additions & 3 deletions example/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const s
return std::make_tuple(mp::ConnectStream<InitInterface>(loop, fd), pid);
}

static void LogPrint(bool raise, const std::string& message)
static void LogPrint(mp::LogMessage log_data)
{
if (raise) throw std::runtime_error(message);
std::ofstream("debug.log", std::ios_base::app) << message << std::endl;
if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message);
std::ofstream("debug.log", std::ios_base::app) << log_data.message << std::endl;
}

int main(int argc, char** argv)
Expand Down
6 changes: 3 additions & 3 deletions example/printer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ class InitImpl : public Init
std::unique_ptr<Printer> makePrinter() override { return std::make_unique<PrinterImpl>(); }
};

static void LogPrint(bool raise, const std::string& message)
static void LogPrint(mp::LogMessage log_data)
{
if (raise) throw std::runtime_error(message);
std::ofstream("debug.log", std::ios_base::app) << message << std::endl;
if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message);
std::ofstream("debug.log", std::ios_base::app) << log_data.message << std::endl;
}

int main(int argc, char** argv)
Expand Down
100 changes: 72 additions & 28 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,59 @@ class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
EventLoop& m_loop;
};

using LogFn = std::function<void(bool raise, std::string message)>;
enum class Log {
Trace = 0,
Debug,
Info,
Warning,
Error,
Raise,
};

struct LogMessage {

//! Message to be logged
std::string message;

//! The severity level of this message
Log level;
};

using LogFn = std::function<void(LogMessage)>;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to pass LogMessage by value? It's 40 bytes here, while I think the normal advice is max 2 pointers worth for objects passed by value.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const LogMessage& could make sense semantically. Only disadvantage might be that if log handler wanted to move the string or message into another data structure, it would no longer be able to do that, and would need to copy it instead. Probably a log handler would not do that, but it's not inconceivable. Maybe LogMessage&& would make more sense?

I'm guessing choice doesn't matter too much since cost of moving the struct should be small compared to cost of invoking the function through a pointer and actually formatting the log message. And it should be possible to change the type without breaking compatibility later, so anything here seems fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only disadvantage might be that if log handler wanted to move the string or message into another data structure, it would no longer be able to do that, and would need to copy it instead. Probably a log handler would not do that, but it's not inconceivable. Maybe LogMessage&& would make more sense?

This was my thinking as well. And since libmultiprocess always calls this function from Logger's dtor (just before the string is erased), it will always be move constructed. LogMessage&& would be fine too, but I don't see much upside there, and tend to reserve that for universal references unless there's some meaningful difference.


struct LogOptions {

//! External logging callback.
LogFn log_fn;

//! Maximum number of characters to use when representing
//! request and response structs as strings.
size_t max_chars{200};

//! Messages with a severity level less than log_level will not be
//! reported.
Log log_level{Log::Trace};
};

class Logger
{
public:
Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {}
Logger(Logger&& logger) : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer)) {}
Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {}

Logger(Logger&&) = delete;
Logger& operator=(Logger&&) = delete;
Logger(const Logger&) = delete;
Logger& operator=(const Logger&) = delete;

~Logger() noexcept(false)
{
if (m_fn) m_fn(m_raise, m_buffer.str());
if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level});
}

template <typename T>
friend Logger& operator<<(Logger& logger, T&& value)
{
if (logger.m_fn) logger.m_buffer << std::forward<T>(value);
if (logger.enabled()) logger.m_buffer << std::forward<T>(value);
return logger;
}

Expand All @@ -123,20 +160,25 @@ class Logger
return logger << std::forward<T>(value);
}

bool m_raise;
LogFn& m_fn;
explicit operator bool() const
{
return enabled();
}

private:
bool enabled() const
{
return m_options.log_fn && m_log_level >= m_options.log_level;
}

const LogOptions& m_options;
Log m_log_level;
std::ostringstream m_buffer;
};

struct LogOptions {
#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger

//! External logging callback.
LogFn log_fn;

//! Maximum number of characters to use when representing
//! request and response structs as strings.
size_t max_chars{200};
};
#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "

std::string LongThreadName(const char* exe_name);

Expand Down Expand Up @@ -168,8 +210,19 @@ std::string LongThreadName(const char* exe_name);
class EventLoop
{
public:
//! Construct event loop object.
EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr);
//! Construct event loop object with default logging options.
EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr)
: EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){}

//! Construct event loop object with specified logging options.
EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr);

//! Backwards-compatible constructor for previous (deprecated) logging callback signature
EventLoop(const char* exe_name, std::function<void(bool, std::string)> old_callback, void* context = nullptr)
: EventLoop(exe_name,
LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}},
context){}

~EventLoop();

//! Run event loop. Does not return until shutdown. This should only be
Expand Down Expand Up @@ -210,15 +263,6 @@ class EventLoop
//! Check if loop should exit.
bool done() const MP_REQUIRES(m_mutex);

Logger log()
{
Logger logger(false, m_log_opts.log_fn);
logger << "{" << LongThreadName(m_exe_name) << "} ";
return logger;
}
Logger logPlain() { return {false, m_log_opts.log_fn}; }
Logger raise() { return {true, m_log_opts.log_fn}; }

//! Process name included in thread names so combined debug output from
//! multiple processes is easier to understand.
const char* m_exe_name;
Expand Down Expand Up @@ -643,7 +687,7 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
Connection* connection_ptr = connection.get();
connection->onDisconnect([&loop, connection_ptr] {
loop.log() << "IPC client: unexpected network disconnect.";
MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
delete connection_ptr;
});
});
Expand All @@ -666,7 +710,7 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
});
auto it = loop.m_incoming_connections.begin();
it->onDisconnect([&loop, it] {
loop.log() << "IPC server: socket disconnected.";
MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it);
});
}
Expand Down
39 changes: 23 additions & 16 deletions include/mp/proxy-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ template <typename Client>
void clientDestroy(Client& client)
{
if (client.m_context.connection) {
client.m_context.loop->log() << "IPC client destroy " << typeid(client).name();
MP_LOG(*client.m_context.loop, Log::Info) << "IPC client destroy " << typeid(client).name();
} else {
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
}
Expand All @@ -577,7 +577,7 @@ void clientDestroy(Client& client)
template <typename Server>
void serverDestroy(Server& server)
{
server.m_context.loop->log() << "IPC server destroy " << typeid(server).name();
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server destroy " << typeid(server).name();
}

//! Entry point called by generated client code that looks like:
Expand Down Expand Up @@ -605,7 +605,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
// declaration so the server method runs in a dedicated thread.
assert(!g_thread_context.loop_thread);
g_thread_context.waiter = std::make_unique<Waiter>();
proxy_client.m_context.loop->logPlain()
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Info)
<< "{" << g_thread_context.thread_name
<< "} IPC client first request from current thread, constructing waiter";
}
Expand All @@ -629,15 +629,19 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
invoke_context.emplace(*proxy_client.m_context.connection, thread_context);
IterateFields().handleChain(*invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_context.loop->logPlain()
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Debug)
<< "{" << thread_context.thread_name << "} IPC client send "
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);
<< TypeName<typename Request::Params>();
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Trace)
<< "send data: " << LogEscape(request.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);

proxy_client.m_context.loop->m_task_set->add(request.send().then(
[&](::capnp::Response<typename Request::Results>&& response) {
proxy_client.m_context.loop->logPlain()
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Debug)
<< "{" << thread_context.thread_name << "} IPC client recv "
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);
<< TypeName<typename Request::Results>();
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Trace)
<< "recv data: " << LogEscape(response.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);
try {
IterateFields().handleChain(
*invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
Expand All @@ -653,7 +657,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
disconnected = "IPC client method call interrupted by disconnect.";
} else {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_context.loop->logPlain()
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Info)
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
}
const Lock lock(thread_context.waiter->m_mutex);
Expand All @@ -665,8 +669,8 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
Lock lock(thread_context.waiter->m_mutex);
thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
if (disconnected) proxy_client.m_context.loop->raise() << disconnected;
if (!kj_exception.empty()) MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Raise) << kj_exception;
if (disconnected) MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Raise) << disconnected;
}

//! Invoke callable `fn()` that may return void. If it does return void, replace
Expand Down Expand Up @@ -700,8 +704,10 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using Results = typename decltype(call_context.getResults())::Builds;

int req = ++server_reqs;
server.m_context.loop->log() << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString(), server.m_context.loop->m_log_opts.max_chars);
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>();
MP_LOG(*server.m_context.loop, Log::Trace) << "request data: "
<< LogEscape(params.toString(), server.m_context.loop->m_log_opts.max_chars);

try {
using ServerContext = ServerInvokeContext<Server, CallContext>;
Expand All @@ -717,14 +723,15 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) {
server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>()
<< " " << LogEscape(call_context.getResults().toString(), server.m_context.loop->m_log_opts.max_chars);
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server send response #" << req << " " << TypeName<Results>();
MP_LOG(*server.m_context.loop, Log::Trace) << "response data: "
<< LogEscape(call_context.getResults().toString(), server.m_context.loop->m_log_opts.max_chars);
});
} catch (const std::exception& e) {
server.m_context.loop->log() << "IPC server unhandled exception: " << e.what();
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception: " << e.what();
throw;
} catch (...) {
server.m_context.loop->log() << "IPC server unhandled exception";
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception";
throw;
}
}
Expand Down
6 changes: 3 additions & 3 deletions include/mp/type-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,16 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// thread.
KJ_IF_MAYBE (thread_server, perhaps) {
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_context.loop->log()
MP_LOG(*server.m_context.loop, Log::Debug)
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
server.m_context.loop->log()
MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req
<< " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
throw std::runtime_error("thread busy");
}
} else {
server.m_context.loop->log()
MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
Expand Down
10 changes: 5 additions & 5 deletions src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ thread_local ThreadContext g_thread_context;
void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
{
KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
m_loop.log() << "Uncaught exception in daemonized task.";
MP_LOG(m_loop, Log::Error) << "Uncaught exception in daemonized task.";
}

EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
Expand Down Expand Up @@ -191,13 +191,13 @@ void EventLoop::addAsyncCleanup(std::function<void()> fn)
startAsyncThread();
}

EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context)
: m_exe_name(exe_name),
m_io_context(kj::setupAsyncIo()),
m_task_set(new kj::TaskSet(m_error_handler)),
m_log_opts(std::move(log_opts)),
m_context(context)
{
m_log_opts.log_fn = log_fn;
int fds[2];
KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
m_wait_fd = fds[0];
Expand Down Expand Up @@ -251,9 +251,9 @@ void EventLoop::loop()
break;
}
}
log() << "EventLoop::loop done, cancelling event listeners.";
MP_LOG(*this, Log::Info) << "EventLoop::loop done, cancelling event listeners.";
m_task_set.reset();
log() << "EventLoop::loop bye.";
MP_LOG(*this, Log::Info) << "EventLoop::loop bye.";
wait_stream = nullptr;
KJ_SYSCALL(::close(post_fd));
const Lock lock(m_mutex);
Expand Down
6 changes: 3 additions & 3 deletions test/mp/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ class TestSetup

TestSetup(bool client_owns_connection = true)
: thread{[&] {
EventLoop loop("mptest", [](bool raise, const std::string& log) {
std::cout << "LOG" << raise << ": " << log << "\n";
if (raise) throw std::runtime_error(log);
EventLoop loop("mptest", [](mp::LogMessage log_data) {
std::cout << "LOG" << (int)log_data.level << ": " << log_data.message << "\n";
if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message);
});
auto pipe = loop.m_io_context.provider->newTwoWayPipe();

Expand Down
Loading