Skip to content

Commit 759de4d

Browse files
authored
xds: include nonce from the most recent response in new requests (#3420)
Even if the new request is for a new watch. None and version are reset on new ADS streams.
1 parent cfc5fec commit 759de4d

File tree

2 files changed

+41
-32
lines changed

2 files changed

+41
-32
lines changed

xds/internal/client/v2client.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,13 @@ type v2Client struct {
6767
// these are set to nil. All accesses to the map protected and any value
6868
// inside the map should be protected with the above mutex.
6969
watchMap map[string]*watchInfo
70-
// ackMap contains the version that was acked (the version in the ack
70+
// versionMap contains the version that was acked (the version in the ack
7171
// request that was sent on wire). The key is typeURL, the value is the
72-
// version string, becaues the versions for different resource types
73-
// should be independent.
74-
ackMap map[string]string
72+
// version string, becaues the versions for different resource types should
73+
// be independent.
74+
versionMap map[string]string
75+
// nonceMap contains the nonce from the most recent received response.
76+
nonceMap map[string]string
7577
// rdsCache maintains a mapping of {routeConfigName --> clusterName} from
7678
// validated route configurations received in RDS responses. We cache all
7779
// valid route configurations, whether or not we are interested in them
@@ -107,10 +109,11 @@ func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int)
107109
streamCh: make(chan adsStream, 1),
108110
sendCh: buffer.NewUnbounded(),
109111

110-
watchMap: make(map[string]*watchInfo),
111-
ackMap: make(map[string]string),
112-
rdsCache: make(map[string]string),
113-
cdsCache: make(map[string]CDSUpdate),
112+
watchMap: make(map[string]*watchInfo),
113+
versionMap: make(map[string]string),
114+
nonceMap: make(map[string]string),
115+
rdsCache: make(map[string]string),
116+
cdsCache: make(map[string]CDSUpdate),
114117
}
115118
v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
116119

@@ -179,7 +182,7 @@ func (v2c *v2Client) run() {
179182
// string
180183
// - If this is an ack, version will be the version from the response
181184
// - If this is a nack, version will be the previous acked version (from
182-
// ackMap). If there was no ack before, it will be an empty string
185+
// versionMap). If there was no ack before, it will be an empty string
183186
func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool {
184187
req := &xdspb.DiscoveryRequest{
185188
Node: v2c.nodeProto,
@@ -208,7 +211,8 @@ func (v2c *v2Client) sendExisting(stream adsStream) bool {
208211
defer v2c.mu.Unlock()
209212

210213
// Reset the ack versions when the stream restarts.
211-
v2c.ackMap = make(map[string]string)
214+
v2c.versionMap = make(map[string]string)
215+
v2c.nonceMap = make(map[string]string)
212216

213217
for typeURL, wi := range v2c.watchMap {
214218
if !v2c.sendRequest(stream, wi.target, typeURL, "", "") {
@@ -239,10 +243,12 @@ func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, v
239243
v2c.checkCacheAndUpdateWatchMap(t)
240244
// TODO: if watch is called again with the same resource names,
241245
// there's no need to send another request.
242-
//
243-
// TODO: should we reset version (for ack) when a new watch is
244-
// started? Or do this only if the resource names are different
245-
// (so we send a new request)?
246+
247+
// We don't reset version or nonce when a new watch is started. The version
248+
// and nonce from previous response are carried by the request unless the
249+
// stream is recreated.
250+
version = v2c.versionMap[typeURL]
251+
nonce = v2c.nonceMap[typeURL]
246252
return
247253
}
248254

@@ -270,13 +276,14 @@ func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, versi
270276
target = wi.target
271277
if version == "" {
272278
// This is a nack, get the previous acked version.
273-
version = v2c.ackMap[typeURL]
279+
version = v2c.versionMap[typeURL]
274280
// version will still be an empty string if typeURL isn't
275-
// found in ackMap, this can happen if there wasn't any ack
281+
// found in versionMap, this can happen if there wasn't any ack
276282
// before.
277283
} else {
278-
v2c.ackMap[typeURL] = version
284+
v2c.versionMap[typeURL] = version
279285
}
286+
v2c.nonceMap[typeURL] = nonce
280287
return
281288
}
282289

xds/internal/client/v2client_ack_test.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion *
6161

6262
// startXDS calls watch to send the first request. It then sends a good response
6363
// and checks for ack.
64-
func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest) *testutils.Channel {
64+
func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest, preVersion string, preNonce string) *testutils.Channel {
6565
callbackCh := testutils.NewChannel()
6666
switch xdsname {
6767
case "LDS":
@@ -86,7 +86,7 @@ func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Ch
8686
})
8787
}
8888

89-
if err := compareXDSRequest(reqChan, req, "", ""); err != nil {
89+
if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil {
9090
t.Fatalf("Failed to receive %s request: %v", xdsname, err)
9191
}
9292
t.Logf("FakeServer received %s request...", xdsname)
@@ -98,8 +98,8 @@ func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Ch
9898
//
9999
// It also waits and checks that the ack request contains the given version, and
100100
// the generated nonce.
101-
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) {
102-
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version)
101+
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) (nonce string) {
102+
nonce = sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version)
103103
t.Logf("Good %s response pushed to fakeServer...", xdsname)
104104

105105
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version), nonce); err != nil {
@@ -111,6 +111,7 @@ func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, v
111111
t.Errorf("Timeout when expecting %s update", xdsname)
112112
}
113113
t.Logf("Good %s response callback executed", xdsname)
114+
return
114115
}
115116

116117
// sendBadResp sends a bad response with the given version. This response will
@@ -159,16 +160,16 @@ func (s) TestV2ClientAck(t *testing.T) {
159160
t.Log("Started xds v2Client...")
160161

161162
// Start the watch, send a good response, and check for ack.
162-
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
163+
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "")
163164
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
164165
versionLDS++
165-
cbRDS := startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest)
166+
cbRDS := startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest, "", "")
166167
sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS)
167168
versionRDS++
168-
cbCDS := startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest)
169+
cbCDS := startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest, "", "")
169170
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
170171
versionCDS++
171-
cbEDS := startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest)
172+
cbEDS := startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest, "", "")
172173
sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS)
173174
versionEDS++
174175

@@ -205,7 +206,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) {
205206
t.Log("Started xds v2Client...")
206207

207208
// Start the watch, send a good response, and check for ack.
208-
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
209+
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "")
209210

210211
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
211212
Resources: []*anypb.Any{{}},
@@ -237,15 +238,16 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) {
237238
t.Log("Started xds v2Client...")
238239

239240
// Start the watch, send a good response, and check for ack.
240-
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
241-
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
242-
versionLDS++
241+
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, "", "")
242+
nonce := sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
243243

244-
// Start a new watch.
245-
cbLDS = startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
244+
// Start a new watch. The version in the new request should be the version
245+
// from the previous response, thus versionLDS before ++.
246+
cbLDS = startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS), nonce)
247+
versionLDS++
246248

247249
// This is an invalid response after the new watch.
248-
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
250+
nonce = sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
249251
Resources: []*anypb.Any{{}},
250252
TypeUrl: ldsURL,
251253
}, versionLDS)

0 commit comments

Comments
 (0)