Skip to content

Commit 1df2a33

Browse files
committed
LBs should avoid calling LBs after lb.shutdown()
LoadBalancers shouldn't be called after shutdown(), but RingHashLb could have enqueued work to the SynchronizationContext that executed after shutdown(). This commit fixes problems discovered when auditing all LBs' usage of the syncContext for that type of problem. Similarly, PickFirstLb could have requested a new connection after shutdown(). We want to avoid that sort of thing too. RingHashLb's test changed from CONNECTING to TRANSIENT_FAILURE to get the latest picker. Because two subchannels have failed, it will be in TRANSIENT_FAILURE. Previously the test was using an older picker with an out-of-date subchannelView, and verifyConnection() was too imprecise to notice it was creating the wrong subchannel. This was discovered in b/430347751, where ClusterImplLb was seeing a new subchannel being created after the child LB was shut down (the shutdown itself had been caused by RingHashConfig not implementing equals(), which caused ClusterResolverLb to replace its state, and was fixed by a8de9f0): ``` java.lang.NullPointerException at io.grpc.xds.ClusterImplLoadBalancer$ClusterImplLbHelper.createClusterLocalityFromAttributes(ClusterImplLoadBalancer.java:322) at io.grpc.xds.ClusterImplLoadBalancer$ClusterImplLbHelper.createSubchannel(ClusterImplLoadBalancer.java:236) at io.grpc.util.ForwardingLoadBalancerHelper.createSubchannel(ForwardingLoadBalancerHelper.java:47) at io.grpc.util.ForwardingLoadBalancerHelper.createSubchannel(ForwardingLoadBalancerHelper.java:47) at io.grpc.internal.PickFirstLeafLoadBalancer.createNewSubchannel(PickFirstLeafLoadBalancer.java:527) at io.grpc.internal.PickFirstLeafLoadBalancer.requestConnection(PickFirstLeafLoadBalancer.java:459) at io.grpc.internal.PickFirstLeafLoadBalancer.acceptResolvedAddresses(PickFirstLeafLoadBalancer.java:174) at io.grpc.xds.LazyLoadBalancer$LazyDelegate.activate(LazyLoadBalancer.java:64) at io.grpc.xds.LazyLoadBalancer$LazyDelegate.requestConnection(LazyLoadBalancer.java:97) at 
io.grpc.util.ForwardingLoadBalancer.requestConnection(ForwardingLoadBalancer.java:61) at io.grpc.xds.RingHashLoadBalancer$RingHashPicker.lambda$pickSubchannel$0(RingHashLoadBalancer.java:440) at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96) at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:128) at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.onData(XdsClientImpl.java:817) ```
1 parent a5eaa66 commit 1df2a33

File tree

9 files changed

+145
-32
lines changed

9 files changed

+145
-32
lines changed

api/src/main/java/io/grpc/LoadBalancer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,10 @@ public void ignoreRefreshNameResolutionCheck() {
11891189
* Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context
11901190
* as that the callback methods on the {@link LoadBalancer} interface are run in.
11911191
*
1192+
* <p>Work added to the synchronization context might not run immediately, so LB implementations
1193+
* must be careful to ensure that any assumptions still hold when it is executed. In particular,
1194+
* the LB might have been shut down or subchannels might have changed state.
1195+
*
11921196
* <p>Pro-tip: in order to call {@link SynchronizationContext#schedule}, you need to provide a
11931197
* {@link ScheduledExecutorService}. {@link #getScheduledExecutorService} is provided for your
11941198
* convenience.

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1970,6 +1970,9 @@ public void run() {
19701970
public void requestConnection() {
19711971
syncContext.throwIfNotInThisSynchronizationContext();
19721972
checkState(started, "not started");
1973+
if (shutdown) {
1974+
return;
1975+
}
19731976
subchannel.obtainActiveTransport();
19741977
}
19751978

core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
134134
SubchannelPicker picker;
135135
switch (newState) {
136136
case IDLE:
137-
picker = new RequestConnectionPicker(subchannel);
137+
picker = new RequestConnectionPicker();
138138
break;
139139
case CONNECTING:
140140
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
@@ -197,22 +197,12 @@ public String toString() {
197197

198198
/** Picker that requests connection during the first pick, and returns noResult. */
199199
private final class RequestConnectionPicker extends SubchannelPicker {
200-
private final Subchannel subchannel;
201200
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
202201

203-
RequestConnectionPicker(Subchannel subchannel) {
204-
this.subchannel = checkNotNull(subchannel, "subchannel");
205-
}
206-
207202
@Override
208203
public PickResult pickSubchannel(PickSubchannelArgs args) {
209204
if (connectionRequested.compareAndSet(false, true)) {
210-
helper.getSynchronizationContext().execute(new Runnable() {
211-
@Override
212-
public void run() {
213-
subchannel.requestConnection();
214-
}
215-
});
205+
helper.getSynchronizationContext().execute(PickFirstLoadBalancer.this::requestConnection);
216206
}
217207
return PickResult.withNoResult();
218208
}

core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1767,6 +1767,19 @@ public void subchannelsNoConnectionShutdown() {
17671767
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
17681768
}
17691769

1770+
@Test
1771+
public void subchannelsRequestConnectionNoopAfterShutdown() {
1772+
createChannel();
1773+
Subchannel sub1 =
1774+
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
1775+
1776+
shutdownSafely(helper, sub1);
1777+
requestConnectionSafely(helper, sub1);
1778+
verify(mockTransportFactory, never())
1779+
.newClientTransport(
1780+
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
1781+
}
1782+
17701783
@Test
17711784
public void subchannelsNoConnectionShutdownNow() {
17721785
createChannel();

examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
122122
SubchannelPicker picker;
123123
switch (currentState) {
124124
case IDLE:
125-
picker = new RequestConnectionPicker(subchannel);
125+
picker = new RequestConnectionPicker();
126126
break;
127127
case CONNECTING:
128128
picker = new Picker(PickResult.withNoResult());
@@ -182,24 +182,15 @@ public String toString() {
182182
*/
183183
private final class RequestConnectionPicker extends SubchannelPicker {
184184

185-
private final Subchannel subchannel;
186185
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
187186

188-
RequestConnectionPicker(Subchannel subchannel) {
189-
this.subchannel = checkNotNull(subchannel, "subchannel");
190-
}
191-
192187
@Override
193188
public PickResult pickSubchannel(PickSubchannelArgs args) {
194189
if (connectionRequested.compareAndSet(false, true)) {
195-
helper.getSynchronizationContext().execute(new Runnable() {
196-
@Override
197-
public void run() {
198-
subchannel.requestConnection();
199-
}
200-
});
190+
helper.getSynchronizationContext().execute(
191+
ShufflingPickFirstLoadBalancer.this::requestConnection);
201192
}
202193
return PickResult.withNoResult();
203194
}
204195
}
205-
}
196+
}

xds/src/main/java/io/grpc/xds/LazyLoadBalancer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,13 @@ public void requestConnection() {
9999

100100
@Override
101101
public void shutdown() {
102+
delegate = new NoopLoadBalancer();
102103
}
103104

104105
private final class LazyPicker extends SubchannelPicker {
105106
@Override
106107
public PickResult pickSubchannel(PickSubchannelArgs args) {
108+
// activate() is a no-op after shutdown()
107109
helper.getSynchronizationContext().execute(LazyDelegate.this::activate);
108110
return PickResult.withNoResult();
109111
}
@@ -121,4 +123,17 @@ public Factory(LoadBalancer.Factory delegate) {
121123
return new LazyLoadBalancer(helper, delegate);
122124
}
123125
}
126+
127+
private static final class NoopLoadBalancer extends LoadBalancer {
128+
@Override
129+
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
130+
return Status.OK;
131+
}
132+
133+
@Override
134+
public void handleNameResolutionError(Status error) {}
135+
136+
@Override
137+
public void shutdown() {}
138+
}
124139
}

xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,9 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
438438

439439
if (subchannelView.connectivityState == IDLE) {
440440
syncContext.execute(() -> {
441-
childLbState.getLb().requestConnection();
441+
if (childLbState.getCurrentState() == IDLE) {
442+
childLbState.getLb().requestConnection();
443+
}
442444
});
443445

444446
return PickResult.withNoResult(); // Indicates that this should be retried after backoff
@@ -456,10 +458,11 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
456458
return childLbState.getCurrentPicker().pickSubchannel(args);
457459
}
458460
if (!requestedConnection && subchannelView.connectivityState == IDLE) {
459-
syncContext.execute(
460-
() -> {
461-
childLbState.getLb().requestConnection();
462-
});
461+
syncContext.execute(() -> {
462+
if (childLbState.getCurrentState() == IDLE) {
463+
childLbState.getLb().requestConnection();
464+
}
465+
});
463466
requestedConnection = true;
464467
}
465468
}
xds/src/test/java/io/grpc/xds/LazyLoadBalancerTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2025 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.xds;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
21+
import io.grpc.CallOptions;
22+
import io.grpc.ConnectivityState;
23+
import io.grpc.EquivalentAddressGroup;
24+
import io.grpc.LoadBalancer;
25+
import io.grpc.LoadBalancer.ResolvedAddresses;
26+
import io.grpc.LoadBalancer.SubchannelPicker;
27+
import io.grpc.ManagedChannel;
28+
import io.grpc.Metadata;
29+
import io.grpc.SynchronizationContext;
30+
import io.grpc.internal.PickSubchannelArgsImpl;
31+
import io.grpc.testing.TestMethodDescriptors;
32+
import java.util.Arrays;
33+
import org.junit.Test;
34+
import org.junit.runner.RunWith;
35+
import org.junit.runners.JUnit4;
36+
37+
/** Unit test for {@link io.grpc.xds.LazyLoadBalancer}. */
38+
@RunWith(JUnit4.class)
39+
public final class LazyLoadBalancerTest {
40+
private SynchronizationContext syncContext =
41+
new SynchronizationContext((t, e) -> {
42+
throw new AssertionError(e);
43+
});
44+
private LoadBalancer.PickSubchannelArgs args = new PickSubchannelArgsImpl(
45+
TestMethodDescriptors.voidMethod(),
46+
new Metadata(),
47+
CallOptions.DEFAULT,
48+
new LoadBalancer.PickDetailsConsumer() {});
49+
private FakeHelper helper = new FakeHelper();
50+
51+
@Test
52+
public void pickerIsNoopAfterEarlyShutdown() {
53+
LazyLoadBalancer lb = new LazyLoadBalancer(helper, new LoadBalancer.Factory() {
54+
@Override
55+
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
56+
throw new AssertionError("unexpected");
57+
}
58+
});
59+
lb.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
60+
.setAddresses(Arrays.asList())
61+
.build());
62+
SubchannelPicker picker = helper.picker;
63+
assertThat(picker).isNotNull();
64+
lb.shutdown();
65+
66+
picker.pickSubchannel(args);
67+
}
68+
69+
class FakeHelper extends LoadBalancer.Helper {
70+
ConnectivityState state;
71+
SubchannelPicker picker;
72+
73+
@Override
74+
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
75+
throw new UnsupportedOperationException();
76+
}
77+
78+
@Override
79+
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
80+
this.state = newState;
81+
this.picker = newPicker;
82+
}
83+
84+
@Override
85+
public SynchronizationContext getSynchronizationContext() {
86+
return syncContext;
87+
}
88+
89+
@Override
90+
public String getAuthority() {
91+
return "localhost";
92+
}
93+
}
94+
}

xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() {
261261
private void verifyConnection(int times) {
262262
for (int i = 0; i < times; i++) {
263263
Subchannel connectOnce = connectionRequestedQueue.poll();
264-
assertWithMessage("Null connection is at (%s) of (%s)", i, times)
264+
assertWithMessage("Expected %s new connections, but found %s", times, i)
265265
.that(connectOnce).isNotNull();
266266
clearInvocations(connectOnce);
267267
}
@@ -648,7 +648,7 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() {
648648
getSubchannel(servers, 2),
649649
ConnectivityStateInfo.forTransientFailure(
650650
Status.PERMISSION_DENIED.withDescription("permission denied")));
651-
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
651+
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
652652
verifyConnection(0);
653653
PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel
654654
assertThat(result.getStatus().isOk()).isTrue();

0 commit comments

Comments
 (0)