-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Preserve Thread Context by gRPC Interceptor #19776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Preserve Thread Context by gRPC Interceptor #19776
Conversation
|
❌ Gradle check result for 371161d: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
d2985ea to
ab49942
Compare
|
❌ Gradle check result for ab49942: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
| // Check for duplicates and throw exception if found | ||
| for (Map.Entry<Integer, List<OrderedGrpcInterceptor>> entry : orderMap.entrySet()) { | ||
| if (entry.getValue().size() > 1) { | ||
| throw new IllegalArgumentException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if it would be helpful to output entry.getValue() in a friendly way (maybe call getClass() on each one?) to show which specific interceptors collide.
For the record, ActionFilter seems to allow duplicate order, but will (I think) just end up sorting things based on the plugin loading order. I think it's a good idea to be more strict with Grpc interceptors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh -- I missed the fact that this code was just moved from loadExtensions to createComponents and is not newly-written. Still, I think it might help debugging if we know which interceptors conflict.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Updated the error Msg to display conflicting interceptor name.
| this.workerEventLoopGroup = new NioEventLoopGroup(nettyEventLoopThreads, daemonThreadFactory(settings, "grpc_worker")); | ||
|
|
||
| // Use OpenSearch's managed thread pool for gRPC request processing | ||
| this.grpcExecutor = threadPool.executor("grpc"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
| logger.debug("Captured ThreadContext after interceptors on thread: {}", Thread.currentThread().getName()); | ||
|
|
||
| // Wrap the listener to restore ThreadContext in all callbacks | ||
| return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(delegate) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conceptually, this seems pretty similar to ContextPreservingActionListener.
I'm not very familiar with ContextPreservingActionListener, but I'm wondering if we can follow an identical pattern, where you take a Supplier<ThreadContext.StoredContext> (obtained from threadContext.newRestorableContext()). Then I think runWithThreadContext could just be:
try (ThreadContext.StoredContext ignored = context.get()) {
r.run();
}
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@msfroh main use-case I'm aware of for ContextPreservingActionListener is to run one specific action as system-level (i.e. stash the headers) and restore immediately upon completion so subsequent actions run in the original context.
In practice, it makes sense in the context of the security plugin to stash the context for privileged actions like system index access and then restore the original context of the authenticated user. It doesn't appear to be widely used, but I do see this comment in the core repo:
OpenSearch/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java
Lines 200 to 204 in 789f136
| * Without security the origin is ignored, but security uses it to authorize | |
| * actions that are made up of many sub-actions. These actions call | |
| * {@link #stashWithOrigin} before performing on behalf of a user that | |
| * should be allowed even if the user doesn't have permission to perform | |
| * those actions on their own. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
main use-case I'm aware of for ContextPreservingActionListener is to run one specific action as system-level (i.e. stash the headers) and restore immediately upon completion so subsequent actions run in the original context.
I could be mistaken, but I thought that the purpose of ContextPreservingActionListener is to propagate/preserve the ThreadLocal context across the threads for a given request. When you create a Runnable, you capture the current thread's context, then when the runnable runs on a worker thread, it applies that captured context (propagating it from the caller to the worker), then when the Runnable completes, it restores the worker thread's previous context.
That seems to be the same thing that @aparajita31pandey is doing with this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thankyou, code now follows the same pattern as ContextPreservingActionListener. Supplier is handling the restore pattern internally when get() is called.
Signed-off-by: Aparajita Pandey <[email protected]>
Signed-off-by: Aparajita Pandey <[email protected]>
Signed-off-by: Aparajita Pandey <[email protected]>
dd0261d to
adca9b9
Compare
Signed-off-by: Aparajita Pandey <[email protected]>
|
❌ Gradle check result for 13a82b7: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: Aparajita Pandey <[email protected]>
Signed-off-by: Aparajita Pandey <[email protected]>
|
❌ Gradle check result for df61802: null Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
❌ Gradle check result for df61802: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: Aparajita Pandey <[email protected]>
|
❌ Gradle check result for d5f9a71: null Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Description
This change is build on top of #19005, where gRPC interceptor was introduced. It add ThreadContext to be accessed by the interceptors and also thread context preservation between interceptor and grpc Service execution
Problem: gRPC callbacks run on different threads than interceptors. OpenSearch's ThreadContext added by interceptor are in thread-local and gets lost during thread switches, when reaches gRPC ServiceImpl. In this diff,
After interceptors populate ThreadContext, we capture it using threadContext.newRestorableContext(false) and wrap all gRPC listener callbacks to restore the captured context before execution, following the same pattern as
ContextPreservingActionListenerWith this change, it bridges the gap between gRPC's async threading model and OpenSearch's thread-local context model.
Related Issues
Builds on #19005
Check List
Test Plan
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.