Skip to content

Commit a6afd90

Browse files
committed
fix: TRPC_STREAM do not have context
1 parent 0a23ad4 commit a6afd90

File tree

6 files changed

+78
-41
lines changed

6 files changed

+78
-41
lines changed

trpc/server/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ cc_library(
126126
"//trpc/codec/trpc",
127127
"//trpc/common:status",
128128
"//trpc/compressor:trpc_compressor",
129+
"//trpc/coroutine:fiber_local",
129130
"//trpc/filter:server_filter_controller_h",
130131
"//trpc/serialization:serialization_type",
131132
"//trpc/stream:stream_provider",

trpc/server/server_context.cc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "trpc/common/config/trpc_config.h"
2121
#include "trpc/compressor/trpc_compressor.h"
22+
#include "trpc/coroutine/fiber_local.h"
2223
#include "trpc/runtime/common/stats/frame_stats.h"
2324
#include "trpc/serialization/serialization_factory.h"
2425
#include "trpc/server/service.h"
@@ -303,4 +304,28 @@ void ServerContext::ThrottleConnection(bool set) {
303304
}
304305
}
305306

307+
// Context used for storing data in a fiber environment.
308+
FiberLocal<ServerContext*> fls_server_context;
309+
310+
// Context used for storing data in a regular thread environment, such as setting it in a business thread and releasing
311+
// it when the business request processing is completed.
312+
thread_local ServerContext* tls_server_context = nullptr;
313+
314+
void SetLocalServerContext(const ServerContextPtr& context) {
315+
// Set to fiberLocal in a fiber environment, and set to threadLocal in a regular thread environment.
316+
if (trpc::fiber::detail::GetCurrentFiberEntity()) {
317+
*fls_server_context = context.Get();
318+
} else {
319+
tls_server_context = context.Get();
320+
}
321+
}
322+
323+
ServerContextPtr GetLocalServerContext() {
324+
// Retrieve from fiberLocal in a fiber environment, and retrieve from threadLocal in a regular thread environment.
325+
if (trpc::fiber::detail::GetCurrentFiberEntity()) {
326+
return RefPtr(ref_ptr, *fls_server_context);
327+
}
328+
return RefPtr(ref_ptr, tls_server_context);
329+
}
330+
306331
} // namespace trpc

trpc/server/server_context.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -349,27 +349,21 @@ class ServerContext : public RefCounted<ServerContext> {
349349
void SetReqEncodeType(uint8_t type) { invoke_info_.req_encode_type = type; }
350350

351351
/// @brief Framework use or for testing. Set the compression type of request data.
352-
void SetReqCompressType(uint8_t compress_type) {
353-
invoke_info_.req_compress_type = compress_type;
354-
}
352+
void SetReqCompressType(uint8_t compress_type) { invoke_info_.req_compress_type = compress_type; }
355353

356354
/// @brief Get the compression type for compressing request message.
357355
uint8_t GetReqCompressType() const { return invoke_info_.req_compress_type; }
358356
/// @brief Deprecated: use `GetReqCompressType` instead.
359357
[[deprecated("use GetReqCompressType instead")]] uint8_t GetCompressType() const { return GetReqCompressType(); }
360358

361359
/// @brief Set the compression type for decompressing response message.
362-
void SetRspCompressType(uint8_t compress_type) {
363-
invoke_info_.rsp_compress_type = compress_type;
364-
}
360+
void SetRspCompressType(uint8_t compress_type) { invoke_info_.rsp_compress_type = compress_type; }
365361

366362
/// @brief Get the compression type for decompressing response message.
367363
uint8_t GetRspCompressType() const { return invoke_info_.rsp_compress_type; }
368364

369365
/// @brief Set the compression level for decompressing response message.
370-
void SetRspCompressLevel(uint8_t compress_level) {
371-
invoke_info_.rsp_compress_level = compress_level;
372-
}
366+
void SetRspCompressLevel(uint8_t compress_level) { invoke_info_.rsp_compress_level = compress_level; }
373367

374368
/// @brief Get the compression level for decompressing response message.
375369
uint8_t GetRspCompressLevel() const { return invoke_info_.rsp_compress_level; }
@@ -794,4 +788,12 @@ using ServerContextPtr = RefPtr<ServerContext>;
794788
template <typename T>
795789
using is_server_context = std::is_same<T, ServerContext>;
796790

791+
/// @brief Set the context to a thread-private variable. The private variable itself does not hold the context. The set
792+
/// operation must be used when the ctx is valid within its lifecycle.
793+
void SetLocalServerContext(const ServerContextPtr& context);
794+
795+
/// @brief Retrieve the context from a thread-private variable. The private variable itself does not hold the context.
796+
/// The get operation must be used when the ctx is valid within its lifecycle.
797+
ServerContextPtr GetLocalServerContext();
798+
797799
} // namespace trpc

