Skip to content

Commit 0d44203

Browse files
authored
xds: Delay priority LB updates from children (#9670)
If a child policy triggers an update to the parent priority policy, it will be ignored if an update is already in progress. This is the second attempt to make this change; the first one caused a problem with the ring hash LB. A new test that uses actual control plane and data plane servers is now included to prove the issue no longer appears.
1 parent ba182c3 commit 0d44203

File tree

3 files changed

+133
-27
lines changed

3 files changed

+133
-27
lines changed

xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
5858
private final XdsLogger logger;
5959

6060
// Includes all active and deactivated children. Mutable. New entries are only added from priority
61-
// 0 up to the selected priority. An entry is only deleted 15 minutes after the its deactivation.
61+
// 0 up to the selected priority. An entry is only deleted 15 minutes after its deactivation.
6262
private final Map<String, ChildLbState> children = new HashMap<>();
6363

6464
// Following fields are only null initially.
@@ -70,6 +70,8 @@ final class PriorityLoadBalancer extends LoadBalancer {
7070
@Nullable private String currentPriority;
7171
private ConnectivityState currentConnectivityState;
7272
private SubchannelPicker currentPicker;
73+
// Set to true if currently in the process of handling resolved addresses.
74+
private boolean handlingResolvedAddresses;
7375

7476
PriorityLoadBalancer(Helper helper) {
7577
this.helper = checkNotNull(helper, "helper");
@@ -94,11 +96,13 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
9496
children.get(priority).deactivate();
9597
}
9698
}
99+
handlingResolvedAddresses = true;
97100
for (String priority : priorityNames) {
98101
if (children.containsKey(priority)) {
99102
children.get(priority).updateResolvedAddresses();
100103
}
101104
}
105+
handlingResolvedAddresses = false;
102106
tryNextPriority();
103107
return true;
104108
}
@@ -134,8 +138,11 @@ private void tryNextPriority() {
134138
ChildLbState child =
135139
new ChildLbState(priority, priorityConfigs.get(priority).ignoreReresolution);
136140
children.put(priority, child);
137-
child.updateResolvedAddresses();
138141
updateOverallState(priority, CONNECTING, BUFFER_PICKER);
142+
// Calling the child's updateResolvedAddresses() can result in tryNextPriority() being
143+
// called recursively. We need to be sure to be done with processing here before it is
144+
// called.
145+
child.updateResolvedAddresses();
139146
return; // Give priority i time to connect.
140147
}
141148
ChildLbState child = children.get(priority);
@@ -298,32 +305,33 @@ public void refreshNameResolution() {
298305
@Override
299306
public void updateBalancingState(final ConnectivityState newState,
300307
final SubchannelPicker newPicker) {
301-
syncContext.execute(new Runnable() {
302-
@Override
303-
public void run() {
304-
if (!children.containsKey(priority)) {
305-
return;
306-
}
307-
connectivityState = newState;
308-
picker = newPicker;
309-
if (deletionTimer != null && deletionTimer.isPending()) {
310-
return;
311-
}
312-
if (newState.equals(CONNECTING) ) {
313-
if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) {
314-
failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS,
315-
executor);
316-
}
317-
} else if (newState.equals(READY) || newState.equals(IDLE)) {
318-
seenReadyOrIdleSinceTransientFailure = true;
319-
failOverTimer.cancel();
320-
} else if (newState.equals(TRANSIENT_FAILURE)) {
321-
seenReadyOrIdleSinceTransientFailure = false;
322-
failOverTimer.cancel();
323-
}
324-
tryNextPriority();
308+
if (!children.containsKey(priority)) {
309+
return;
310+
}
311+
connectivityState = newState;
312+
picker = newPicker;
313+
314+
if (deletionTimer != null && deletionTimer.isPending()) {
315+
return;
316+
}
317+
if (newState.equals(CONNECTING)) {
318+
if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) {
319+
failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS,
320+
executor);
325321
}
326-
});
322+
} else if (newState.equals(READY) || newState.equals(IDLE)) {
323+
seenReadyOrIdleSinceTransientFailure = true;
324+
failOverTimer.cancel();
325+
} else if (newState.equals(TRANSIENT_FAILURE)) {
326+
seenReadyOrIdleSinceTransientFailure = false;
327+
failOverTimer.cancel();
328+
}
329+
330+
// If we are currently handling newly resolved addresses, let's not try to reconfigure as
331+
// the address handling process will take care of that to provide an atomic config update.
332+
if (!handlingResolvedAddresses) {
333+
tryNextPriority();
334+
}
327335
}
328336

