@@ -49,9 +49,10 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
     m_loop.log() << "Uncaught exception in daemonized task.";
 }
 
-EventLoopRef::EventLoopRef(EventLoop& loop, std::unique_lock<std::mutex>* lock) : m_loop(&loop), m_lock(lock)
+EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
 {
     auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
+    loop_lock->assert_locked(m_loop->m_mutex);
     m_loop->m_num_clients += 1;
 }
 
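The hunk above replaces the raw `std::unique_lock<std::mutex>*` parameter with a pointer to the project's `Lock` wrapper, and the new `assert_locked` call turns the constructor's locking precondition into an explicit runtime check. The wrapper's definition is not part of this diff; here is a minimal sketch of the shape these call sites assume (names hypothetical except where they appear in the diff):

    #include <cassert>
    #include <mutex>

    // Sketch of a Mutex/Lock pair matching the call sites in this diff. The
    // MP_* thread-safety annotation macros are omitted here; see the note on
    // MP_REQUIRES at the end of this diff.
    class Mutex
    {
    public:
        std::mutex m_mutex;
    };

    class Lock
    {
    public:
        explicit Lock(Mutex& mutex) : m_lock(mutex.m_mutex) {}
        // Runtime check that this Lock holds the given mutex, as used by the
        // EventLoopRef constructor above to validate a caller-supplied lock.
        void assert_locked(Mutex& mutex)
        {
            assert(m_lock.mutex() == &mutex.m_mutex);
            assert(m_lock.owns_lock());
        }
        std::unique_lock<std::mutex> m_lock; // public so m_cv.wait(lock.m_lock, ...) can use it
    };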
@@ -61,9 +62,10 @@ bool EventLoopRef::reset()
     if (auto* loop{m_loop}) {
         m_loop = nullptr;
         auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
+        loop_lock->assert_locked(loop->m_mutex);
         assert(loop->m_num_clients > 0);
         loop->m_num_clients -= 1;
-        if (loop->done(*loop_lock)) {
+        if (loop->done()) {
             done = true;
             loop->m_cv.notify_all();
             int post_fd{loop->m_post_fd};
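In both `EventLoopRef` methods, `PtrOrValue` lets the code run under either the caller's existing `Lock` (when the pointer is non-null) or a freshly acquired one, and the added `assert_locked` guards against a caller passing a lock on the wrong mutex. Its definition is not shown in the diff; a plausible sketch, assuming roughly this shape:

    #include <optional>
    #include <utility>

    // Hypothetical PtrOrValue: wraps an existing T* if one is given, otherwise
    // constructs and owns a T from the remaining arguments. Used above as
    // PtrOrValue{m_lock, mutex}, so the mutex is only locked when no Lock was
    // passed in.
    template <typename T>
    struct PtrOrValue {
        T* m_ptr;
        std::optional<T> m_value;

        template <typename... Args>
        PtrOrValue(T* ptr, Args&&... args) : m_ptr(ptr)
        {
            if (!ptr) m_value.emplace(std::forward<Args>(args)...);
        }
        T& operator*() { return m_ptr ? *m_ptr : *m_value; }
        T* operator->() { return &**this; }
    };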
@@ -134,17 +136,17 @@ Connection::~Connection()
         m_sync_cleanup_fns.pop_front();
     }
     while (!m_async_cleanup_fns.empty()) {
-        const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+        const Lock lock(m_loop->m_mutex);
         m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
         m_async_cleanup_fns.pop_front();
     }
-    std::unique_lock<std::mutex> lock(m_loop->m_mutex);
-    m_loop->startAsyncThread(lock);
+    Lock lock(m_loop->m_mutex);
+    m_loop->startAsyncThread();
 }
 
 CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add cleanup callbacks to the front of list, so sync cleanup functions run
     // in LIFO order. This is a good approach because sync cleanup functions are
     // added as client objects are created, and it is natural to clean up
@@ -158,13 +160,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
 
 void Connection::addAsyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add async cleanup callbacks to the back of the list. Unlike the sync
     // cleanup list, this list order is more significant because it determines
     // the order server objects are destroyed when there is a sudden disconnect,
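The comments above explain the two orderings: sync cleanup callbacks are pushed to the front so they run LIFO, while async cleanup callbacks are appended so destruction order is preserved on disconnect. The `CleanupIt` returned by `addSyncCleanup` works because `std::list` iterators stay valid across unrelated insertions and erasures, so the iterator doubles as an O(1) cancellation handle. A self-contained illustration of the pattern:

    #include <functional>
    #include <list>

    using CleanupList = std::list<std::function<void()>>;
    using CleanupIt = CleanupList::iterator;

    // Register at the front so callbacks run in LIFO order, and return the
    // iterator as a handle for later cancellation.
    CleanupIt addCleanup(CleanupList& fns, std::function<void()> fn)
    {
        return fns.emplace(fns.begin(), std::move(fn));
    }

    // Cancel in O(1), without searching; other handles remain valid.
    void removeCleanup(CleanupList& fns, CleanupIt it)
    {
        fns.erase(it);
    }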
@@ -200,7 +202,7 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
 EventLoop::~EventLoop()
 {
     if (m_async_thread.joinable()) m_async_thread.join();
-    const std::lock_guard<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     KJ_ASSERT(m_post_fn == nullptr);
     KJ_ASSERT(m_async_fns.empty());
     KJ_ASSERT(m_wait_fd == -1);
@@ -225,12 +227,12 @@ void EventLoop::loop()
     for (;;) {
         const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
         if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
-        std::unique_lock<std::mutex> lock(m_mutex);
+        Lock lock(m_mutex);
         if (m_post_fn) {
             Unlock(lock, *m_post_fn);
             m_post_fn = nullptr;
             m_cv.notify_all();
-        } else if (done(lock)) {
+        } else if (done()) {
             // Intentionally do not break if m_post_fn was set, even if done()
             // would return true, to ensure that the EventLoopRef write(post_fd)
             // call always succeeds and the loop does not exit between the time
@@ -243,7 +245,7 @@ void EventLoop::loop()
     log() << "EventLoop::loop bye.";
     wait_stream = nullptr;
     KJ_SYSCALL(::close(post_fd));
-    const std::unique_lock<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     m_wait_fd = -1;
     m_post_fd = -1;
 }
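`Unlock`, used in `loop()` above and `post()` below, runs a callback with the mutex temporarily released, so neither the posted function nor the `write(post_fd)` call ever executes under `m_mutex`. Its definition is not shown in this diff; a minimal sketch, assuming the `Lock` wrapper from the earlier note:

    // Minimal sketch of an Unlock helper: drop the lock, run the callback,
    // re-acquire. A production version would re-lock via an RAII guard so an
    // exception in the callback still restores the locked state, and would
    // carry the matching MP_* annotations.
    template <typename Callback>
    void Unlock(Lock& lock, Callback&& callback)
    {
        lock.m_lock.unlock();
        callback();
        lock.m_lock.lock();
    }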
@@ -254,27 +256,27 @@ void EventLoop::post(kj::Function<void()> fn)
         fn();
         return;
     }
-    std::unique_lock<std::mutex> lock(m_mutex);
+    Lock lock(m_mutex);
     EventLoopRef ref(*this, &lock);
-    m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
+    m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
     m_post_fn = &fn;
     int post_fd{m_post_fd};
     Unlock(lock, [&] {
         char buffer = 0;
         KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
-    m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
+    m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
 }
 
-void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
+void EventLoop::startAsyncThread()
 {
     if (m_async_thread.joinable()) {
         // Notify to wake up the async thread if it is already running.
         m_cv.notify_all();
     } else if (!m_async_fns.empty()) {
         m_async_thread = std::thread([this] {
-            std::unique_lock<std::mutex> lock(m_mutex);
-            while (!done(lock)) {
+            Lock lock(m_mutex);
+            while (!done()) {
                 if (!m_async_fns.empty()) {
                     EventLoopRef ref{*this, &lock};
                     const std::function<void()> fn = std::move(m_async_fns.front());
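The `m_cv.wait` changes in this hunk show why `Lock` exposes its underlying `std::unique_lock` as a public `m_lock` member: `std::condition_variable::wait` only accepts the standard lock type. The predicate lambdas gain an `MP_REQUIRES(m_mutex)` annotation because Clang's thread-safety analysis cannot see that `wait` only invokes the predicate while the mutex is held. The pattern in isolation, as a sketch reusing the hypothetical `Mutex`/`Lock` from the earlier note:

    #include <condition_variable>

    struct Waiter {
        Mutex m_mutex;
        std::condition_variable m_cv;
        bool m_ready = false;

        void wait()
        {
            Lock lock(m_mutex);
            // Hand the raw unique_lock to the condition variable; re-assert the
            // capability inside the predicate for the thread-safety analysis.
            m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_ready; });
        }
    };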
@@ -289,17 +291,15 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
                     // Continue without waiting in case there are more async_fns
                     continue;
                 }
-                m_cv.wait(lock);
+                m_cv.wait(lock.m_lock);
             }
         });
     }
 }
 
-bool EventLoop::done(std::unique_lock<std::mutex>& lock) const
+bool EventLoop::done() const
 {
     assert(m_num_clients >= 0);
-    assert(lock.owns_lock());
-    assert(lock.mutex() == &m_mutex);
     return m_num_clients == 0 && m_async_fns.empty();
 }
 
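`done()` previously asserted at runtime that the caller held `m_mutex`; with the annotated `Lock`, that precondition can instead be stated on the declaration, presumably as `bool done() const MP_REQUIRES(m_mutex)`, and enforced at compile time by Clang's `-Wthread-safety`, which is why the two asserts can be deleted. The MP_* macro definitions are not part of this diff, but they would plausibly be thin wrappers over Clang's capability attributes that compile away elsewhere:

    // Plausible definitions for the MP_* macros seen in this diff.
    #if defined(__clang__)
    #define MP_TSA(x) __attribute__((x))
    #else
    #define MP_TSA(x)
    #endif

    #define MP_CAPABILITY(x)     MP_TSA(capability(x))
    #define MP_SCOPED_CAPABILITY MP_TSA(scoped_lockable)
    #define MP_REQUIRES(...)     MP_TSA(requires_capability(__VA_ARGS__))
    #define MP_ACQUIRE(...)      MP_TSA(acquire_capability(__VA_ARGS__))
    #define MP_RELEASE(...)      MP_TSA(release_capability(__VA_ARGS__))
    #define MP_GUARDED_BY(x)     MP_TSA(guarded_by(x))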