trpc/server/service_adapter.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ bool ServiceAdapter::HandleMessage(const ConnectionPtr& conn, std::deque<std::an
285285
succ = false;
286286
}
287287

288+
SetLocalServerContext(req_msg->context);
289+
288290
MsgTaskHandler msg_handler = [this, req_msg]() mutable {
289291
auto& context = req_msg->context;
290292

@@ -302,6 +304,7 @@ bool ServiceAdapter::HandleMessage(const ConnectionPtr& conn, std::deque<std::an
302304

303305
if (send) {
304306
this->transport_->SendMsg(send);
307+
SetLocalServerContext(nullptr);
305308
}
306309
};
307310

@@ -364,6 +367,7 @@ bool ServiceAdapter::HandleFiberMessage(const ConnectionPtr& conn, std::deque<st
364367
context->SetBeginTimestampUs(trpc::time::GetMicroSeconds());
365368

366369
RunServerFilters(FilterPoint::SERVER_POST_SCHED_RECV_MSG, req_msg);
370+
SetLocalServerContext(req_msg->context);
367371

368372
STransportRspMsg* send = nullptr;
369373
Service* service = context->GetService();
@@ -381,6 +385,7 @@ bool ServiceAdapter::HandleFiberMessage(const ConnectionPtr& conn, std::deque<st
381385
if (send) {
382386
send->context->SetReserved(static_cast<void*>(conn.Get()));
383387
this->transport_->SendMsg(send);
388+
SetLocalServerContext(nullptr);
384389
}
385390

386391
conn->Deref();

trpc/util/log/logging.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ using trpc::kTrpcLogCacheStringDefault;
259259
/// @note Use case: Separate business logs from framework logs,
260260
/// Different business logs specify different loggers.
261261
/// For example, if remote logs are connected, business logs can be output to remote.
262-
#define TRPC_FLOW_LOG(instance, msg) TRPC_STREAM(instance, ::trpc::Log::info, msg)
262+
#define TRPC_FLOW_LOG(instance, msg) TRPC_STREAM(instance, ::trpc::Log::info, nullptr, msg)
263263
#define TRPC_FLOW_LOG_EX(context, instance, msg) TRPC_STREAM(instance, ::trpc::Log::info, context, msg)
264264

265265
/// @brief Provides ASSERT that does not invalidate in release mode

trpc/util/log/stream_like.h

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,30 +24,34 @@
2424
__TRPC_STREAM__ << msg
2525

2626
/// @brief stream-like log macros
27-
#define TRPC_STREAM(instance, level, msg) \
28-
do { \
29-
const auto& __TRPC_CPP_STREAM_LOGGER_INSTANCE__ = ::trpc::LogFactory::GetInstance()->Get(); \
30-
if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__) { \
31-
if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__->ShouldLog(instance, level)) { \
32-
TRPC_LOG_TRY { \
33-
STREAM_APPENDER(msg); \
34-
__TRPC_CPP_STREAM_LOGGER_INSTANCE__->LogIt(instance, level, __FILE__, __LINE__, __FUNCTION__, \
35-
__TRPC_STREAM__.str()); \
36-
} \
37-
TRPC_LOG_CATCH(instance) \
38-
} \
39-
} else { \
40-
if (::trpc::Log::ShouldNoLog(instance, level)) { \
41-
TRPC_LOG_TRY { \
42-
STREAM_APPENDER(msg); \
43-
::trpc::Log::NoLog(instance, level, __FILE__, __LINE__, __FUNCTION__, __TRPC_STREAM__.str()); \
44-
} \
45-
TRPC_LOG_CATCH(instance) \
46-
} \
47-
} \
27+
#define TRPC_STREAM(instance, level, context, msg) \
28+
do { \
29+
const auto& __TRPC_CPP_STREAM_LOGGER_INSTANCE__ = ::trpc::LogFactory::GetInstance()->Get(); \
30+
if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__) { \
31+
if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__->ShouldLog(instance, level)) { \
32+
TRPC_LOG_TRY { \
33+
STREAM_APPENDER(msg); \
34+
if (context) { \
35+
__TRPC_CPP_STREAM_LOGGER_INSTANCE__->LogIt(instance, level, __FILE__, __LINE__, __FUNCTION__, \
36+
__TRPC_STREAM__.str(), context->GetAllFilterData()); \
37+
} else { \
38+
__TRPC_CPP_STREAM_LOGGER_INSTANCE__->LogIt(instance, level, __FILE__, __LINE__, __FUNCTION__, \
39+
__TRPC_STREAM__.str()); \
40+
} \
41+
} \
42+
TRPC_LOG_CATCH(instance) \
43+
} \
44+
} else { \
45+
if (::trpc::Log::ShouldNoLog(instance, level)) { \
46+
TRPC_LOG_TRY { \
47+
STREAM_APPENDER(msg); \
48+
::trpc::Log::NoLog(instance, level, __FILE__, __LINE__, __FUNCTION__, __TRPC_STREAM__.str()); \
49+
} \
50+
TRPC_LOG_CATCH(instance) \
51+
} \
52+
} \
4853
} while (0)
4954

