Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@ func (b *pickfirstBalancer) ExitIdle() {
b.mu.Lock()
defer b.mu.Unlock()
if b.state == connectivity.Idle {
// Move the balancer into CONNECTING state immediately. This is done to
// avoid staying in IDLE if a resolver update arrives before the first
// SubConn reports CONNECTING.
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.startFirstPassLocked()
}
}
Expand Down Expand Up @@ -604,7 +611,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
if !b.addressList.seekTo(sd.addr) {
// This should not fail as we should have only one SubConn after
// entering READY. The SubConn should be present in the addressList.
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
return
}
if !b.healthCheckingEnabled {
Expand Down
96 changes: 96 additions & 0 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,102 @@ func (s) TestPickFirstLeaf_AddressUpdateWithMetadata(t *testing.T) {
}
}

// Tests the scenario where a connection is established and then breaks, leading
// to a reconnection attempt. While the reconnection is in progress, a resolver
// update with a new address is received. The test verifies that the balancer
// creates a new SubConn for the new address and that the ClientConn eventually
// becomes READY.
func (s) TestPickFirstLeaf_Reconnection(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have this as an e2e style test. We wouldn't be able to use the blocking context dialer testutil that we have, since we can only control the connection after we start connecting. So, by that time, the subchannel would have already moved to Connecting. I spent a couple of minutes on it, but couldn't think of a way to be able to do this in a e2e style. But if you have an idea that can work without much work, that would be great. But I wouldn't block this PR for that test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, by that time, the subchannel would have already moved to Connecting.

I thought of using a real channel, but couldn't figure out a simple way to delay/drop the Connecting update, so decided to use the fake channel.

One solution I thought of was to wrap the SubConn state listener in a parent LB to intercept and drop connectivity updates. That was making the test more complex than using a fake channel.

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}},
},
},
}
if err := bal.UpdateClientConnState(ccState); err != nil {
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
}

select {
case state := <-cc.NewStateCh:
if got, want := state, connectivity.Connecting; got != want {
t.Fatalf("Received unexpected ClientConn sate: got %v, want %v", got, want)
}
case <-ctx.Done():
t.Fatal("Context timed out waiting for ClientConn state update.")
}

sc1 := <-cc.NewSubConnCh
select {
case <-sc1.ConnectCh:
case <-ctx.Done():
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
}
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

if err := cc.WaitForConnectivityState(ctx, connectivity.Ready); err != nil {
t.Fatalf("Context timed out waiting for ClientConn to become READY.")
}

// Simulate a connection breakage, this should result the channel
// transitioning to IDLE.
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
if err := cc.WaitForConnectivityState(ctx, connectivity.Idle); err != nil {
t.Fatalf("Context timed out waiting for ClientConn to enter IDLE.")
}

// Calling the idle picker should result in the SubConn being re-connected.
picker := <-cc.NewPickerCh
if _, err := picker.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("picker.Pick() returned error: %v, want %v", err, balancer.ErrNoSubConnAvailable)
}

select {
case <-sc1.ConnectCh:
case <-ctx.Done():
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
}

// Send a resolver update, removing the existing SubConn. Since the balancer
// is connecting, it should create a new SubConn for the new backend
// address.
ccState = balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
},
},
}
if err := bal.UpdateClientConnState(ccState); err != nil {
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
}

var sc2 *testutils.TestSubConn
select {
case sc2 = <-cc.NewSubConnCh:
case <-ctx.Done():
t.Fatal("Context timed out waiting for new SubConn to be created.")
}

select {
case <-sc2.ConnectCh:
case <-ctx.Done():
t.Fatal("Context timed out waiting for Connect() to be called on sc2.")
}
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
if err := cc.WaitForConnectivityState(ctx, connectivity.Ready); err != nil {
t.Fatalf("Context timed out waiting for ClientConn to become READY.")
}
}

// healthListenerCapturingCCWrapper is used to capture the health listener so
// that health updates can be mocked for testing.
type healthListenerCapturingCCWrapper struct {
Expand Down