329337
@Override

xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.protobuf.Any;
2525
import com.google.protobuf.Struct;
2626
import com.google.protobuf.Value;
27+
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
2728
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy;
2829
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy;
2930
import io.envoyproxy.envoy.config.core.v3.TypedExtensionConfig;
@@ -151,4 +152,24 @@ public void onHeaders(Metadata headers) {
151152
};
152153
}
153154
}
155+
156+
/**
157+
* Basic test to make sure RING_HASH configuration works.
158+
*/
159+
@Test
160+
public void pingPong_ringHash() {
161+
controlPlane.setCdsConfig(
162+
ControlPlaneRule.buildCluster().toBuilder()
163+
.setLbPolicy(LbPolicy.RING_HASH).build());
164+
165+
ManagedChannel channel = dataPlane.getManagedChannel();
166+
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
167+
channel);
168+
SimpleRequest request = SimpleRequest.newBuilder()
169+
.build();
170+
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
171+
.setResponseMessage("Hi, xDS!")
172+
.build();
173+
assertEquals(goldenResponse, blockingStub.unaryRpc(request));
174+
}
154175
}

xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static io.grpc.ConnectivityState.READY;
2323
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
2424
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
25+
import static org.mockito.ArgumentMatchers.any;
2526
import static org.mockito.ArgumentMatchers.eq;
2627
import static org.mockito.ArgumentMatchers.isA;
2728
import static org.mockito.Mockito.atLeastOnce;
@@ -686,6 +687,37 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() {
686687
verifyNoMoreInteractions(helper);
687688
}
688689

690+
@Test
691+
public void noDuplicateOverallBalancingStateUpdate() {
692+
FakeLoadBalancerProvider fakeLbProvider = new FakeLoadBalancerProvider();
693+
694+
PriorityChildConfig priorityChildConfig0 =
695+
new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), true);
696+
PriorityChildConfig priorityChildConfig1 =
697+
new PriorityChildConfig(new PolicySelection(fakeLbProvider, new Object()), false);
698+
PriorityLbConfig priorityLbConfig =
699+
new PriorityLbConfig(
700+
ImmutableMap.of("p0", priorityChildConfig0),
701+
ImmutableList.of("p0"));
702+
priorityLb.handleResolvedAddresses(
703+
ResolvedAddresses.newBuilder()
704+
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
705+
.setLoadBalancingPolicyConfig(priorityLbConfig)
706+
.build());
707+
708+
priorityLbConfig =
709+
new PriorityLbConfig(
710+
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
711+
ImmutableList.of("p0", "p1"));
712+
priorityLb.handleResolvedAddresses(
713+
ResolvedAddresses.newBuilder()
714+
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
715+
.setLoadBalancingPolicyConfig(priorityLbConfig)
716+
.build());
717+
718+
verify(helper, times(6)).updateBalancingState(any(), any());
719+
}
720+
689721
private void assertLatestConnectivityState(ConnectivityState expectedState) {
690722
verify(helper, atLeastOnce())
691723
.updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture());
@@ -714,4 +746,49 @@ private void assertCurrentPickerIsBufferPicker() {
714746
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
715747
assertThat(pickResult).isEqualTo(PickResult.withNoResult());
716748
}
749+
750+
private static class FakeLoadBalancerProvider extends LoadBalancerProvider {
751+
752+
@Override
753+
public boolean isAvailable() {
754+
return true;
755+
}
756+
757+
@Override
758+
public int getPriority() {
759+
return 5;
760+
}
761+
762+
@Override
763+
public String getPolicyName() {
764+
return "foo";
765+
}
766+
767+
@Override
768+
public LoadBalancer newLoadBalancer(Helper helper) {
769+
return new FakeLoadBalancer(helper);
770+
}
771+
}
772+
773+
static class FakeLoadBalancer extends LoadBalancer {
774+
775+
private Helper helper;
776+
777+
FakeLoadBalancer(Helper helper) {
778+
this.helper = helper;
779+
}
780+
781+
@Override
782+
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
783+
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
784+
}
785+
786+
@Override
787+
public void handleNameResolutionError(Status error) {
788+
}
789+
790+
@Override
791+
public void shutdown() {
792+
}
793+
}
717794
}

0 commit comments

Comments
 (0)