diff --git a/internal/grpctest/tlogger.go b/internal/grpctest/tlogger.go index 492381758762..746aa9b089e2 100644 --- a/internal/grpctest/tlogger.go +++ b/internal/grpctest/tlogger.go @@ -66,10 +66,10 @@ type tLogger struct { v int initialized bool - mu sync.Mutex // guards t, start, and errors - t *testing.T - start time.Time - errors map[*regexp.Regexp]int + mu sync.Mutex + t *testing.T + start time.Time + logs map[logType]map[*regexp.Regexp]int } func init() { @@ -87,7 +87,11 @@ func init() { } } // Initialize tLogr with the determined verbosity level. - tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), v: vLevel} + logsMap := map[logType]map[*regexp.Regexp]int{ + errorLog: {}, + warningLog: {}, + } + tLogr = &tLogger{logs: logsMap, v: vLevel} } // getCallingPrefix returns the at the given depth from the stack. @@ -115,11 +119,14 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) { switch ltype { case errorLog: // fmt.Sprintln is used rather than fmt.Sprint because tl.Log uses fmt.Sprintln behavior. - if tl.expected(fmt.Sprintln(args...)) { + if tl.expected(fmt.Sprintln(args...), errorLog) { tl.t.Log(args...) } else { tl.t.Error(args...) } + case warningLog: + tl.expected(fmt.Sprintln(args...), warningLog) + tl.t.Log(args...) case fatalLog: panic(fmt.Sprint(args...)) default: @@ -130,11 +137,14 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) { format = "%v " + format + "%s" switch ltype { case errorLog: - if tl.expected(fmt.Sprintf(format, args...)) { + if tl.expected(fmt.Sprintf(format, args...), errorLog) { tl.t.Logf(format, args...) } else { tl.t.Errorf(format, args...) } + case warningLog: + tl.expected(fmt.Sprintln(args...), warningLog) + tl.t.Log(args...) case fatalLog: panic(fmt.Sprintf(format, args...)) default: @@ -154,7 +164,8 @@ func (tl *tLogger) update(t *testing.T) { } tl.t = t tl.start = time.Now() - tl.errors = map[*regexp.Regexp]int{} + tl.logs[errorLog] = map[*regexp.Regexp]int{} + tl.logs[warningLog] = map[*regexp.Regexp]int{} } // ExpectError declares an error to be expected. For the next test, the first @@ -163,11 +174,20 @@ func (tl *tLogger) update(t *testing.T) { // Update(). Note that if an expected error is not encountered, this will cause // the test to fail. func ExpectError(expr string) { - ExpectErrorN(expr, 1) + expectLogsN(expr, 1, errorLog) } // ExpectErrorN declares an error to be expected n times. func ExpectErrorN(expr string, n int) { + expectLogsN(expr, n, errorLog) +} + +// ExpectWarning declares a warning to be expected. +func ExpectWarning(expr string) { + expectLogsN(expr, 1, warningLog) +} + +func expectLogsN(expr string, n int, logType logType) { tLogr.mu.Lock() defer tLogr.mu.Unlock() re, err := regexp.Compile(expr) @@ -175,28 +195,35 @@ func ExpectErrorN(expr string, n int) { tLogr.t.Error(err) return } - tLogr.errors[re] += n + tLogr.logs[logType][re] += n } // endTest checks if expected errors were not encountered. func (tl *tLogger) endTest(t *testing.T) { tl.mu.Lock() defer tl.mu.Unlock() - for re, count := range tl.errors { + for re, count := range tl.logs[errorLog] { if count > 0 { t.Errorf("Expected error '%v' not encountered", re.String()) } } - tl.errors = map[*regexp.Regexp]int{} + for re, count := range tl.logs[warningLog] { + if count > 0 { + t.Errorf("Expected warning '%v' not encountered", re.String()) + } + } + tl.logs[errorLog] = map[*regexp.Regexp]int{} + tl.logs[warningLog] = map[*regexp.Regexp]int{} } -// expected determines if the error string is protected or not. -func (tl *tLogger) expected(s string) bool { - for re, count := range tl.errors { +// expected determines if the log string of the particular type is protected or +// not. +func (tl *tLogger) expected(s string, logType logType) bool { + for re, count := range tl.logs[logType] { if re.FindStringIndex(s) != nil { - tl.errors[re]-- + tl.logs[logType][re]-- if count <= 1 { - delete(tl.errors, re) + delete(tl.logs[logType], re) } return true } diff --git a/internal/xds/xdsclient/xdsresource/xdsconfig.go b/internal/xds/xdsclient/xdsresource/xdsconfig.go new file mode 100644 index 000000000000..345c83fbba5d --- /dev/null +++ b/internal/xds/xdsclient/xdsresource/xdsconfig.go @@ -0,0 +1,113 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xdsresource + +import "google.golang.org/grpc/resolver" + +// XDSConfig holds the complete gRPC client-side xDS configuration containing +// all necessary resources. +type XDSConfig struct { + // Listener holds the listener configuration. It is guaranteed to be + // non-nil. + Listener *ListenerUpdate + + // RouteConfig holds the route configuration. It will be populated even if + // the route configuration was inlined into the Listener resource. It is + // guaranteed to be non-nil. + RouteConfig *RouteConfigUpdate + + // VirtualHost is selected from the route configuration whose domain field + // offers the best match against the provided dataplane authority. It is + // guaranteed to be non-nil. + VirtualHost *VirtualHost + + // Clusters is a map from cluster name to its configuration. + Clusters map[string]*ClusterResult +} + +// ClusterResult contains a cluster's configuration when a valid resource is +// received from the management server. It contains an error when: +// - an invalid resource is received from the management server and +// a valid resource was not already present or +// - the cluster resource does not exist on the management server +type ClusterResult struct { + Config ClusterConfig + Err error +} + +// ClusterConfig contains configuration for a single cluster. +type ClusterConfig struct { + // Cluster configuration for the cluster. This field is always set to a + // non-nil value. + Cluster *ClusterUpdate + // EndpointConfig contains endpoint configuration for a leaf cluster. This + // field is only set for EDS and LOGICAL_DNS clusters. + EndpointConfig *EndpointConfig + // AggregateConfig contains configuration for an aggregate cluster. This + // field is only set for AGGREGATE clusters. + AggregateConfig *AggregateConfig +} + +// AggregateConfig holds the configuration for an aggregate cluster. +type AggregateConfig struct { + // LeafClusters contains a prioritized list of names of the leaf clusters + // for the cluster. + LeafClusters []string +} + +// EndpointConfig contains configuration corresponding to the endpoints in a +// cluster. Only one of EDSUpdate or DNSEndpoints will be populated based on the +// cluster type. +type EndpointConfig struct { + // Endpoint configurartion for the EDS clusters. + EDSUpdate *EndpointsUpdate + // Endpoint configuration for the LOGICAL_DNS clusters. + DNSEndpoints *DNSUpdate + // ResolutionNote stores error encountered while obtaining endpoints data for the cluster. It may contain a nil value when a valid endpoint datais received. It contains an error when: + // - an invalid resource is received from the management server or + // - the endpoint resource does not exist on the management server + ResolutionNote error +} + +// DNSUpdate represents the result of a DNS resolution, containing a list of +// discovered endpoints. This is only populated for the LOGICAL_DNS clusters. +type DNSUpdate struct { + // Endpoints is the complete list of endpoints returned by the DNS resolver. + Endpoints []resolver.Endpoint +} + +// xdsConfigkey is the type used as the key to store XDSConfig in the Attributes +// field of resolver.State. +type xdsConfigkey struct{} + +// SetXDSConfig returns a copy of state in which the Attributes field is updated +// with the XDSConfig. +func SetXDSConfig(state resolver.State, config *XDSConfig) resolver.State { + state.Attributes = state.Attributes.WithValue(xdsConfigkey{}, config) + return state +} + +// XDSConfigFromResolverState returns XDSConfig stored as an attribute in the +// resolver state. +func XDSConfigFromResolverState(state resolver.State) *XDSConfig { + state.Attributes.Value(xdsConfigkey{}) + if v := state.Attributes.Value(xdsConfigkey{}); v != nil { + return v.(*XDSConfig) + } + return nil +} diff --git a/internal/xds/xdsdepmgr/watch_service.go b/internal/xds/xdsdepmgr/watch_service.go new file mode 100644 index 000000000000..bc8ad36ee2d3 --- /dev/null +++ b/internal/xds/xdsdepmgr/watch_service.go @@ -0,0 +1,115 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsdepmgr + +import ( + "sync/atomic" + + "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" +) + +type listenerWatcher struct { + resourceName string + cancel func() + depMgr *DependencyManager + stopped atomic.Bool +} + +func newListenerWatcher(resourceName string, depMgr *DependencyManager) *listenerWatcher { + lw := &listenerWatcher{resourceName: resourceName, depMgr: depMgr} + lw.cancel = xdsresource.WatchListener(depMgr.xdsClient, resourceName, lw) + return lw +} + +func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { + if l.stopped.Load() { + onDone() + return + } + l.depMgr.onListenerResourceUpdate(update, onDone) +} + +func (l *listenerWatcher) ResourceError(err error, onDone func()) { + if l.stopped.Load() { + onDone() + return + } + l.depMgr.onListenerResourceError(err, onDone) +} + +func (l *listenerWatcher) AmbientError(err error, onDone func()) { + if l.stopped.Load() { + onDone() + return + } + l.depMgr.onListenerResourceAmbientError(err, onDone) +} + +func (l *listenerWatcher) stop() { + l.stopped.Store(true) + l.cancel() + if l.depMgr.logger.V(2) { + l.depMgr.logger.Infof("Canceling watch on Listener resource %q", l.resourceName) + } +} + +type routeConfigWatcher struct { + resourceName string + cancel func() + depMgr *DependencyManager + stopped atomic.Bool +} + +func newRouteConfigWatcher(resourceName string, depMgr *DependencyManager) *routeConfigWatcher { + rw := &routeConfigWatcher{resourceName: resourceName, depMgr: depMgr} + rw.cancel = xdsresource.WatchRouteConfig(depMgr.xdsClient, resourceName, rw) + return rw +} + +func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigUpdate, onDone func()) { + if r.stopped.Load() { + onDone() + return + } + r.depMgr.onRouteConfigResourceUpdate(r.resourceName, u, onDone) +} + +func (r *routeConfigWatcher) ResourceError(err error, onDone func()) { + if r.stopped.Load() { + onDone() + return + } + r.depMgr.onRouteConfigResourceError(r.resourceName, err, onDone) +} + +func (r *routeConfigWatcher) AmbientError(err error, onDone func()) { + if r.stopped.Load() { + onDone() + return + } + r.depMgr.onRouteConfigResourceAmbientError(r.resourceName, err, onDone) +} + +func (r *routeConfigWatcher) stop() { + r.stopped.Store(true) + r.cancel() + if r.depMgr.logger.V(2) { + r.depMgr.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName) + } +} diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager.go b/internal/xds/xdsdepmgr/xds_dependency_manager.go new file mode 100644 index 000000000000..69830de7d1a3 --- /dev/null +++ b/internal/xds/xdsdepmgr/xds_dependency_manager.go @@ -0,0 +1,251 @@ +/* + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package xdsdepmgr provides the implementation of the xDS dependency manager +// that manages all the xDS watches and resources as described in gRFC A74. +package xdsdepmgr + +import ( + "context" + "fmt" + + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/xds/xdsclient" + "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" +) + +const prefix = "[xdsdepmgr %p] " + +var logger = grpclog.Component("xds") + +func prefixLogger(p *DependencyManager) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) +} + +// ConfigWatcher is the interface for consumers of aggregated xDS configuration +// from the DependencyManager. The only consumer of this configuration is +// currently the xDS resolver. +type ConfigWatcher interface { + // Update is invoked when a new, validated xDS configuration is available. + // + // Implementations must treat the received config as read-only and should + // not modify it. + Update(*xdsresource.XDSConfig) + + // Error is invoked when an error is received from the listener or route + // resource watcher. This includes cases where: + // - The listener or route resource watcher reports a resource error. + // - The received listener resource is a socket listener, not an API + // listener. TODO(i/8114): Implement this check. + // - The received route configuration does not contain a virtual host + // matching the channel's default authority. + Error(error) +} + +// DependencyManager registers watches on the xDS client for all required xDS +// resources, resolves dependencies between them, and returns a complete +// configuration to the xDS resolver. +type DependencyManager struct { + // The following fields are initialized at creation time and are read-only + // after that. + logger *internalgrpclog.PrefixLogger + watcher ConfigWatcher + xdsClient xdsclient.XDSClient + ldsResourceName string + dataplaneAuthority string + nodeID string + + // Serializes watcher cancellations, resource updates, and error callbacks. + // While the xDS client guarantees that update and error callbacks are + // processed sequentially, it does not serialize watch cancellations. This + // serializer ensures all three event types run in sequence, preventing + // cancellations from running concurrently with other callbacks. + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc + + // The following fields are only accessed from within the serializer. + listenerWatcher *listenerWatcher + currentListenerUpdate *xdsresource.ListenerUpdate + routeConfigWatcher *routeConfigWatcher + rdsResourceName string + currentRouteConfig *xdsresource.RouteConfigUpdate + currentVirtualHost *xdsresource.VirtualHost +} + +// New creates a new DependencyManager. +// +// - listenerName is the name of the Listener resource to request from the +// management server. +// - dataplaneAuthority is used to select the best matching virtual host from +// the route configuration received from the management server. +// - xdsClient is the xDS client to use to register resource watches. +// - watcher is the ConfigWatcher interface that will receive the aggregated +// XDSConfig updates and errors. +func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, watcher ConfigWatcher) *DependencyManager { + dm := &DependencyManager{ + ldsResourceName: listenerName, + dataplaneAuthority: dataplaneAuthority, + xdsClient: xdsClient, + watcher: watcher, + nodeID: xdsClient.BootstrapConfig().Node().GetId(), + } + dm.logger = prefixLogger(dm) + ctx, cancel := context.WithCancel(context.Background()) + dm.serializer = grpcsync.NewCallbackSerializer(ctx) + dm.serializerCancel = cancel + + // Start the listener watch. Listener watch will start the other resource + // watches as needed. + dm.listenerWatcher = newListenerWatcher(listenerName, dm) + return dm +} + +// Close cancels all registered resource watches. +func (m *DependencyManager) Close() { + m.serializerCancel() + <-m.serializer.Done() + + if m.listenerWatcher != nil { + m.listenerWatcher.stop() + } + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + } +} + +// annotateErrorWithNodeID annotates the given error with the provided xDS node +// ID. +func (m *DependencyManager) annotateErrorWithNodeID(err error) error { + return fmt.Errorf("[xDS node id: %v]: %w", m.nodeID, err) +} + +// maybeSendUpdate checks that all the resources have been received and sends +// the current aggregated xDS configuration to the watcher if all the updates +// are available. +func (m *DependencyManager) maybeSendUpdate() { + m.watcher.Update(&xdsresource.XDSConfig{ + Listener: m.currentListenerUpdate, + RouteConfig: m.currentRouteConfig, + VirtualHost: m.currentVirtualHost, + }) +} + +func (m *DependencyManager) applyRouteConfigUpdate(update *xdsresource.RouteConfigUpdate) { + matchVH := xdsresource.FindBestMatchingVirtualHost(m.dataplaneAuthority, update.VirtualHosts) + if matchVH == nil { + m.watcher.Error(m.annotateErrorWithNodeID(fmt.Errorf("could not find VirtualHost for %q", m.dataplaneAuthority))) + return + } + m.currentRouteConfig = update + m.currentVirtualHost = matchVH + m.maybeSendUpdate() +} + +func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.ListenerUpdate, onDone func()) { + handleUpdate := func(context.Context) { + defer onDone() + if m.logger.V(2) { + m.logger.Infof("Received update for Listener resource %q: %+v", m.ldsResourceName, update) + } + + m.currentListenerUpdate = update + + if update.InlineRouteConfig != nil { + // If there was a previous route config watcher because of a non-inline + // route configuration, cancel it. + m.rdsResourceName = "" + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + m.routeConfigWatcher = nil + } + m.applyRouteConfigUpdate(update.InlineRouteConfig) + return + } + + // We get here only if there was no inline route configuration. If the route + // config name has not changed, send an update with existing route + // configuration and the newly received listener configuration. + if m.rdsResourceName == update.RouteConfigName { + m.maybeSendUpdate() + return + } + + // If the route config name has changed, cancel the old watcher and start a + // new one. At this point, since the new route config name has not yet been + // resolved, no update is sent to the channel, and therefore the old route + // configuration (if received) is used until the new one is received. + m.rdsResourceName = update.RouteConfigName + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + m.currentVirtualHost = nil + } + m.routeConfigWatcher = newRouteConfigWatcher(m.rdsResourceName, m) + } + m.serializer.ScheduleOr(handleUpdate, onDone) +} + +func (m *DependencyManager) onListenerResourceError(err error, onDone func()) { + handleError := func(context.Context) { + m.logger.Warningf("Received resource error for Listener resource %q: %v", m.ldsResourceName, m.annotateErrorWithNodeID(err)) + + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + } + m.rdsResourceName = "" + m.currentVirtualHost = nil + m.routeConfigWatcher = nil + m.watcher.Error(fmt.Errorf("listener resource error: %v", m.annotateErrorWithNodeID(err))) + onDone() + } + m.serializer.ScheduleOr(handleError, onDone) +} + +func (m *DependencyManager) onListenerResourceAmbientError(err error, onDone func()) { + m.serializer.ScheduleOr(func(context.Context) { + m.logger.Warningf("Listener resource ambient error: %v", m.annotateErrorWithNodeID(err)) + onDone() + }, onDone) +} + +func (m *DependencyManager) onRouteConfigResourceUpdate(resourceName string, update *xdsresource.RouteConfigUpdate, onDone func()) { + handleUpdate := func(context.Context) { + if m.logger.V(2) { + m.logger.Infof("Received update for RouteConfiguration resource %q: %+v", resourceName, update) + } + m.applyRouteConfigUpdate(update) + onDone() + } + m.serializer.ScheduleOr(handleUpdate, onDone) +} + +func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err error, onDone func()) { + handleError := func(context.Context) { + m.logger.Warningf("Received resource error for RouteConfiguration resource %q: %v", resourceName, m.annotateErrorWithNodeID(err)) + m.watcher.Error(fmt.Errorf("route resource error: %v", m.annotateErrorWithNodeID(err))) + onDone() + } + m.serializer.ScheduleOr(handleError, onDone) +} + +func (m *DependencyManager) onRouteConfigResourceAmbientError(resourceName string, err error, onDone func()) { + handleError := func(context.Context) { + m.logger.Warningf("Route resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err)) + onDone() + } + m.serializer.ScheduleOr(handleError, onDone) +} diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager_test.go b/internal/xds/xdsdepmgr/xds_dependency_manager_test.go new file mode 100644 index 000000000000..3e63333cf391 --- /dev/null +++ b/internal/xds/xdsdepmgr/xds_dependency_manager_test.go @@ -0,0 +1,808 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsdepmgr_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/internal/xds/xdsclient" + "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + "google.golang.org/grpc/internal/xds/xdsdepmgr" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + + _ "google.golang.org/grpc/internal/xds/httpfilter/router" // Register the router filter +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +const ( + defaultTestTimeout = 10 * time.Second + defaultTestShortTimeout = 100 * time.Microsecond + + defaultTestServiceName = "service-name" + defaultTestRouteConfigName = "route-config-name" + defaultTestClusterName = "cluster-name" +) + +func newStringP(s string) *string { + return &s +} + +// testWatcher is an implementation of the ConfigWatcher interface that sends +// the updates and errors received from the dependency manager to respective +// channels, for the tests to verify. +type testWatcher struct { + updateCh chan *xdsresource.XDSConfig + errorCh chan error +} + +// Update sends the received XDSConfig update to the update channel. +func (w *testWatcher) Update(cfg *xdsresource.XDSConfig) { + w.updateCh <- cfg +} + +// Error sends the received error to the error channel. +func (w *testWatcher) Error(err error) { + w.errorCh <- err +} + +func verifyError(ctx context.Context, errCh chan error, wantErr, wantNodeID string) error { + select { + case gotErr := <-errCh: + if gotErr == nil { + return fmt.Errorf("got nil error from resolver, want error %q", wantErr) + } + if !strings.Contains(gotErr.Error(), wantErr) { + return fmt.Errorf("got error from resolver %q, want %q", gotErr, wantErr) + } + if !strings.Contains(gotErr.Error(), wantNodeID) { + return fmt.Errorf("got error from resolver %q, want nodeID %q", gotErr, wantNodeID) + } + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for error from dependency manager") + } + return nil +} + +func verifyXDSConfig(ctx context.Context, xdsCh chan *xdsresource.XDSConfig, errCh chan error, want *xdsresource.XDSConfig) error { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for update from dependency manager") + case update := <-xdsCh: + cmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), + cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), + cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), + } + if diff := cmp.Diff(update, want, cmpOpts...); diff != "" { + return fmt.Errorf("received unexpected update from dependency manager. Diff (-got +want):\n%v", diff) + } + case err := <-errCh: + return fmt.Errorf("received unexpected error from dependency manager: %v", err) + } + return nil +} + +func createXDSClient(t *testing.T, bootstrapContents []byte) xdsclient.XDSClient { + t.Helper() + + // Setup the bootstrap file contents. + config, err := bootstrap.NewConfigFromContents(bootstrapContents) + if err != nil { + t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err) + } + + pool := xdsclient.NewPool(config) + if err != nil { + t.Fatalf("Failed to create an xDS client pool: %v", err) + } + c, cancel, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + }) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) + } + t.Cleanup(cancel) + return c +} + +// Spins up an xDS management server and sets up an xDS bootstrap configuration +// file that points to it. +// +// Returns the following: +// - A reference to the xDS management server +// - Contents of the bootstrap configuration pointing to xDS management +// server +func setupManagementServerForTest(t *testing.T, nodeID string) (*e2e.ManagementServer, []byte) { + t.Helper() + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + t.Cleanup(mgmtServer.Stop) + + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + return mgmtServer, bootstrapContents +} + +// Tests the happy case where the dependency manager receives all the required +// resources and verifies that Update is called with with the correct XDSConfig. +func (s) TestHappyCase(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + xdsClient := createXDSClient(t, bc) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + wantXdsConfig := &xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + RouteConfigName: defaultTestRouteConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + RouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + }, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}, + }, + }, + } + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } +} + +// Tests the case where the listener contains an inline route configuration and +// verifies that Update is called with the correct XDSConfig. +func (s) TestInlineRouteConfig(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + xdsClient := createXDSClient(t, bc) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName), + }, + HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc + }) + listener := &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, + }}, + }}, + } + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + wantConfig := &xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + InlineRouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}, + }, + }, + }, + }, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + RouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}, + }, + }, + }, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}, + }, + }, + } + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantConfig); err != nil { + t.Fatal(err) + } +} + +// Tests the case where dependency manager only receives listener resource but +// does not receive route config resource. Verfies that Update is not called +// since we do not have all resources. +func (s) TestIncompleteResources(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + xdsClient := createXDSClient(t, bc) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + SkipValidation: true, + }); err != nil { + t.Fatal(err) + } + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case update := <-watcher.updateCh: + t.Fatalf("Received unexpected update from dependency manager: %+v", update) + case err := <-watcher.errorCh: + t.Fatalf("Received unexpected error from dependency manager: %v", err) + } +} + +// Tests the case where dependency manager receives a listener resource error by +// sending the correct update first and then removing the listener resource. It +// verifies that Error is called with the correct error. +func (s) TestListenerResourceError(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + xdsClient := createXDSClient(t, bc) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Send a correct update first + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + listener.FilterChains = nil + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + wantXdsConfig := &xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + RouteConfigName: defaultTestRouteConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + RouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + }, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + } + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } + + // Remove listener resource so that we get listener resource error. + resources.Listeners = nil + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + if err := verifyError(ctx, watcher.errorCh, fmt.Sprintf("xds: resource %q of type %q has been removed", defaultTestServiceName, "ListenerResource"), nodeID); err != nil { + t.Fatal(err) + } +} + +// Tests the case where dependency manager receives a route config resource +// error by sending a route resource that is NACKed by the XDSClient. It +// verifies that Error is called with correct error. +func (s) TestRouteResourceError(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + xdsClient := createXDSClient(t, bc) + + errorCh := make(chan error, 1) + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: errorCh, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + // Remove the Match to make sure the route resource is NACKed by XDSClient + // sending a route resource error to dependency manager. + route.VirtualHosts[0].Routes[0].Match = nil + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + if err := verifyError(ctx, watcher.errorCh, "route resource error", nodeID); err != nil { + t.Fatal(err) + } +} + +// Tests the case where route config updates receives does not have any virtual +// host. Verifies that Error is called with correct error. +func (s) TestNoVirtualHost(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + xdsClient := createXDSClient(t, bc) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + route.VirtualHosts = nil + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + if err := verifyError(ctx, watcher.errorCh, "could not find VirtualHost", nodeID); err != nil { + t.Fatal(err) + } +} + +// Tests the case where we get an ambient error and verify that we correctly log +// a warning for it. To make sure we get an ambient error, we send a correct +// update first, then send an invalid one and then send the valid resource +// again. We send the valid resource again so that we can be sure the ambient +// error reaches the dependency manager since there is no other way to wait for +// it. +func (s) TestAmbientError(t *testing.T) { + // Expect a warning log for the ambient error. + grpctest.ExpectWarning("Listener resource ambient error") + + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + xdsClient := createXDSClient(t, bc) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Configure a valid listener and route. + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + // Wait for the initial valid update. + wantXdsConfig := &xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + RouteConfigName: defaultTestRouteConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + RouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + }, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + } + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } + + // Configure a listener resource that is expected to be NACKed because it + // does not contain the `RouteSpecifier` field in the HTTPConnectionManager. + // Since a valid one is already cached, this should result in an ambient + // error. + hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, + }) + lis := &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, + }}, + }}, + } + resources.Listeners = []*v3listenerpb.Listener{lis} + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // We expect no call to Error or Update on our watcher. We just wait for + // a short duration to ensure that. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case err := <-watcher.errorCh: + t.Fatalf("Unexpected call to Error %v", err) + case update := <-watcher.updateCh: + t.Fatalf("Unexpected call to Update %+v", update) + case <-sCtx.Done(): + } + + // Send valid resources again. + listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } +} + +// Tests the case where the cluster name changes in the route resource update +// and verify that each time Update is called with correct cluster name. +func (s) TestRouteResourceUpdate(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + xdsClient := createXDSClient(t, bc) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Initial resources with defaultTestClusterName + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + // Wait for the first update. + wantXdsConfig := &xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + RouteConfigName: defaultTestRouteConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + RouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + }, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + } + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } + + // Update route to point to a new cluster. + newClusterName := "new-cluster-name" + route2 := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName) + resources.Routes = []*v3routepb.RouteConfiguration{route2} + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Wait for the second update and verify it has the new cluster. + wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = newClusterName + wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = newClusterName + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } +} + +// Tests the case where the route resource is first sent from the management +// server and the changed to be inline with the listener and then again changed +// to be received from the management server. It verifies that each time Update +// called with the correct XDSConfig. +func (s) TestRouteResourceChangeToInline(t *testing.T) { + nodeID := uuid.New().String() + mgmtServer, bc := setupManagementServerForTest(t, nodeID) + defer mgmtServer.Stop() + xdsClient := createXDSClient(t, bc) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Initial resources with defaultTestClusterName + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + // Wait for the first update. + wantXdsConfig := &xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + RouteConfigName: defaultTestRouteConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + RouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + }, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + } + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } + + // Update route to point to a new cluster. + newClusterName := "new-cluster-name" + hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName), + }, + HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc + }) + resources.Listeners[0].ApiListener.ApiListener = hcm + resources.Routes = nil + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Wait for the second update and verify it has the new cluster. + wantXdsConfig.Listener.InlineRouteConfig = &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: newClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + }, + } + wantXdsConfig.Listener.RouteConfigName = "" + wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = newClusterName + wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = newClusterName + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } + + // Change the route resource back to non-inline. + listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{route}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Wait for the third update and verify it has the original cluster. + wantXdsConfig.Listener.InlineRouteConfig = nil + wantXdsConfig.Listener.RouteConfigName = defaultTestRouteConfigName + wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = defaultTestClusterName + wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = defaultTestClusterName + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } +}