Skip to content

Commit 0e5a67f

Browse files
authored
Bugfix: fixed the following bugs (#227)
- fix the issue that program will cause a core dump at Promise::SetValue rarely when make rpc call from handle or io work thread - fix MQThreadPool memory leak when queue is full - fix the issue where the timeout parameter of HTTP asynchronous streaming ReadFullResponse interface did not take effect - fix the issue that stream length is 0 in HttpServiceProxy streaming request - correct type of timeout from int to uint32_t in http stream to avoid overflow during conversion - make tvar sampler remain valid before and after SamplerCollectorStop/Start
1 parent 0c7b592 commit 0e5a67f

File tree

12 files changed

+184
-135
lines changed

12 files changed

+184
-135
lines changed

trpc/future/future.h

Lines changed: 46 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -531,12 +531,12 @@ class FutureImpl : public FutureImplBase {
531531
state_.callback = new ContinuationWithValue<Func, PromiseType, T...>(std::forward<Func>(func),
532532
std::forward<PromiseType>(promise));
533533
if (executor) state_.callback->SetExecutor(executor);
534-
state_.has_callback = true;
535534

536-
std::lock_guard<std::mutex> lock(state_.mtx);
535+
std::unique_lock<std::mutex> lock(state_.mtx);
536+
state_.has_callback = true;
537537
// Got immediately executed.
538538
if (HasResult()) {
539-
TrySchedule();
539+
TrySchedule(lock);
540540
}
541541
}
542542

@@ -549,12 +549,12 @@ class FutureImpl : public FutureImplBase {
549549
void SetTerminalCallback(Func&& func, Executor* executor) {
550550
state_.callback = new TerminalWithValue<Func, T...>(std::forward<Func>(func));
551551
if (executor) state_.callback->SetExecutor(executor);
552-
state_.has_callback = true;
553552

554-
std::lock_guard<std::mutex> lock(state_.mtx);
553+
std::unique_lock<std::mutex> lock(state_.mtx);
554+
state_.has_callback = true;
555555
// Got immediately executed.
556556
if (HasResult()) {
557-
TrySchedule();
557+
TrySchedule(lock);
558558
}
559559
}
560560

@@ -571,12 +571,12 @@ class FutureImpl : public FutureImplBase {
571571
state_.callback = new ContinuationWithFuture<Func, PromiseType, T...>(std::forward<Func>(func),
572572
std::forward<PromiseType>(promise));
573573
if (executor) state_.callback->SetExecutor(executor);
574-
state_.has_callback = true;
575574

576-
std::lock_guard<std::mutex> lock(state_.mtx);
575+
std::unique_lock<std::mutex> lock(state_.mtx);
576+
state_.has_callback = true;
577577
// Got immediately executed.
578578
if (HasResult()) {
579-
TrySchedule();
579+
TrySchedule(lock);
580580
}
581581
}
582582

@@ -589,12 +589,12 @@ class FutureImpl : public FutureImplBase {
589589
void SetTerminalCallbackWrapped(Func&& func, Executor* executor) {
590590
state_.callback = new TerminalWithFuture<Func, T...>(std::forward<Func>(func));
591591
if (executor) state_.callback->SetExecutor(executor);
592-
state_.has_callback = true;
593592

594-
std::lock_guard<std::mutex> lock(state_.mtx);
593+
std::unique_lock<std::mutex> lock(state_.mtx);
594+
state_.has_callback = true;
595595
// Got immediately executed.
596596
if (HasResult()) {
597-
TrySchedule();
597+
TrySchedule(lock);
598598
}
599599
}
600600

@@ -603,41 +603,36 @@ class FutureImpl : public FutureImplBase {
603603
state_.value = std::move(value);
604604
state_.ready = true;
605605
// Result state may be looped by another thread.
606-
{
607-
std::lock_guard<std::mutex> lock(state_.mtx);
608-
state_.has_result = true;
609-
}
610-
TrySchedule();
606+
std::unique_lock<std::mutex> lock(state_.mtx);
607+
state_.has_result = true;
608+
TrySchedule(lock);
611609
}
612610

613611
/// @brief Exceptional value set through promise.
614612
void SetException(const Exception& e) {
615613
state_.exception = e;
616614
state_.failed = true;
617615
// Result state may be looped by another thread.
618-
{
619-
std::lock_guard<std::mutex> lock(state_.mtx);
620-
state_.has_result = true;
621-
}
622-
TrySchedule();
616+
std::unique_lock<std::mutex> lock(state_.mtx);
617+
state_.has_result = true;
618+
TrySchedule(lock);
623619
}
624620

625621
/// @brief Support non const parameter.
626622
void SetException(Exception&& e) {
627623
state_.exception = std::move(e);
628624
state_.failed = true;
629-
{
630-
std::lock_guard<std::mutex> lock(state_.mtx);
631-
state_.has_result = true;
632-
}
633-
TrySchedule();
625+
std::unique_lock<std::mutex> lock(state_.mtx);
626+
state_.has_result = true;
627+
TrySchedule(lock);
634628
}
635629

636630
private:
637631
/// @brief Future may or may not registered callback yet, check to inspire callback.
638632
/// @note Callback can only inspired once.
639-
void TrySchedule() {
633+
void TrySchedule(std::unique_lock<std::mutex>& lock) {
640634
if (state_.has_callback) {
635+
lock.unlock();
641636
if (IsReady()) {
642637
if (state_.schedule_flag.test_and_set() == false) {
643638
state_.callback->SetValue(GetValue());
@@ -704,11 +699,11 @@ class FutureImpl<T> : public FutureImplBase {
704699
state_.callback =
705700
new ContinuationWithValue<Func, PromiseType, T>(std::forward<Func>(func), std::forward<PromiseType>(promise));
706701
if (executor) state_.callback->SetExecutor(executor);
707-
state_.has_callback = true;
708702

709-
std::lock_guard<std::mutex> lock(state_.mtx);
703+
std::unique_lock<std::mutex> lock(state_.mtx);
704+
state_.has_callback = true;
710705
if (HasResult()) {
711-
TrySchedule();
706+
TrySchedule(lock);
712707
}
713708
}
714709

@@ -717,11 +712,11 @@ class FutureImpl<T> : public FutureImplBase {
717712
void SetTerminalCallback(Func&& func, Executor* executor) {
718713
state_.callback = new TerminalWithValue<Func, T>(std::forward<Func>(func));
719714
if (executor) state_.callback->SetExecutor(executor);
720-
state_.has_callback = true;
721715

722-
std::lock_guard<std::mutex> lock(state_.mtx);
716+
std::unique_lock<std::mutex> lock(state_.mtx);
717+
state_.has_callback = true;
723718
if (HasResult()) {
724-
TrySchedule();
719+
TrySchedule(lock);
725720
}
726721
}
727722

@@ -732,11 +727,11 @@ class FutureImpl<T> : public FutureImplBase {
732727
state_.callback =
733728
new ContinuationWithFuture<Func, PromiseType, T>(std::forward<Func>(func), std::forward<PromiseType>(promise));
734729
if (executor) state_.callback->SetExecutor(executor);
735-
state_.has_callback = true;
736730

737-
std::lock_guard<std::mutex> lock(state_.mtx);
731+
std::unique_lock<std::mutex> lock(state_.mtx);
732+
state_.has_callback = true;
738733
if (HasResult()) {
739-
TrySchedule();
734+
TrySchedule(lock);
740735
}
741736
}
742737

@@ -745,51 +740,46 @@ class FutureImpl<T> : public FutureImplBase {
745740
void SetTerminalCallbackWrapped(Func&& func, Executor* executor) {
746741
state_.callback = new TerminalWithFuture<Func, T>(std::forward<Func>(func));
747742
if (executor) state_.callback->SetExecutor(executor);
748-
state_.has_callback = true;
749743

750-
std::lock_guard<std::mutex> lock(state_.mtx);
744+
std::unique_lock<std::mutex> lock(state_.mtx);
745+
state_.has_callback = true;
751746
if (HasResult()) {
752-
TrySchedule();
747+
TrySchedule(lock);
753748
}
754749
}
755750

756751
/// @brief Same as multiple version.
757752
void SetValue(T&& value) {
758753
state_.value = std::move(value);
759754
state_.ready = true;
760-
{
761-
std::lock_guard<std::mutex> lock(state_.mtx);
762-
state_.has_result = true;
763-
}
764-
TrySchedule();
755+
std::unique_lock<std::mutex> lock(state_.mtx);
756+
state_.has_result = true;
757+
TrySchedule(lock);
765758
}
766759

767760
/// @brief Same as multiple version.
768761
void SetException(const Exception& e) {
769762
state_.exception = e;
770763
state_.failed = true;
771-
{
772-
std::lock_guard<std::mutex> lock(state_.mtx);
773-
state_.has_result = true;
774-
}
775-
TrySchedule();
764+
std::unique_lock<std::mutex> lock(state_.mtx);
765+
state_.has_result = true;
766+
TrySchedule(lock);
776767
}
777768

778769
/// @brief Same as multiple version.
779770
void SetException(Exception&& e) {
780771
state_.exception = std::move(e);
781772
state_.failed = true;
782-
{
783-
std::lock_guard<std::mutex> lock(state_.mtx);
784-
state_.has_result = true;
785-
}
786-
TrySchedule();
773+
std::unique_lock<std::mutex> lock(state_.mtx);
774+
state_.has_result = true;
775+
TrySchedule(lock);
787776
}
788777

789778
private:
790779
/// @brief Same as multiple version.
791-
void TrySchedule() {
780+
void TrySchedule(std::unique_lock<std::mutex>& lock) {
792781
if (state_.has_callback) {
782+
lock.unlock();
793783
if (IsReady()) {
794784
if (state_.schedule_flag.test_and_set() == false) {
795785
state_.callback->SetValue(GetValue());

trpc/future/future_test.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -641,15 +641,13 @@ TEST(Future, ThenCopyableExecutor) {
641641
// Test mltiple threads executor.
642642
TEST(Future, TestExecuteOkInDifferentThread) {
643643
static int exec_count = 0;
644-
int loop_times = 50000;
644+
int loop_times = 500;
645645
for (int i = 0; i < loop_times; i++) {
646646
Promise<int> pr;
647647
auto fut = pr.GetFuture();
648648
std::thread t([pr = std::move(pr)]() mutable {
649-
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
650649
pr.SetValue(1);
651650
});
652-
std::this_thread::sleep_for(std::chrono::nanoseconds(10000));
653651
fut.Then([](int&& val) {
654652
exec_count++;
655653
return MakeReadyFuture<>();

trpc/future/future_utility_test.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,4 +629,24 @@ TEST(BlockingTryGet, test) {
629629
ASSERT_TRUE(TestBlockingTryGet(100, 1000));
630630
}
631631

632+
// Test in the capture promise by reference situation, the promise object is not destroyed during SetValue
633+
TEST(BlockingGet, capture_promise_by_ref) {
634+
constexpr int kMaxLoopTimes = 10000;
635+
std::atomic<int> execute_times = 0;
636+
for (int i = 0; i < kMaxLoopTimes; ++i) {
637+
std::unique_ptr<std::thread> t;
638+
{
639+
trpc::Promise<> pr;
640+
auto fut = pr.GetFuture();
641+
t = std::make_unique<std::thread>([&]() {
642+
pr.SetValue();
643+
execute_times++;
644+
});
645+
future::BlockingGet(std::move(fut));
646+
}
647+
t->join();
648+
}
649+
EXPECT_EQ(execute_times.load(), kMaxLoopTimes);
650+
}
651+
632652
} // namespace trpc

trpc/stream/http/async/stream.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ Future<> HttpAsyncStream::PushSendMessage(HttpStreamFramePtr&& msg) {
5656
return AsyncWrite(std::move(out));
5757
}
5858

59-
Future<http::HttpHeader> HttpAsyncStream::AsyncReadHeader(int timeout) {
59+
Future<http::HttpHeader> HttpAsyncStream::AsyncReadHeader(uint32_t timeout) {
6060
pending_header_.val = Promise<http::HttpHeader>();
6161

6262
auto ft = pending_header_.val.value().GetFuture();
@@ -80,7 +80,7 @@ Future<http::HttpHeader> HttpAsyncStream::AsyncReadHeader(int timeout) {
8080
return ft;
8181
}
8282

83-
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadChunk(int timeout) {
83+
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadChunk(uint32_t timeout) {
8484
// it can read only in chunked mode
8585
if (read_mode_ != DataMode::kChunked) {
8686
Status status{TRPC_STREAM_UNKNOWN_ERR, 0, "Can't read no chunk data"};
@@ -90,15 +90,15 @@ Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadChunk(int timeout) {
9090
return AsyncReadInner(ReadOperation::kReadChunk, 0, timeout);
9191
}
9292

93-
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadAtMost(uint64_t len, int timeout) {
93+
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadAtMost(uint64_t len, uint32_t timeout) {
9494
return AsyncReadInner(ReadOperation::kReadAtMost, len, timeout);
9595
}
9696

97-
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadExactly(uint64_t len, int timeout) {
97+
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadExactly(uint64_t len, uint32_t timeout) {
9898
return AsyncReadInner(ReadOperation::kReadExactly, len, timeout);
9999
}
100100

101-
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadInner(ReadOperation op, uint64_t len, int timeout) {
101+
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadInner(ReadOperation op, uint64_t len, uint32_t timeout) {
102102
if (read_mode_ == DataMode::kNoData) {
103103
// can not read data when content-length equal to 0
104104
return MakeReadyFuture<NoncontiguousBuffer>(NoncontiguousBuffer{});

trpc/stream/http/async/stream.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ class HttpAsyncStream : public HttpCommonStream {
3636

3737
/// @brief Reads the header asynchronously.
3838
/// @param timeout time to wait for the header to be ready
39-
Future<http::HttpHeader> AsyncReadHeader(int timeout = std::numeric_limits<int>::max());
39+
Future<http::HttpHeader> AsyncReadHeader(uint32_t timeout = std::numeric_limits<uint32_t>::max());
4040

4141
/// @brief Reads a chunk in chunked mode asynchronously, note that reading in non-chunked mode will fail
4242
/// @param timeout time to wait for the header to be ready
43-
Future<NoncontiguousBuffer> AsyncReadChunk(int timeout = std::numeric_limits<int>::max());
43+
Future<NoncontiguousBuffer> AsyncReadChunk(uint32_t timeout = std::numeric_limits<int>::max());
4444

4545
/// @brief Reads at most len data asynchronously.
4646
/// @param len max size to read
@@ -50,15 +50,15 @@ class HttpAsyncStream : public HttpCommonStream {
5050
/// An empty buffer means that the end has been read
5151
/// Usage scenario 1: Limits the maximum length of each read When the memory is limited.
5252
/// Usage scenario 2: Gets part of data in time and send it downstream on route server.
53-
Future<NoncontiguousBuffer> AsyncReadAtMost(uint64_t len, int timeout = std::numeric_limits<int>::max());
53+
Future<NoncontiguousBuffer> AsyncReadAtMost(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());
5454

5555
/// @brief Reads data with a fixed length. If eof is read, it will return as much data as there is in the network
5656
/// @param len size to read
5757
/// @param timeout time to wait for the header to be ready
5858
/// @note If the read buffer size is less than the required length, it means that eof has been read.
5959
/// Usage scenario 1: The requested data is compressed by a fixed size, and needs to be read and decompressed by
6060
/// a fixed size.
61-
Future<NoncontiguousBuffer> AsyncReadExactly(uint64_t len, int timeout = std::numeric_limits<int>::max());
61+
Future<NoncontiguousBuffer> AsyncReadExactly(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());
6262

6363
protected:
6464
template <class T>
@@ -98,7 +98,7 @@ class HttpAsyncStream : public HttpCommonStream {
9898

9999
/// @brief Creates a scheduled waiting task
100100
template <class T>
101-
void CreatePendingTimer(PendingVal<T>* pending, int timeout);
101+
void CreatePendingTimer(PendingVal<T>* pending, uint32_t timeout);
102102

103103
/// @brief Checks the pending state
104104
template <class T>
@@ -110,7 +110,7 @@ class HttpAsyncStream : public HttpCommonStream {
110110

111111
void NotifyPendingDataQueue();
112112

113-
Future<NoncontiguousBuffer> AsyncReadInner(ReadOperation op, uint64_t len, int timeout);
113+
Future<NoncontiguousBuffer> AsyncReadInner(ReadOperation op, uint64_t len, uint32_t timeout);
114114

115115
protected:
116116
/// @brief Used to store asynchronous data request
@@ -147,7 +147,7 @@ void HttpAsyncStream::PendingDone(PendingVal<T>* pending) {
147147
}
148148

149149
template <class T>
150-
void HttpAsyncStream::CreatePendingTimer(PendingVal<T>* pending, int timeout) {
150+
void HttpAsyncStream::CreatePendingTimer(PendingVal<T>* pending, uint32_t timeout) {
151151
TRPC_CHECK_EQ(pending->timer_id, iotimer::InvalidID);
152152
pending->timer_id = iotimer::Create(timeout, 0, [this, pending]() {
153153
if (!pending->val) {

0 commit comments

Comments
 (0)