diff --git a/internal/xds/clients/internal/buffer/unbounded.go b/internal/xds/clients/internal/buffer/unbounded.go index 3e6e99d0e8e0..52b8dab892bc 100644 --- a/internal/xds/clients/internal/buffer/unbounded.go +++ b/internal/xds/clients/internal/buffer/unbounded.go @@ -83,6 +83,7 @@ func (b *Unbounded) Load() { default: } } else if b.closing && !b.closed { + b.closed = true close(b.c) } } diff --git a/internal/xds/clients/xdsclient/ads_stream.go b/internal/xds/clients/xdsclient/ads_stream.go index 3ad62ac16db8..24e66b834716 100644 --- a/internal/xds/clients/xdsclient/ads_stream.go +++ b/internal/xds/clients/xdsclient/ads_stream.go @@ -28,7 +28,6 @@ import ( igrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/xds/clients" "google.golang.org/grpc/internal/xds/clients/internal/backoff" - "google.golang.org/grpc/internal/xds/clients/internal/buffer" "google.golang.org/grpc/internal/xds/clients/internal/pretty" "google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource" @@ -48,6 +47,13 @@ const ( perRPCVerbosityLevel = 9 ) +// request represents a queued request message to be sent on the ADS stream. It +// contains the type of the resource and the list of resource names to be sent. +type request struct { + typ ResourceType + resourceNames []string +} + // response represents a response received on the ADS stream. It contains the // type URL, version, and resources for the response. type response struct { @@ -76,9 +82,7 @@ type adsStreamEventHandler interface { type resourceTypeState struct { version string // Last acked version. Should not be reset when the stream breaks. nonce string // Last received nonce. Should be reset when the stream breaks. - bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked. subscribedResources map[string]*xdsresource.ResourceWatchState // Map of subscribed resource names to their state. - pendingWrite bool // True if there is a pending write for this resource type. } // adsStreamImpl provides the functionality associated with an ADS (Aggregated @@ -99,15 +103,16 @@ type adsStreamImpl struct { // The following fields are initialized in the constructor and are not // written to afterwards, and hence can be accessed without a mutex. streamCh chan clients.Stream // New ADS streams are pushed here. - requestCh *buffer.Unbounded // Subscriptions and unsubscriptions are pushed here. runnerDoneCh chan struct{} // Notify completion of runner goroutine. cancel context.CancelFunc // To cancel the context passed to the runner goroutine. fc *adsFlowControl // Flow control for ADS stream. + notifySender chan struct{} // To notify the sending goroutine of a pending request. // Guards access to the below fields (and to the contents of the map). mu sync.Mutex resourceTypeState map[ResourceType]*resourceTypeState // Map of resource types to their state. firstRequest bool // False after the first request is sent out. + pendingRequests []request // Subscriptions and unsubscriptions are pushed here. } // adsStreamOpts contains the options for creating a new ADS Stream. @@ -132,9 +137,9 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl { watchExpiryTimeout: opts.watchExpiryTimeout, streamCh: make(chan clients.Stream, 1), - requestCh: buffer.NewUnbounded(), runnerDoneCh: make(chan struct{}), fc: newADSFlowControl(), + notifySender: make(chan struct{}, 1), resourceTypeState: make(map[ResourceType]*resourceTypeState), } @@ -151,76 +156,79 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl { func (s *adsStreamImpl) Stop() { s.cancel() s.fc.stop() - s.requestCh.Close() <-s.runnerDoneCh s.logger.Infof("Shutdown ADS stream") } // subscribe subscribes to the given resource. It is assumed that multiple // subscriptions for the same resource is deduped at the caller. A discovery -// request is sent out on the underlying stream for the resource type when there -// is sufficient flow control quota. +// request is sent out on the underlying stream, for the resource type with the +// newly subscribed resource. func (s *adsStreamImpl) subscribe(typ ResourceType, name string) { if s.logger.V(2) { s.logger.Infof("Subscribing to resource %q of type %q", name, typ.TypeName) } s.mu.Lock() - defer s.mu.Unlock() - state, ok := s.resourceTypeState[typ] if !ok { // An entry in the type state map is created as part of the first // subscription request for this type. - state = &resourceTypeState{ - subscribedResources: make(map[string]*xdsresource.ResourceWatchState), - bufferedRequests: make(chan struct{}, 1), - } + state = &resourceTypeState{subscribedResources: make(map[string]*xdsresource.ResourceWatchState)} s.resourceTypeState[typ] = state } // Create state for the newly subscribed resource. The watch timer will // be started when a request for this resource is actually sent out. state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted} - state.pendingWrite = true // Send a request for the resource type with updated subscriptions. - s.requestCh.Put(typ) + s.pendingRequests = append(s.pendingRequests, request{typ: typ, resourceNames: resourceNames(state.subscribedResources)}) + s.mu.Unlock() + + select { + case s.notifySender <- struct{}{}: + default: + } } -// Unsubscribe cancels the subscription to the given resource. It is a no-op if +// unsubscribe cancels the subscription to the given resource. It is a no-op if // the given resource does not exist. The watch expiry timer associated with the // resource is stopped if one is active. A discovery request is sent out on the -// stream for the resource type when there is sufficient flow control quota. -func (s *adsStreamImpl) Unsubscribe(typ ResourceType, name string) { +// stream for the resource type with the updated set of resource names. +func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) { if s.logger.V(2) { s.logger.Infof("Unsubscribing to resource %q of type %q", name, typ.TypeName) } s.mu.Lock() - defer s.mu.Unlock() - state, ok := s.resourceTypeState[typ] if !ok { + s.mu.Unlock() return } - rs, ok := state.subscribedResources[name] if !ok { + s.mu.Unlock() return } if rs.ExpiryTimer != nil { rs.ExpiryTimer.Stop() } delete(state.subscribedResources, name) - state.pendingWrite = true // Send a request for the resource type with updated subscriptions. - s.requestCh.Put(typ) + s.pendingRequests = append(s.pendingRequests, request{typ: typ, resourceNames: resourceNames(state.subscribedResources)}) + s.mu.Unlock() + + select { + case s.notifySender <- struct{}{}: + default: + } } // runner is a long-running goroutine that handles the lifecycle of the ADS -// stream. It spwans another goroutine to handle writes of discovery request +// stream. It spawns another goroutine to handle writes of discovery request // messages on the stream. Whenever an existing stream fails, it performs // exponential backoff (if no messages were received on that stream) before // creating a new stream. @@ -280,53 +288,44 @@ func (s *adsStreamImpl) send(ctx context.Context) { stream = nil continue } - case req, ok := <-s.requestCh.Get(): - if !ok { - return + case <-s.notifySender: + // If there's no stream yet, skip the request. This request will be resent + // when a new stream is created. If no stream is created, the watcher will + // timeout (same as server not sending response back). + if stream == nil { + continue } - s.requestCh.Load() - typ := req.(ResourceType) - if err := s.sendNew(stream, typ); err != nil { + // Resetting the pendingRequests slice to nil works for both cases: + // - When we successfully sends the requests out on the wire. + // - When sending fails. This can happen only when the stream fails, + // and in this case, we rely on the `sendExisting` to send out + // requests for all subscriptions when the stream is recreated. + s.mu.Lock() + if err := s.sendNewLocked(stream, s.pendingRequests); err != nil { stream = nil - continue } + s.pendingRequests = nil + s.mu.Unlock() } } } -// sendNew attempts to send a discovery request based on a new subscription or -// unsubscription. If there is no flow control quota, the request is buffered -// and will be sent later. This method also starts the watch expiry timer for -// resources that were sent in the request for the first time, i.e. their watch -// state is `watchStateStarted`. -func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error { - s.mu.Lock() - defer s.mu.Unlock() - - // If there's no stream yet, skip the request. This request will be resent - // when a new stream is created. If no stream is created, the watcher will - // timeout (same as server not sending response back). - if stream == nil { - return nil - } - - // If local processing of the most recently received response is not yet - // complete, i.e. fc.pending == true, queue this write and return early. - // This allows us to batch writes for requests which are generated as part - // of local processing of a received response. - state := s.resourceTypeState[typ] - bufferRequest := func() { - select { - case state.bufferedRequests <- struct{}{}: - default: +// sendNewLocked attempts to send a discovery request based on a new subscription or +// unsubscription. This method also starts the watch expiry timer for resources +// that were sent in the request for the first time, i.e. their watch state is +// `watchStateStarted`. +// +// Caller needs to hold c.mu. +func (s *adsStreamImpl) sendNewLocked(stream clients.Stream, requests []request) error { + for _, req := range requests { + state := s.resourceTypeState[req.typ] + if err := s.sendMessageLocked(stream, req.resourceNames, req.typ.TypeURL, state.version, state.nonce, nil); err != nil { + return err } + s.startWatchTimersLocked(req.typ, req.resourceNames) } - if s.fc.runIfPending(bufferRequest) { - return nil - } - - return s.sendMessageIfWritePendingLocked(stream, typ, state) + return nil } // sendExisting sends out discovery requests for existing resources when @@ -337,6 +336,10 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error { s.mu.Lock() defer s.mu.Unlock() + // Clear any queued requests. Previously subscribed to resources will be + // resent below. + s.pendingRequests = nil + for typ, state := range s.resourceTypeState { // Reset only the nonces map when the stream restarts. // @@ -355,69 +358,15 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error { continue } - state.pendingWrite = true - if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil { + names := resourceNames(state.subscribedResources) + if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil { return err } + s.startWatchTimersLocked(typ, names) } return nil } -// sendBuffered sends out discovery requests for resources that were buffered -// when they were subscribed to, because local processing of the previously -// received response was not yet complete. -// -// The stream argument is guaranteed to be non-nil. -func (s *adsStreamImpl) sendBuffered(stream clients.Stream) error { - s.mu.Lock() - defer s.mu.Unlock() - - for typ, state := range s.resourceTypeState { - select { - case <-state.bufferedRequests: - if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil { - return err - } - default: - // No buffered request. - continue - } - } - return nil -} - -// sendMessageIfWritePendingLocked attempts to sends a discovery request to the -// server, if there is a pending write for the given resource type. -// -// If the request is successfully sent, the pending write field is cleared and -// watch timers are started for the resources in the request. -// -// Caller needs to hold c.mu. -func (s *adsStreamImpl) sendMessageIfWritePendingLocked(stream clients.Stream, typ ResourceType, state *resourceTypeState) error { - if !state.pendingWrite { - if s.logger.V(2) { - s.logger.Infof("Skipping sending request for type %q, because all subscribed resources were already sent", typ.TypeURL) - } - return nil - } - - names := resourceNames(state.subscribedResources) - if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil { - return err - } - state.pendingWrite = false - - // Drain the buffered requests channel because we just sent a request for this - // resource type. - select { - case <-state.bufferedRequests: - default: - } - - s.startWatchTimersLocked(typ, names) - return nil -} - // sendMessageLocked sends a discovery request to the server, populating the // different fields of the message with the given parameters. Returns a non-nil // error if the request could not be sent. @@ -467,11 +416,9 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string, // recv is responsible for receiving messages from the ADS stream. // // It performs the following actions: -// - Waits for local flow control to be available before sending buffered -// requests, if any. -// - Receives a message from the ADS stream. If an error is encountered here, -// it is handled by the onError method which propagates the error to all -// watchers. +// - Waits for local flow control to be available before it receives a message +// from the ADS stream. If an error is encountered here, it is handled by +// the onError method which propagates the error to all watchers. // - Invokes the event handler's OnADSResponse method to process the message. // - Sends an ACK or NACK to the server based on the response. // @@ -488,10 +435,6 @@ func (s *adsStreamImpl) recv(stream clients.Stream) bool { return msgReceived } - // Send out a request if anything was buffered while we were waiting for - // local processing of the previous response to complete. - s.sendBuffered(stream) - resources, url, version, nonce, err := s.recvMessage(stream) if err != nil { s.onError(err, msgReceived) @@ -760,23 +703,6 @@ func (fc *adsFlowControl) setPending(pending bool) { } } -func (fc *adsFlowControl) runIfPending(f func()) bool { - fc.mu.Lock() - defer fc.mu.Unlock() - - if fc.stopped { - return false - } - - // If there's a pending update, run the function while still holding the - // lock. This ensures that the pending state does not change between the - // check and the function call. - if fc.pending { - f() - } - return fc.pending -} - // wait blocks until all the watchers have consumed the most recent update. // Returns true if the flow control was stopped while waiting, false otherwise. func (fc *adsFlowControl) wait() bool { diff --git a/internal/xds/clients/xdsclient/channel.go b/internal/xds/clients/xdsclient/channel.go index 7c40f1dab8f8..9da5eb361865 100644 --- a/internal/xds/clients/xdsclient/channel.go +++ b/internal/xds/clients/xdsclient/channel.go @@ -163,7 +163,7 @@ func (xc *xdsChannel) unsubscribe(typ ResourceType, name string) { } return } - xc.ads.Unsubscribe(typ, name) + xc.ads.unsubscribe(typ, name) } // The following onADSXxx() methods implement the StreamEventHandler interface