50-
5155
/// @brief stream-like log macros for tRPC-Cpp framework log
5256
#define TRPC_STREAM_DEFAULT(instance, level, msg) \
5357
do { \
@@ -73,7 +77,7 @@
7377
} while (0)
7478

7579
/// @brief stream-like log macros for tRPC-Cpp framework
76-
#define TRPC_STREAM_EX_DEFAULT(instance, level, context, msg) \
80+
#define TRPC_STREAM_EX_DEFAULT(instance, level, context, msg) \
7781
do { \
7882
const auto& __TRPC_CPP_STREAM_LOGGER_INSTANCE__ = ::trpc::LogFactory::GetInstance()->Get(); \
7983
if (__TRPC_CPP_STREAM_LOGGER_INSTANCE__) { \
@@ -96,7 +100,6 @@
96100
} \
97101
} while (0)
98102

99-
100103
/// @brief stream-like log macros
101104
#define TRPC_STREAM_EX(instance, level, context, msg) \
102105
do { \
@@ -136,16 +139,17 @@
136139
}
137140

138141
/// @brief uses default logger for logging with context
139-
#define TRPC_LOG_MSG_IF_EX(level, context, condition, msg) \
140-
if (condition) { \
141-
TRPC_LOG_MSG_EX(level, context, msg); \
142+
#define TRPC_LOG_MSG_IF_EX(level, context, condition, msg) \
143+
if (condition) { \
144+
TRPC_LOG_MSG_EX(level, context, msg); \
142145
}
143146

144-
#define TRPC_LOGGER_MSG_IF_EX(level, context, instance, condition, msg) \
145-
if (condition) { \
146-
TRPC_LOGGER_MSG_EX(level, context, instance, msg); \
147+
#define TRPC_LOGGER_MSG_IF_EX(level, context, instance, condition, msg) \
148+
if (condition) { \
149+
TRPC_LOGGER_MSG_EX(level, context, instance, msg); \
147150
}
148151

149152
#define TRPC_LOGGER_MSG_EX(level, context, instance, msg) TRPC_STREAM_EX(instance, level, context, msg)
150153

151-
#define TRPC_LOG_MSG_EX(level, context, msg) TRPC_STREAM_EX_DEFAULT(::trpc::log::kTrpcLogCacheStringDefault, level, context, msg)
154+
#define TRPC_LOG_MSG_EX(level, context, msg) \
155+
TRPC_STREAM_EX_DEFAULT(::trpc::log::kTrpcLogCacheStringDefault, level, context, msg)

0 commit comments

Comments
 (0)